use std::io::Read; use std::fs::read_to_string; use std::sync::{Mutex, RwLock}; use std::collections::VecDeque; use rustorm::{pool::Pool, DbError}; use chrono::offset::Utc; use reqwest::header::{IF_NONE_MATCH, IF_MODIFIED_SINCE, ETAG, LAST_MODIFIED, USER_AGENT}; mod config; use config::{Config, CalendarOptions}; mod schema; use schema::{new, Calendar, Event}; mod transaction; use transaction::{Transaction, QuerySql}; mod ics; use ics::Parser; pub struct Resources { db_pool: Mutex, db_url: String, http_client: reqwest::Client, queue: RwLock>, } impl Resources { pub fn new(db_url: String, queue: VecDeque<(String, CalendarOptions)>) -> Self { let db_pool = Mutex::new(Pool::new()); let http_client = reqwest::Client::new(); let queue = RwLock::new(queue); Resources { db_pool, db_url, http_client, queue } } fn transaction(&self) -> Result { let em = self.db_pool.lock() .unwrap() .em(&self.db_url)?; Transaction::new(em) } fn next(&self) -> Option<(String, CalendarOptions)> { let mut queue = self.queue.write().unwrap(); queue.pop_front() } fn worker(&self) -> Result<(), Error> { let (id, cal_opts) = match self.next() { Some(next) => next, None => return Ok(()), }; let (etag, last_modified) = { let tx = self.transaction()?; let cal: Option = tx.query("SELECT * FROM calendar WHERE id=$1", &[&id])?; let result = match cal { None => { tx.insert(&[&new::Calendar { id: id.clone(), url: cal_opts.url.clone(), last_fetch: Utc::now(), }])?; (None, None) } Some(cal) => { tx.exec("UPDATE calendar SET last_fetch=$2 WHERE id=$1", &[&id, &Utc::now()])?; (cal.etag, cal.last_modified) } }; tx.commit()?; result }; let mut req = self.http_client.get(&cal_opts.url) .header(USER_AGENT, "Ticker/0.0.0"); match etag { Some(etag) => req = req.header(IF_NONE_MATCH, etag), None => (), } match last_modified { Some(last_modified) => req = req.header(IF_MODIFIED_SINCE, last_modified), None => (), } let mut res = req.send()?; println!("{} {}", res.status(), cal_opts.url); if res.status() != 200 { let tx = self.transaction()?; let msg = format!("HTTP {}", res.status()); tx.exec("UPDATE calendar SET last_success=$2, error_message=$3 WHERE id=$1", &[&id, &Utc::now(), &msg])?; tx.commit()?; return Ok(()) } let etag = res.headers().get(ETAG) .and_then(|v| v.to_str().ok()); let last_modified = res.headers().get(LAST_MODIFIED) .and_then(|v| v.to_str().ok()); let tx = self.transaction()?; tx.exec("UPDATE calendar SET last_success=$2, error_message=NULL, etag=$3, last_modified=$4 WHERE id=$1", &[&id, &Utc::now(), &etag, &last_modified])?; let mut p = Parser::new(); let mut buf = [0; 1024]; loop { match res.read(&mut buf)? { len if len > 0 => { let data = &buf[..len]; p.feed(data, |obj| { println!("- [{}] {}", obj.get("DTSTART").unwrap_or("?"), obj.get("SUMMARY").unwrap_or("?")); print!(" {}", obj.get("LOCATION").unwrap_or("?")); obj.get("URL").map(|url| print!(" <{}>", url)); println!(""); }); } _ => break, } } tx.commit()?; Ok(()) } } #[derive(Debug)] pub enum Error { Db(DbError), Http(reqwest::Error), Io(std::io::Error), } impl From for Error { fn from(e: DbError) -> Self { Error::Db(e) } } impl From for Error { fn from(e: reqwest::Error) -> Self { Error::Http(e) } } impl From for Error { fn from(e: std::io::Error) -> Self { Error::Io(e) } } fn main() { let config_file = read_to_string("config.yaml") .expect("config.yaml"); let config: Config = serde_yaml::from_str(&config_file) .expect("config"); let res = Resources::new( config.db_url, config.calendars.into_iter() .collect() ); crossbeam::scope(|s| { let cpus = num_cpus::get(); let workers: Vec<_> = (0..cpus) .map(|_| s.spawn(|_| { res.worker() .map_err(|e| println!("{:?}", e)) })) .collect(); for worker in workers.into_iter() { worker.join().expect("worker").unwrap(); } }).unwrap(); }