Web-based Calendar Aggregator
https://ticker.c3d2.de/
You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
279 lines
9.0 KiB
279 lines
9.0 KiB
use std::mem::replace; |
|
use std::io::Read; |
|
use std::sync::RwLock; |
|
use std::collections::{HashMap, VecDeque}; |
|
use chrono::offset::Utc; |
|
use reqwest::header::{IF_NONE_MATCH, IF_MODIFIED_SINCE, ETAG, LAST_MODIFIED, USER_AGENT}; |
|
use diesel::{Connection, pg::PgConnection, prelude::*}; |
|
|
|
use libticker::{ |
|
config::{Config, CalendarOptions}, |
|
schema::{self, calendars::dsl::calendars}, |
|
model::{Calendar, NewCalendar, Event}, |
|
ics::{Parser, Object, Timestamp, GetValue}, |
|
}; |
|
|
|
fn extract_vevent_objs(results: &mut Vec<Object>, mut obj: Object) { |
|
let children = replace(&mut obj.children, vec![]); |
|
if obj.name == "VEVENT" { |
|
results.push(obj); |
|
} |
|
for child in children.into_iter() { |
|
extract_vevent_objs(results, *child); |
|
} |
|
} |
|
|
|
fn obj_to_event(calendar: String, obj: &Object) -> Option<Event> { |
|
if obj.name != "VEVENT" { |
|
return None; |
|
} |
|
|
|
let dtstart: Timestamp = obj.get("DTSTART")?; |
|
let dtstart = dtstart.or_hms(0, 0, 0); |
|
let dtend: Option<Timestamp> = obj.get("DTEND"); |
|
let dtend = dtend.map(|time| time.or_hms(23, 59, 59)); |
|
let summary = obj.get("SUMMARY")?; |
|
let id = format!("{}{}{}{}", |
|
obj.get("UID").unwrap_or(""), |
|
dtstart, |
|
obj.get("DTSTAMP").unwrap_or(""), |
|
obj.get("RECURRENCE-ID").unwrap_or("")); |
|
// TODO: DESCRIPTION |
|
Some(Event { |
|
calendar, id, |
|
dtstart, |
|
dtend, |
|
summary, |
|
location: obj.get("LOCATION"), |
|
url: obj.get("URL"), |
|
}) |
|
} |
|
|
|
pub struct Resources { |
|
db_url: String, |
|
http_client: reqwest::Client, |
|
queue: RwLock<VecDeque<(String, CalendarOptions)>>, |
|
} |
|
|
|
impl Resources { |
|
pub fn new(db_url: String, queue: VecDeque<(String, CalendarOptions)>) -> Self { |
|
let http_client = reqwest::Client::new(); |
|
let queue = RwLock::new(queue); |
|
Resources { db_url, http_client, queue } |
|
} |
|
|
|
fn next(&self) -> Option<(String, CalendarOptions)> { |
|
let mut queue = self.queue.write().unwrap(); |
|
queue.pop_front() |
|
} |
|
|
|
fn worker_fetch(&self, cal_id: &str, cal_opts: &CalendarOptions, etag: Option<String>, last_modified: Option<String>) |
|
-> Result<(Option<String>, Option<String>, HashMap<String, Event>), Error> |
|
{ |
|
let mut req = self.http_client.get(&cal_opts.url) |
|
.header(USER_AGENT, "Ticker/0.0.0"); |
|
match etag { |
|
Some(etag) => req = req.header(IF_NONE_MATCH, etag), |
|
None => (), |
|
} |
|
match last_modified { |
|
Some(last_modified) => req = req.header(IF_MODIFIED_SINCE, last_modified), |
|
None => (), |
|
} |
|
let mut res = req.send()?; |
|
|
|
if res.status() != 200 { |
|
let msg = format!("HTTP {}", res.status()); |
|
return Err(Error::Message(msg)); |
|
} |
|
|
|
let etag = res.headers().get(ETAG) |
|
.and_then(|v| v.to_str().ok()) |
|
.map(|s| s.to_owned()); |
|
let last_modified = res.headers().get(LAST_MODIFIED) |
|
.and_then(|v| v.to_str().ok()) |
|
.map(|s| s.to_owned()); |
|
|
|
let mut p = Parser::new(); |
|
let mut buf = [0; 1024]; |
|
let mut events = HashMap::new(); |
|
loop { |
|
match res.read(&mut buf)? { |
|
len if len > 0 => { |
|
let data = &buf[..len]; |
|
p.feed(data, |obj| { |
|
let mut objs = vec![]; |
|
extract_vevent_objs(&mut objs, obj); |
|
for obj in objs { |
|
if let Some(event) = obj_to_event(cal_id.to_owned(), &obj) { |
|
events.insert(event.id.clone(), event); |
|
} else { |
|
let dtstart: Option<&str> = obj.get("DTSTART"); |
|
let summary: Option<&str> = obj.get("SUMMARY"); |
|
println!("cannot convert {} {:?} {:?}", |
|
obj.name, dtstart, summary); |
|
} |
|
} |
|
}); |
|
} |
|
_ => break, |
|
} |
|
} |
|
Ok((etag, last_modified, events)) |
|
} |
|
|
|
fn worker_job(&self, db: &PgConnection) -> Result<bool, Error> { |
|
let (cal_id, cal_opts) = match self.next() { |
|
Some(next) => next, |
|
None => return Ok(true), |
|
}; |
|
|
|
let (etag, last_modified) = db.transaction::<_, Error, _>(|| { |
|
let cals = calendars |
|
.filter(schema::calendars::dsl::id.eq(cal_id.clone())) |
|
.limit(1) |
|
.load::<Calendar>(db)?; |
|
match cals.first() { |
|
None => { |
|
let cal = NewCalendar { |
|
id: cal_id.clone(), |
|
url: cal_opts.url.clone(), |
|
last_fetch: Some(Utc::now().naive_utc()), |
|
}; |
|
diesel::insert_into(calendars) |
|
.values(&cal) |
|
.execute(db)?; |
|
Ok((None, None)) |
|
} |
|
Some(cal) => { |
|
diesel::update(calendars) |
|
.filter(schema::calendars::dsl::id.eq(cal_id.clone())) |
|
.set(schema::calendars::dsl::last_fetch.eq(Utc::now().naive_utc())) |
|
.execute(db)?; |
|
let result = |
|
// Use ETag/Last-Modified only if URL hasn't changed |
|
if cal.url == cal_opts.url { |
|
(cal.etag.clone(), cal.last_modified.clone()) |
|
} else { |
|
(None, None) |
|
}; |
|
Ok(result) |
|
} |
|
} |
|
})?; |
|
|
|
let (etag, last_modified, events) = |
|
self.worker_fetch(&cal_id, &cal_opts, etag, last_modified) |
|
.map_err(|e| { |
|
let msg = format!("{}", e); |
|
println!("[{}] {}", cal_id, msg); |
|
let _ = diesel::update(calendars) |
|
.filter(schema::calendars::dsl::id.eq(cal_id.clone())) |
|
.set(schema::calendars::dsl::error_message.eq(msg)) |
|
.execute(db); |
|
e |
|
})?; |
|
|
|
println!("[{}] {} events", cal_id, events.len()); |
|
|
|
db.transaction::<_, Error, _>(|| { |
|
diesel::update(calendars) |
|
.filter(schema::calendars::dsl::id.eq(cal_id.clone())) |
|
.set((schema::calendars::dsl::last_success.eq(Utc::now().naive_utc()), |
|
schema::calendars::dsl::etag.eq(etag), |
|
schema::calendars::dsl::last_modified.eq(last_modified), |
|
schema::calendars::dsl::error_message.eq(None as Option<String>))) |
|
.execute(db)?; |
|
|
|
diesel::delete(schema::events::dsl::events) |
|
.filter(schema::events::dsl::calendar.eq(cal_id)) |
|
.execute(db)?; |
|
for event in events.values() { |
|
diesel::insert_into(schema::events::dsl::events) |
|
.values(event) |
|
.execute(db)?; |
|
} |
|
|
|
Ok(()) |
|
})?; |
|
|
|
Ok(false) |
|
} |
|
|
|
fn worker_loop(&self) -> Result<(), Error> { |
|
let db = PgConnection::establish(&self.db_url)?; |
|
let mut done = false; |
|
while ! done { |
|
done = self.worker_job(&db) |
|
.map_err(|e| println!("{}", e)) |
|
.unwrap_or(false); |
|
} |
|
Ok(()) |
|
} |
|
} |
|
|
|
#[derive(Debug)] |
|
pub enum Error { |
|
DbConnection(diesel::ConnectionError), |
|
Db(diesel::result::Error), |
|
Http(reqwest::Error), |
|
Io(std::io::Error), |
|
Message(String), |
|
} |
|
|
|
impl From<diesel::ConnectionError> for Error { |
|
fn from(e: diesel::ConnectionError) -> Self { |
|
Error::DbConnection(e) |
|
} |
|
} |
|
|
|
impl From<diesel::result::Error> for Error { |
|
fn from(e: diesel::result::Error) -> Self { |
|
Error::Db(e) |
|
} |
|
} |
|
|
|
impl From<reqwest::Error> for Error { |
|
fn from(e: reqwest::Error) -> Self { |
|
Error::Http(e) |
|
} |
|
} |
|
|
|
impl From<std::io::Error> for Error { |
|
fn from(e: std::io::Error) -> Self { |
|
Error::Io(e) |
|
} |
|
} |
|
|
|
impl std::fmt::Display for Error { |
|
fn fmt(&self, fmt: &mut std::fmt::Formatter) -> Result<(), std::fmt::Error> { |
|
match self { |
|
Error::DbConnection(e) => e.fmt(fmt), |
|
Error::Db(e) => e.fmt(fmt), |
|
Error::Http(e) => e.fmt(fmt), |
|
Error::Io(e) => e.fmt(fmt), |
|
Error::Message(e) => e.fmt(fmt), |
|
} |
|
} |
|
} |
|
|
|
fn main() { |
|
let config = Config::read_yaml_file("config.yaml"); |
|
let res = Resources::new( |
|
config.db_url, |
|
config.calendars.into_iter() |
|
.collect() |
|
); |
|
|
|
crossbeam::scope(|s| { |
|
let cpus = num_cpus::get(); |
|
let workers: Vec<_> = (0..cpus) |
|
.map(|_| s.spawn(|_| { |
|
res.worker_loop() |
|
})) |
|
.collect(); |
|
for worker in workers.into_iter() { |
|
worker.join().expect("worker join").expect("worker"); |
|
} |
|
}).unwrap(); |
|
}
|
|
|