ticker/src/main.rs

227 lines
7.3 KiB
Rust

#[macro_use]
extern crate diesel;
use std::io::Read;
use std::fs::read_to_string;
use std::sync::RwLock;
use std::collections::VecDeque;
use chrono::offset::Utc;
use reqwest::header::{IF_NONE_MATCH, IF_MODIFIED_SINCE, ETAG, LAST_MODIFIED, USER_AGENT};
use diesel::{Connection, pg::PgConnection, prelude::*};
mod config;
use config::{Config, CalendarOptions};
mod schema;
use schema::{calendars::dsl::calendars};
mod model;
use model::{Calendar, NewCalendar, Event};
mod ics;
use ics::{Parser, Object, GetValue};
fn obj_to_event(calendar: String, obj: Object) -> Option<Event> {
if obj.name != "VEVENT" {
return None;
}
let dtstart = obj.get("DTSTART")?;
let summary = obj.get("SUMMARY")?;
let id = format!("{}{}{}{}",
obj.get("UID").unwrap_or(""),
dtstart,
obj.get("DTSTAMP").unwrap_or(""),
obj.get("RECURRENCE-ID").unwrap_or(""));
// TODO: DESCRIPTION
Some(Event {
calendar, id,
dtstart,
dtend: obj.get("DTEND"),
summary,
location: obj.get("LOCATION"),
url: obj.get("URL"),
})
}
pub struct Resources {
db_url: String,
http_client: reqwest::Client,
queue: RwLock<VecDeque<(String, CalendarOptions)>>,
}
impl Resources {
pub fn new(db_url: String, queue: VecDeque<(String, CalendarOptions)>) -> Self {
let http_client = reqwest::Client::new();
let queue = RwLock::new(queue);
Resources { db_url, http_client, queue }
}
fn next(&self) -> Option<(String, CalendarOptions)> {
let mut queue = self.queue.write().unwrap();
queue.pop_front()
}
fn worker(&self) -> Result<(), Error> {
let db = PgConnection::establish(&self.db_url)?;
loop {
let (id, cal_opts) = match self.next() {
Some(next) => next,
None => return Ok(()),
};
let (etag, last_modified) = db.transaction::<_, Error, _>(|| {
let cals = calendars
.filter(schema::calendars::dsl::id.eq(id.clone()))
.limit(1)
.load::<Calendar>(&db)?;
match cals.first() {
None => {
let cal = NewCalendar {
id: id.clone(),
url: cal_opts.url.clone(),
last_fetch: Some(Utc::now().naive_utc()),
};
diesel::insert_into(calendars)
.values(&cal)
.execute(&db)?;
Ok((None, None))
}
Some(cal) => {
diesel::update(calendars)
.filter(schema::calendars::dsl::id.eq(id.clone()))
.set(schema::calendars::dsl::last_fetch.eq(Utc::now().naive_utc()))
.execute(&db)?;
Ok((cal.etag.clone(), cal.last_modified.clone()))
}
}
})?;
let mut req = self.http_client.get(&cal_opts.url)
.header(USER_AGENT, "Ticker/0.0.0");
match etag {
Some(etag) => req = req.header(IF_NONE_MATCH, etag),
None => (),
}
match last_modified {
Some(last_modified) => req = req.header(IF_MODIFIED_SINCE, last_modified),
None => (),
}
let mut res = req.send()?;
if res.status() != 200 {
let msg = format!("HTTP {}", res.status());
println!("{} {}", res.status(), cal_opts.url);
diesel::update(calendars)
.filter(schema::calendars::dsl::id.eq(id.clone()))
.set((schema::calendars::dsl::last_success.eq(Utc::now().naive_utc()),
schema::calendars::dsl::error_message.eq(msg)))
.execute(&db)?;
return Ok(())
}
let etag = res.headers().get(ETAG)
.and_then(|v| v.to_str().ok())
.map(|s| s.to_owned());
let last_modified = res.headers().get(LAST_MODIFIED)
.and_then(|v| v.to_str().ok())
.map(|s| s.to_owned());
db.transaction::<_, Error, _>(|| {
diesel::update(calendars)
.filter(schema::calendars::dsl::id.eq(id.clone()))
.set((schema::calendars::dsl::last_success.eq(Utc::now().naive_utc()),
schema::calendars::dsl::etag.eq(etag),
schema::calendars::dsl::last_modified.eq(last_modified)))
.execute(&db)?;
let mut p = Parser::new();
let mut buf = [0; 1024];
let mut events = vec![];
loop {
match res.read(&mut buf)? {
len if len > 0 => {
let data = &buf[..len];
p.feed(data, |obj| {
if let Some(event) = obj_to_event(cal_opts.url.clone(), obj) {
events.push(event);
}
});
}
_ => break,
}
}
drop(res);
println!("{} events {}", events.len(), cal_opts.url);
diesel::delete(schema::events::dsl::events)
.filter(schema::events::dsl::calendar.eq(cal_opts.url))
.execute(&db)?;
for event in events {
// println!("insert {:?}", event);
diesel::insert_into(schema::events::dsl::events)
.values(&event)
.execute(&db)?;
}
Ok(())
})?;
}
}
}
#[derive(Debug)]
pub enum Error {
DbConnection(diesel::ConnectionError),
Db(diesel::result::Error),
Http(reqwest::Error),
Io(std::io::Error),
}
impl From<diesel::ConnectionError> for Error {
fn from(e: diesel::ConnectionError) -> Self {
Error::DbConnection(e)
}
}
impl From<diesel::result::Error> for Error {
fn from(e: diesel::result::Error) -> Self {
Error::Db(e)
}
}
impl From<reqwest::Error> for Error {
fn from(e: reqwest::Error) -> Self {
Error::Http(e)
}
}
impl From<std::io::Error> for Error {
fn from(e: std::io::Error) -> Self {
Error::Io(e)
}
}
fn main() {
let config_file = read_to_string("config.yaml")
.expect("config.yaml");
let config: Config = serde_yaml::from_str(&config_file)
.expect("config");
let res = Resources::new(
config.db_url,
config.calendars.into_iter()
.collect()
);
crossbeam::scope(|s| {
let cpus = num_cpus::get();
let workers: Vec<_> = (0..cpus)
.map(|_| s.spawn(|_| {
res.worker()
.map_err(|e| println!("{:?}", e))
}))
.collect();
for worker in workers.into_iter() {
worker.join().expect("worker").unwrap();
}
}).unwrap();
}