ticker/ticker-update/src/main.rs

308 lines
9.7 KiB
Rust
Raw Normal View History

2021-05-24 23:25:24 +02:00
use std::str::FromStr;
2019-10-10 18:42:49 +02:00
use std::mem::replace;
2019-10-06 23:28:39 +02:00
use std::io::Read;
2019-10-10 04:09:14 +02:00
use std::sync::RwLock;
2019-10-11 21:16:33 +02:00
use std::collections::{HashMap, VecDeque};
2021-05-24 23:25:24 +02:00
use chrono::{Duration, offset::Utc};
2019-10-10 02:58:12 +02:00
use reqwest::header::{IF_NONE_MATCH, IF_MODIFIED_SINCE, ETAG, LAST_MODIFIED, USER_AGENT};
2019-10-10 04:09:14 +02:00
use diesel::{Connection, pg::PgConnection, prelude::*};
2021-05-24 23:25:24 +02:00
use rrule::RRule;
2019-10-06 23:28:39 +02:00
2019-10-26 00:19:25 +02:00
use libticker::{
config::{Config, CalendarOptions},
schema::{self, calendars::dsl::calendars},
model::{Calendar, NewCalendar, Event},
ics::{Parser, Object, Timestamp, GetValue},
};
2019-10-10 17:39:34 +02:00
2019-10-10 18:42:49 +02:00
fn extract_vevent_objs(results: &mut Vec<Object>, mut obj: Object) {
let children = replace(&mut obj.children, vec![]);
if obj.name == "VEVENT" {
results.push(obj);
}
for child in children.into_iter() {
extract_vevent_objs(results, *child);
}
}
2021-05-24 23:25:24 +02:00
/// How many days before the current time recurring occurrences are still generated.
const RRULE_LOOKBACK: i64 = 30;
/// How many days after the current time recurring occurrences are generated.
const RRULE_LOOKAHEAD: i64 = 366;
fn obj_to_events(calendar: String, obj: &Object) -> Vec<Event> {
2019-10-10 17:39:34 +02:00
if obj.name != "VEVENT" {
2021-05-24 23:25:24 +02:00
return vec![];
2019-10-10 17:39:34 +02:00
}
let dtstart: Timestamp = match obj.get("DTSTART") {
Some(dtstart) => dtstart,
None => return vec![],
2021-05-24 23:25:24 +02:00
};
2019-10-11 21:43:24 +02:00
let dtstart = dtstart.or_hms(0, 0, 0);
let dtend: Option<Timestamp> = obj.get("DTEND");
let dtend = dtend.map(|time| time.or_hms(23, 59, 59));
2021-05-24 23:25:24 +02:00
let summary: &str = match obj.get("SUMMARY") {
Some(summary) => summary,
None => return vec![],
};
let uid = obj.get("UID").unwrap_or("");
let dtstamp = obj.get("DTSTAMP").unwrap_or("");
let location = obj.get("LOCATION");
let url = obj.get("URL");
2021-05-26 20:46:26 +02:00
let generate_event = |dtstart, recurrence| {
2021-05-24 23:25:24 +02:00
let id = format!("{}{}{}", uid, dtstart, dtstamp);
Event {
calendar: calendar.clone(),
id,
dtstart,
dtend,
summary: summary.to_owned(),
location: location.clone(),
url: url.clone(),
2021-05-26 20:46:26 +02:00
recurrence,
2021-05-24 23:25:24 +02:00
}
};
match RRule::from_str(&format!("{}", obj)) {
Ok(rrule) => {
2021-05-24 23:25:24 +02:00
let now = Utc::now();
let start = now - Duration::days(RRULE_LOOKBACK);
let end = now + Duration::days(RRULE_LOOKAHEAD);
rrule.into_iter()
.skip_while(|d| *d < start)
.take_while(|d| *d <= end)
.map(|dtstart| dtstart.naive_utc())
2021-05-26 20:46:26 +02:00
.map(|dtstart| generate_event(dtstart, true))
2021-05-24 23:25:24 +02:00
.collect()
}
Err(e) => {
println!("Error parsing RRULE: {}", e);
vec![generate_event(dtstart, false)]
}
2021-05-24 23:25:24 +02:00
}
2019-10-10 17:39:34 +02:00
}
2019-10-06 23:28:39 +02:00
2019-10-10 02:37:24 +02:00
/// State shared by all worker threads.
pub struct Resources {
/// Connection URL each worker uses to establish its own `PgConnection`.
db_url: String,
/// HTTP client used to download the calendars.
http_client: reqwest::Client,
/// Calendars still waiting to be processed, as `(calendar id, options)` pairs.
queue: RwLock<VecDeque<(String, CalendarOptions)>>,
}
impl Resources {
pub fn new(db_url: String, queue: VecDeque<(String, CalendarOptions)>) -> Self {
let http_client = reqwest::Client::new();
let queue = RwLock::new(queue);
2019-10-10 04:09:14 +02:00
Resources { db_url, http_client, queue }
2019-10-10 02:37:24 +02:00
}
fn next(&self) -> Option<(String, CalendarOptions)> {
let mut queue = self.queue.write().unwrap();
queue.pop_front()
}
fn worker_fetch(&self, cal_id: &str, cal_opts: &CalendarOptions, etag: Option<String>, last_modified: Option<String>)
-> Result<(Option<String>, Option<String>, HashMap<String, Event>), Error>
{
let mut req = self.http_client.get(&cal_opts.url)
.header(USER_AGENT, "Ticker/0.0.0");
match etag {
Some(etag) => req = req.header(IF_NONE_MATCH, etag),
None => (),
}
match last_modified {
Some(last_modified) => req = req.header(IF_MODIFIED_SINCE, last_modified),
None => (),
}
let mut res = req.send()?;
if res.status() != 200 {
let msg = format!("HTTP {}", res.status());
return Err(Error::Message(msg));
}
let etag = res.headers().get(ETAG)
.and_then(|v| v.to_str().ok())
.map(|s| s.to_owned());
let last_modified = res.headers().get(LAST_MODIFIED)
.and_then(|v| v.to_str().ok())
.map(|s| s.to_owned());
let mut p = Parser::new();
let mut buf = [0; 1024];
let mut events = HashMap::new();
loop {
match res.read(&mut buf)? {
len if len > 0 => {
let data = &buf[..len];
p.feed(data, |obj| {
let mut objs = vec![];
extract_vevent_objs(&mut objs, obj);
for obj in objs {
2021-05-24 23:25:24 +02:00
for event in obj_to_events(cal_id.to_owned(), &obj) {
events.insert(event.id.clone(), event);
}
}
});
}
_ => break,
}
}
Ok((etag, last_modified, events))
}
2019-10-11 22:15:38 +02:00
fn worker_job(&self, db: &PgConnection) -> Result<bool, Error> {
let (cal_id, cal_opts) = match self.next() {
2019-10-11 22:15:38 +02:00
Some(next) => next,
None => return Ok(true),
};
2019-10-10 02:37:24 +02:00
2019-10-11 22:15:38 +02:00
let (etag, last_modified) = db.transaction::<_, Error, _>(|| {
let cals = calendars
.filter(schema::calendars::dsl::id.eq(cal_id.clone()))
2019-10-11 22:15:38 +02:00
.limit(1)
.load::<Calendar>(db)?;
match cals.first() {
None => {
let cal = NewCalendar {
id: cal_id.clone(),
2019-10-11 22:15:38 +02:00
url: cal_opts.url.clone(),
last_fetch: Some(Utc::now().naive_utc()),
};
diesel::insert_into(calendars)
.values(&cal)
.execute(db)?;
Ok((None, None))
}
Some(cal) => {
diesel::update(calendars)
.filter(schema::calendars::dsl::id.eq(cal_id.clone()))
2019-10-11 22:15:38 +02:00
.set(schema::calendars::dsl::last_fetch.eq(Utc::now().naive_utc()))
.execute(db)?;
let result =
// Use ETag/Last-Modified only if URL hasn't changed
if cal.url == cal_opts.url {
(cal.etag.clone(), cal.last_modified.clone())
} else {
(None, None)
2019-10-10 04:09:14 +02:00
};
2019-10-11 22:15:38 +02:00
Ok(result)
2019-10-10 02:37:24 +02:00
}
2019-10-06 23:28:39 +02:00
}
2019-10-11 22:15:38 +02:00
})?;
let (etag, last_modified, events) =
self.worker_fetch(&cal_id, &cal_opts, etag, last_modified)
.map_err(|e| {
2019-10-11 22:46:05 +02:00
let msg = format!("{}", e);
println!("[{}] {}", cal_id, msg);
let _ = diesel::update(calendars)
.filter(schema::calendars::dsl::id.eq(cal_id.clone()))
.set(schema::calendars::dsl::error_message.eq(msg))
.execute(db);
e
})?;
2019-10-11 22:15:38 +02:00
println!("[{}] {} events", cal_id, events.len());
2019-10-11 22:15:38 +02:00
db.transaction::<_, Error, _>(|| {
diesel::update(calendars)
.filter(schema::calendars::dsl::id.eq(cal_id.clone()))
2019-10-11 22:15:38 +02:00
.set((schema::calendars::dsl::last_success.eq(Utc::now().naive_utc()),
schema::calendars::dsl::etag.eq(etag),
schema::calendars::dsl::last_modified.eq(last_modified),
schema::calendars::dsl::error_message.eq(None as Option<String>)))
2019-10-11 22:15:38 +02:00
.execute(db)?;
2019-10-10 02:37:24 +02:00
2019-10-11 22:15:38 +02:00
diesel::delete(schema::events::dsl::events)
.filter(schema::events::dsl::calendar.eq(cal_id))
2019-10-11 22:15:38 +02:00
.execute(db)?;
for event in events.values() {
diesel::insert_into(schema::events::dsl::events)
.values(event)
.execute(db)?;
}
Ok(())
2019-10-11 22:15:38 +02:00
})?;
Ok(false)
}
fn worker_loop(&self) -> Result<(), Error> {
let db = PgConnection::establish(&self.db_url)?;
let mut done = false;
while ! done {
done = self.worker_job(&db)
2019-10-11 22:46:05 +02:00
.map_err(|e| println!("{}", e))
2019-10-11 22:15:38 +02:00
.unwrap_or(false);
2019-10-10 04:09:14 +02:00
}
2019-10-11 22:15:38 +02:00
Ok(())
2019-10-06 23:28:39 +02:00
}
2019-10-10 02:37:24 +02:00
}
#[derive(Debug)]
pub enum Error {
2019-10-10 04:09:14 +02:00
DbConnection(diesel::ConnectionError),
Db(diesel::result::Error),
2019-10-10 02:37:24 +02:00
Http(reqwest::Error),
Io(std::io::Error),
Message(String),
2019-10-10 02:37:24 +02:00
}
2019-10-10 04:09:14 +02:00
/// Wraps a database connection failure so that `?` can be used on it.
impl From<diesel::ConnectionError> for Error {
    fn from(source: diesel::ConnectionError) -> Self {
        Self::DbConnection(source)
    }
}
impl From<diesel::result::Error> for Error {
fn from(e: diesel::result::Error) -> Self {
2019-10-10 02:37:24 +02:00
Error::Db(e)
}
}
2019-10-06 23:28:39 +02:00
2019-10-10 02:37:24 +02:00
impl From<reqwest::Error> for Error {
fn from(e: reqwest::Error) -> Self {
Error::Http(e)
}
}
impl From<std::io::Error> for Error {
fn from(e: std::io::Error) -> Self {
Error::Io(e)
}
2019-10-06 23:28:39 +02:00
}
2019-10-11 22:46:05 +02:00
impl std::fmt::Display for Error {
fn fmt(&self, fmt: &mut std::fmt::Formatter) -> Result<(), std::fmt::Error> {
match self {
Error::DbConnection(e) => e.fmt(fmt),
Error::Db(e) => e.fmt(fmt),
Error::Http(e) => e.fmt(fmt),
Error::Io(e) => e.fmt(fmt),
Error::Message(e) => e.fmt(fmt),
}
}
}
2019-10-06 23:28:39 +02:00
fn main() {
2020-10-27 19:20:34 +01:00
let config = Config::read_yaml_file("config.yaml");
2019-10-10 02:37:24 +02:00
let res = Resources::new(
config.db_url,
config.calendars.into_iter()
.collect()
);
crossbeam::scope(|s| {
let cpus = num_cpus::get();
let workers: Vec<_> = (0..cpus)
.map(|_| s.spawn(|_| {
2019-10-11 22:15:38 +02:00
res.worker_loop()
2019-10-10 02:37:24 +02:00
}))
.collect();
for worker in workers.into_iter() {
2019-10-11 22:46:43 +02:00
worker.join().expect("worker join").expect("worker");
2019-10-10 02:37:24 +02:00
}
}).unwrap();
2019-10-06 23:28:39 +02:00
}