db, config, worker threads
This commit is contained in:
parent
723992d159
commit
0bc81040a0
File diff suppressed because it is too large
Load Diff
|
@ -8,3 +8,10 @@ edition = "2018"
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
reqwest = "~0.9"
|
reqwest = "~0.9"
|
||||||
|
rustorm = { version = "~0.14", features = ["with-postgres"] }
|
||||||
|
rustorm_dao = "~0.4"
|
||||||
|
chrono = "~0.4"
|
||||||
|
serde = { version = "~1.0", features = ["derive"] }
|
||||||
|
serde_yaml = "~0.8"
|
||||||
|
num_cpus = "~1"
|
||||||
|
crossbeam = "~0.7"
|
||||||
|
|
|
@ -0,0 +1,11 @@
|
||||||
|
CREATE TABLE calendar (
|
||||||
|
id TEXT PRIMARY KEY,
|
||||||
|
url TEXT NOT NULL,
|
||||||
|
|
||||||
|
last_fetch TIMESTAMP,
|
||||||
|
last_success TIMESTAMP,
|
||||||
|
error_message TEXT,
|
||||||
|
|
||||||
|
etag TEXT,
|
||||||
|
last_modified TEXT
|
||||||
|
);
|
|
@ -0,0 +1,14 @@
|
||||||
|
use std::collections::BTreeMap;
|
||||||
|
use serde::{Serialize, Deserialize};
|
||||||
|
|
||||||
|
|
||||||
|
#[derive(Debug, PartialEq, Serialize, Deserialize)]
|
||||||
|
pub struct CalendarOptions {
|
||||||
|
pub url: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, PartialEq, Serialize, Deserialize)]
|
||||||
|
pub struct Config {
|
||||||
|
pub db_url: String,
|
||||||
|
pub calendars: BTreeMap<String, CalendarOptions>,
|
||||||
|
}
|
148
src/main.rs
148
src/main.rs
|
@ -1,35 +1,137 @@
|
||||||
use std::io::Read;
|
use std::io::Read;
|
||||||
|
use std::fs::read_to_string;
|
||||||
|
use std::sync::{Mutex, RwLock};
|
||||||
|
use std::collections::VecDeque;
|
||||||
|
use rustorm::{pool::Pool, DbError};
|
||||||
|
use chrono::offset::Utc;
|
||||||
|
|
||||||
|
mod config;
|
||||||
|
use config::{Config, CalendarOptions};
|
||||||
|
mod schema;
|
||||||
|
use schema::{new, Calendar, Event};
|
||||||
|
mod transaction;
|
||||||
|
use transaction::{Transaction, QuerySql};
|
||||||
mod ics;
|
mod ics;
|
||||||
use ics::Parser;
|
use ics::Parser;
|
||||||
|
|
||||||
fn fetch(client: &reqwest::Client, url: &str) -> Result<(), Box<dyn std::error::Error>> {
|
pub struct Resources {
|
||||||
let mut res = client.get(url).send()?;
|
db_pool: Mutex<Pool>,
|
||||||
|
db_url: String,
|
||||||
|
http_client: reqwest::Client,
|
||||||
|
queue: RwLock<VecDeque<(String, CalendarOptions)>>,
|
||||||
|
}
|
||||||
|
|
||||||
println!("{} {}", res.status(), url);
|
impl Resources {
|
||||||
|
pub fn new(db_url: String, queue: VecDeque<(String, CalendarOptions)>) -> Self {
|
||||||
let mut p = Parser::new();
|
let db_pool = Mutex::new(Pool::new());
|
||||||
let mut buf = [0; 8192];
|
let http_client = reqwest::Client::new();
|
||||||
loop {
|
let queue = RwLock::new(queue);
|
||||||
match res.read(&mut buf)? {
|
Resources { db_pool, db_url, http_client, queue }
|
||||||
len if len > 0 => {
|
|
||||||
let data = &buf[..len];
|
|
||||||
p.feed(data, |obj| {
|
|
||||||
println!("- [{}] {}", obj.get("DTSTART").unwrap_or("?"), obj.get("SUMMARY").unwrap_or("?"));
|
|
||||||
print!(" {}", obj.get("LOCATION").unwrap_or("?"));
|
|
||||||
obj.get("URL").map(|url| print!(" <{}>", url));
|
|
||||||
println!("");
|
|
||||||
});
|
|
||||||
}
|
|
||||||
_ => break,
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
fn transaction(&self) -> Result<Transaction, DbError> {
|
||||||
|
let em = self.db_pool.lock()
|
||||||
|
.unwrap()
|
||||||
|
.em(&self.db_url)?;
|
||||||
|
Transaction::new(em)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn next(&self) -> Option<(String, CalendarOptions)> {
|
||||||
|
let mut queue = self.queue.write().unwrap();
|
||||||
|
queue.pop_front()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn worker(&self) -> Result<(), Error> {
|
||||||
|
let (id, cal_opts) = match self.next() {
|
||||||
|
Some(next) => next,
|
||||||
|
None => return Ok(()),
|
||||||
|
};
|
||||||
|
let tx = self.transaction()?;
|
||||||
|
|
||||||
|
let cal: Option<Calendar> =
|
||||||
|
tx.query("SELECT * FROM calendar WHERE id=$1", &[&id])?;
|
||||||
|
if cal.is_none() {
|
||||||
|
tx.insert(&[&new::Calendar {
|
||||||
|
id: id.clone(),
|
||||||
|
url: cal_opts.url.clone(),
|
||||||
|
last_fetch: Utc::now(),
|
||||||
|
}])?;
|
||||||
|
} else {
|
||||||
|
tx.exec("UPDATE calendar SET last_fetch=$2 WHERE id=$1", &[&id, &"NOW()"])?;
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut res = self.http_client.get(&cal_opts.url).send()?;
|
||||||
|
println!("{} {} {:?}", res.status(), cal_opts.url, res.headers());
|
||||||
|
|
||||||
|
let mut p = Parser::new();
|
||||||
|
let mut buf = [0; 1024];
|
||||||
|
loop {
|
||||||
|
match res.read(&mut buf)? {
|
||||||
|
len if len > 0 => {
|
||||||
|
let data = &buf[..len];
|
||||||
|
p.feed(data, |obj| {
|
||||||
|
println!("- [{}] {}", obj.get("DTSTART").unwrap_or("?"), obj.get("SUMMARY").unwrap_or("?"));
|
||||||
|
print!(" {}", obj.get("LOCATION").unwrap_or("?"));
|
||||||
|
obj.get("URL").map(|url| print!(" <{}>", url));
|
||||||
|
println!("");
|
||||||
|
});
|
||||||
|
}
|
||||||
|
_ => break,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
tx.commit()?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub enum Error {
|
||||||
|
Db(DbError),
|
||||||
|
Http(reqwest::Error),
|
||||||
|
Io(std::io::Error),
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<DbError> for Error {
|
||||||
|
fn from(e: DbError) -> Self {
|
||||||
|
Error::Db(e)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<reqwest::Error> for Error {
|
||||||
|
fn from(e: reqwest::Error) -> Self {
|
||||||
|
Error::Http(e)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<std::io::Error> for Error {
|
||||||
|
fn from(e: std::io::Error) -> Self {
|
||||||
|
Error::Io(e)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn main() {
|
fn main() {
|
||||||
let client = reqwest::Client::new();
|
let config_file = read_to_string("config.yaml")
|
||||||
fetch(&client, "https://c3d2.de/ical.ics").expect("fetch");
|
.expect("config.yaml");
|
||||||
fetch(&client, "https://www.dresden-science-calendar.de/calendar/de/iCalSync.ics").expect("fetch");
|
let config: Config = serde_yaml::from_str(&config_file)
|
||||||
|
.expect("config");
|
||||||
|
let res = Resources::new(
|
||||||
|
config.db_url,
|
||||||
|
config.calendars.into_iter()
|
||||||
|
.collect()
|
||||||
|
);
|
||||||
|
|
||||||
|
crossbeam::scope(|s| {
|
||||||
|
let cpus = num_cpus::get();
|
||||||
|
let workers: Vec<_> = (0..cpus)
|
||||||
|
.map(|_| s.spawn(|_| {
|
||||||
|
res.worker()
|
||||||
|
.map_err(|e| println!("{:?}", e))
|
||||||
|
}))
|
||||||
|
.collect();
|
||||||
|
for worker in workers.into_iter() {
|
||||||
|
worker.join().expect("worker").unwrap();
|
||||||
|
}
|
||||||
|
}).unwrap();
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,43 @@
|
||||||
|
use rustorm::{ToTableName, ToColumnNames, FromDao};
|
||||||
|
use chrono::{DateTime, offset::Utc};
|
||||||
|
|
||||||
|
#[derive(Debug, FromDao, ToColumnNames, ToTableName)]
|
||||||
|
pub struct SqlResult {
|
||||||
|
result: bool,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub mod new {
|
||||||
|
use rustorm::{ToTableName, ToColumnNames, FromDao, ToDao};
|
||||||
|
use chrono::{DateTime, offset::Utc};
|
||||||
|
|
||||||
|
#[derive(Debug, FromDao, ToDao, ToColumnNames, ToTableName)]
|
||||||
|
pub struct Calendar {
|
||||||
|
pub id: String,
|
||||||
|
pub url: String,
|
||||||
|
pub last_fetch: DateTime<Utc>,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, FromDao, ToColumnNames, ToTableName)]
|
||||||
|
pub struct Calendar {
|
||||||
|
pub id: String,
|
||||||
|
pub url: String,
|
||||||
|
|
||||||
|
pub last_fetch: Option<DateTime<Utc>>,
|
||||||
|
pub last_success: Option<DateTime<Utc>>,
|
||||||
|
pub error_message: Option<String>,
|
||||||
|
|
||||||
|
pub etag: Option<String>,
|
||||||
|
pub last_modified: Option<String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, FromDao, ToColumnNames, ToTableName)]
|
||||||
|
pub struct Event {
|
||||||
|
pub calendar: String,
|
||||||
|
|
||||||
|
pub start: DateTime<Utc>,
|
||||||
|
pub end: Option<DateTime<Utc>>,
|
||||||
|
pub summary: String,
|
||||||
|
pub location: String,
|
||||||
|
pub url: String,
|
||||||
|
}
|
|
@ -0,0 +1,63 @@
|
||||||
|
use rustorm::{DbError, EntityManager, ToValue};
|
||||||
|
use rustorm::dao::{ToTableName, ToColumnNames, ToDao, FromDao};
|
||||||
|
|
||||||
|
use super::schema::SqlResult;
|
||||||
|
|
||||||
|
pub struct Transaction {
|
||||||
|
em: EntityManager,
|
||||||
|
committed: bool,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Transaction {
|
||||||
|
pub fn new(em: EntityManager) -> Result<Self, DbError> {
|
||||||
|
let tx = Transaction {
|
||||||
|
em,
|
||||||
|
committed: false,
|
||||||
|
};
|
||||||
|
tx.exec("BEGIN", &[])?;
|
||||||
|
Ok(tx)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn commit(mut self) -> Result<(), DbError> {
|
||||||
|
self.exec("COMMIT", &[])?;
|
||||||
|
self.committed = true;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn exec<'a>(&self, sql: &str, params: &[&'a dyn ToValue]) -> Result<(), DbError> {
|
||||||
|
let _: Option<SqlResult> = self.em.execute_sql_with_maybe_one_return(sql, params)?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn insert<T>(&self, entities: &[&T]) -> Result<(), DbError>
|
||||||
|
where
|
||||||
|
T: ToTableName + ToColumnNames + ToDao + FromDao,
|
||||||
|
{
|
||||||
|
let _: Vec<T> = self.em.insert(entities)?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Drop for Transaction {
|
||||||
|
fn drop(&mut self) {
|
||||||
|
if !self.committed {
|
||||||
|
self.exec("ROLLBACK", &[]).unwrap();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub trait QuerySql<R> {
|
||||||
|
fn query<'a>(&self, sql: &str, params: &[&'a dyn ToValue]) -> Result<R, DbError>;
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<R: FromDao> QuerySql<Vec<R>> for Transaction {
|
||||||
|
fn query<'a>(&self, sql: &str, params: &[&'a dyn ToValue]) -> Result<Vec<R>, DbError> {
|
||||||
|
self.em.execute_sql_with_return(sql, params)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<R: FromDao> QuerySql<Option<R>> for Transaction {
|
||||||
|
fn query<'a>(&self, sql: &str, params: &[&'a dyn ToValue]) -> Result<Option<R>, DbError> {
|
||||||
|
self.em.execute_sql_with_maybe_one_return(sql, params)
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue