use std::str::FromStr; use std::mem::replace; use std::io::Read; use std::sync::RwLock; use std::collections::{HashMap, VecDeque}; use chrono::{Duration, offset::Utc}; use reqwest::header::{IF_NONE_MATCH, IF_MODIFIED_SINCE, ETAG, LAST_MODIFIED, USER_AGENT}; use diesel::{Connection, pg::PgConnection, prelude::*}; use rrule::RRule; use libticker::{ config::{Config, CalendarOptions}, schema::{self, calendars::dsl::calendars}, model::{Calendar, NewCalendar, Event}, ics::{Parser, Object, Timestamp, GetValue}, }; fn extract_vevent_objs(results: &mut Vec, mut obj: Object) { let children = replace(&mut obj.children, vec![]); if obj.name == "VEVENT" { results.push(obj); } for child in children.into_iter() { extract_vevent_objs(results, *child); } } const RRULE_LOOKBACK: i64 = 30; const RRULE_LOOKAHEAD: i64 = 366; fn obj_to_events(calendar: String, obj: &Object) -> Vec { if obj.name != "VEVENT" { return vec![]; } let dtstart: Timestamp = match obj.get("DTSTART") { Some(dtstart) => dtstart, None => return vec![], }; let dtstart = dtstart.or_hms(0, 0, 0); let dtend: Option = obj.get("DTEND"); let dtend = dtend.map(|time| time.or_hms(23, 59, 59)); let summary: &str = match obj.get("SUMMARY") { Some(summary) => summary, None => return vec![], }; let uid = obj.get("UID").unwrap_or(""); let dtstamp = obj.get("DTSTAMP").unwrap_or(""); let location = obj.get("LOCATION"); let url = obj.get("URL"); let generate_event = |dtstart, recurrence| { let id = format!("{}{}{}", uid, dtstart, dtstamp); Event { calendar: calendar.clone(), id, dtstart, dtend, summary: summary.to_owned(), location: location.clone(), url: url.clone(), recurrence, } }; match RRule::from_str(&obj.fmt_rrule().unwrap()) { Ok(rrule) => { let now = Utc::now(); let start = now - Duration::days(RRULE_LOOKBACK); let end = now + Duration::days(RRULE_LOOKAHEAD); rrule.into_iter() .skip_while(|d| *d < start) .take_while(|d| *d <= end) .map(|dtstart| dtstart.naive_local()) .map(|dtstart| generate_event(dtstart, true)) .collect() } Err(_) => { vec![generate_event(dtstart, false)] } } } pub struct Resources { db_url: String, http_client: reqwest::Client, queue: RwLock>, } impl Resources { pub fn new(db_url: String, queue: VecDeque<(String, CalendarOptions)>) -> Self { let http_client = reqwest::Client::new(); let queue = RwLock::new(queue); Resources { db_url, http_client, queue } } fn next(&self) -> Option<(String, CalendarOptions)> { let mut queue = self.queue.write().unwrap(); queue.pop_front() } fn worker_fetch(&self, cal_id: &str, cal_opts: &CalendarOptions, etag: Option, last_modified: Option) -> Result<(Option, Option, HashMap), Error> { let mut req = self.http_client.get(&cal_opts.url) .header(USER_AGENT, "Ticker/0.0.0"); match etag { Some(etag) => req = req.header(IF_NONE_MATCH, etag), None => (), } match last_modified { Some(last_modified) => req = req.header(IF_MODIFIED_SINCE, last_modified), None => (), } let mut res = req.send()?; if res.status() != 200 { let msg = format!("HTTP {}", res.status()); return Err(Error::Message(msg)); } let etag = res.headers().get(ETAG) .and_then(|v| v.to_str().ok()) .map(|s| s.to_owned()); let last_modified = res.headers().get(LAST_MODIFIED) .and_then(|v| v.to_str().ok()) .map(|s| s.to_owned()); let mut p = Parser::new(); let mut buf = [0; 1024]; let mut events = HashMap::new(); loop { match res.read(&mut buf)? { len if len > 0 => { let data = &buf[..len]; p.feed(data, |obj| { let mut objs = vec![]; extract_vevent_objs(&mut objs, obj); for obj in objs { for event in obj_to_events(cal_id.to_owned(), &obj) { events.insert(event.id.clone(), event); } } }); } _ => break, } } Ok((etag, last_modified, events)) } fn worker_job(&self, db: &PgConnection) -> Result { let (cal_id, cal_opts) = match self.next() { Some(next) => next, None => return Ok(true), }; let (etag, last_modified) = db.transaction::<_, Error, _>(|| { let cals = calendars .filter(schema::calendars::dsl::id.eq(cal_id.clone())) .limit(1) .load::(db)?; match cals.first() { None => { let cal = NewCalendar { id: cal_id.clone(), url: cal_opts.url.clone(), last_fetch: Some(Utc::now().naive_utc()), }; diesel::insert_into(calendars) .values(&cal) .execute(db)?; Ok((None, None)) } Some(cal) => { diesel::update(calendars) .filter(schema::calendars::dsl::id.eq(cal_id.clone())) .set(schema::calendars::dsl::last_fetch.eq(Utc::now().naive_utc())) .execute(db)?; let result = // Use ETag/Last-Modified only if URL hasn't changed if cal.url == cal_opts.url { (cal.etag.clone(), cal.last_modified.clone()) } else { (None, None) }; Ok(result) } } })?; let (etag, last_modified, events) = self.worker_fetch(&cal_id, &cal_opts, etag, last_modified) .map_err(|e| { let msg = format!("{}", e); println!("[{}] {}", cal_id, msg); let _ = diesel::update(calendars) .filter(schema::calendars::dsl::id.eq(cal_id.clone())) .set(schema::calendars::dsl::error_message.eq(msg)) .execute(db); e })?; println!("[{}] {} events", cal_id, events.len()); db.transaction::<_, Error, _>(|| { diesel::update(calendars) .filter(schema::calendars::dsl::id.eq(cal_id.clone())) .set((schema::calendars::dsl::last_success.eq(Utc::now().naive_utc()), schema::calendars::dsl::etag.eq(etag), schema::calendars::dsl::last_modified.eq(last_modified), schema::calendars::dsl::error_message.eq(None as Option))) .execute(db)?; diesel::delete(schema::events::dsl::events) .filter(schema::events::dsl::calendar.eq(cal_id)) .execute(db)?; for event in events.values() { diesel::insert_into(schema::events::dsl::events) .values(event) .execute(db)?; } Ok(()) })?; Ok(false) } fn worker_loop(&self) -> Result<(), Error> { let db = PgConnection::establish(&self.db_url)?; let mut done = false; while ! done { done = self.worker_job(&db) .map_err(|e| println!("{}", e)) .unwrap_or(false); } Ok(()) } } #[derive(Debug)] pub enum Error { DbConnection(diesel::ConnectionError), Db(diesel::result::Error), Http(reqwest::Error), Io(std::io::Error), Message(String), } impl From for Error { fn from(e: diesel::ConnectionError) -> Self { Error::DbConnection(e) } } impl From for Error { fn from(e: diesel::result::Error) -> Self { Error::Db(e) } } impl From for Error { fn from(e: reqwest::Error) -> Self { Error::Http(e) } } impl From for Error { fn from(e: std::io::Error) -> Self { Error::Io(e) } } impl std::fmt::Display for Error { fn fmt(&self, fmt: &mut std::fmt::Formatter) -> Result<(), std::fmt::Error> { match self { Error::DbConnection(e) => e.fmt(fmt), Error::Db(e) => e.fmt(fmt), Error::Http(e) => e.fmt(fmt), Error::Io(e) => e.fmt(fmt), Error::Message(e) => e.fmt(fmt), } } } fn main() { let config = Config::read_yaml_file("config.yaml"); let res = Resources::new( config.db_url, config.calendars.into_iter() .collect() ); crossbeam::scope(|s| { let cpus = num_cpus::get(); let workers: Vec<_> = (0..cpus) .map(|_| s.spawn(|_| { res.worker_loop() })) .collect(); for worker in workers.into_iter() { worker.join().expect("worker join").expect("worker"); } }).unwrap(); }