refactor worker_job into worker_job/worker_fetch

This commit is contained in:
Astro 2019-10-11 22:42:16 +02:00
parent 844221616e
commit 1d98cbd1ff
1 changed files with 77 additions and 64 deletions

View File

@ -73,21 +73,76 @@ impl Resources {
queue.pop_front() queue.pop_front()
} }
fn worker_fetch(&self, cal_id: &str, cal_opts: &CalendarOptions, etag: Option<String>, last_modified: Option<String>)
-> Result<(Option<String>, Option<String>, HashMap<String, Event>), Error>
{
let mut req = self.http_client.get(&cal_opts.url)
.header(USER_AGENT, "Ticker/0.0.0");
match etag {
Some(etag) => req = req.header(IF_NONE_MATCH, etag),
None => (),
}
match last_modified {
Some(last_modified) => req = req.header(IF_MODIFIED_SINCE, last_modified),
None => (),
}
let mut res = req.send()?;
if res.status() != 200 {
let msg = format!("HTTP {}", res.status());
return Err(Error::Message(msg));
}
let etag = res.headers().get(ETAG)
.and_then(|v| v.to_str().ok())
.map(|s| s.to_owned());
let last_modified = res.headers().get(LAST_MODIFIED)
.and_then(|v| v.to_str().ok())
.map(|s| s.to_owned());
let mut p = Parser::new();
let mut buf = [0; 1024];
let mut events = HashMap::new();
loop {
match res.read(&mut buf)? {
len if len > 0 => {
let data = &buf[..len];
p.feed(data, |obj| {
let mut objs = vec![];
extract_vevent_objs(&mut objs, obj);
for obj in objs {
if let Some(event) = obj_to_event(cal_id.to_owned(), &obj) {
events.insert(event.id.clone(), event);
} else {
let dtstart: Option<&str> = obj.get("DTSTART");
let summary: Option<&str> = obj.get("SUMMARY");
println!("cannot convert {} {:?} {:?}",
obj.name, dtstart, summary);
}
}
});
}
_ => break,
}
}
Ok((etag, last_modified, events))
}
fn worker_job(&self, db: &PgConnection) -> Result<bool, Error> { fn worker_job(&self, db: &PgConnection) -> Result<bool, Error> {
let (id, cal_opts) = match self.next() { let (cal_id, cal_opts) = match self.next() {
Some(next) => next, Some(next) => next,
None => return Ok(true), None => return Ok(true),
}; };
let (etag, last_modified) = db.transaction::<_, Error, _>(|| { let (etag, last_modified) = db.transaction::<_, Error, _>(|| {
let cals = calendars let cals = calendars
.filter(schema::calendars::dsl::id.eq(id.clone())) .filter(schema::calendars::dsl::id.eq(cal_id.clone()))
.limit(1) .limit(1)
.load::<Calendar>(db)?; .load::<Calendar>(db)?;
match cals.first() { match cals.first() {
None => { None => {
let cal = NewCalendar { let cal = NewCalendar {
id: id.clone(), id: cal_id.clone(),
url: cal_opts.url.clone(), url: cal_opts.url.clone(),
last_fetch: Some(Utc::now().naive_utc()), last_fetch: Some(Utc::now().naive_utc()),
}; };
@ -98,7 +153,7 @@ impl Resources {
} }
Some(cal) => { Some(cal) => {
diesel::update(calendars) diesel::update(calendars)
.filter(schema::calendars::dsl::id.eq(id.clone())) .filter(schema::calendars::dsl::id.eq(cal_id.clone()))
.set(schema::calendars::dsl::last_fetch.eq(Utc::now().naive_utc())) .set(schema::calendars::dsl::last_fetch.eq(Utc::now().naive_utc()))
.execute(db)?; .execute(db)?;
let result = let result =
@ -113,74 +168,31 @@ impl Resources {
} }
})?; })?;
let mut req = self.http_client.get(&cal_opts.url) let (etag, last_modified, events) =
.header(USER_AGENT, "Ticker/0.0.0"); self.worker_fetch(&cal_id, &cal_opts, etag, last_modified)
match etag { .map_err(|e| {
Some(etag) => req = req.header(IF_NONE_MATCH, etag), let msg = format!("{:?}", e);
None => (), println!("[{}] {}", cal_id, msg);
} let _ = diesel::update(calendars)
match last_modified { .filter(schema::calendars::dsl::id.eq(cal_id.clone()))
Some(last_modified) => req = req.header(IF_MODIFIED_SINCE, last_modified), .set(schema::calendars::dsl::error_message.eq(msg))
None => (), .execute(db);
} e
let mut res = req.send()?; })?;
println!("[{}] HTTP {}", id, res.status()); println!("[{}] {} events", cal_id, events.len());
if res.status() != 200 {
let msg = format!("HTTP {}", res.status());
diesel::update(calendars)
.filter(schema::calendars::dsl::id.eq(id.clone()))
.set((schema::calendars::dsl::last_success.eq(Utc::now().naive_utc()),
schema::calendars::dsl::error_message.eq(msg)))
.execute(db)?;
return Ok(false)
}
let etag = res.headers().get(ETAG)
.and_then(|v| v.to_str().ok())
.map(|s| s.to_owned());
let last_modified = res.headers().get(LAST_MODIFIED)
.and_then(|v| v.to_str().ok())
.map(|s| s.to_owned());
db.transaction::<_, Error, _>(|| { db.transaction::<_, Error, _>(|| {
diesel::update(calendars) diesel::update(calendars)
.filter(schema::calendars::dsl::id.eq(id.clone())) .filter(schema::calendars::dsl::id.eq(cal_id.clone()))
.set((schema::calendars::dsl::last_success.eq(Utc::now().naive_utc()), .set((schema::calendars::dsl::last_success.eq(Utc::now().naive_utc()),
schema::calendars::dsl::etag.eq(etag), schema::calendars::dsl::etag.eq(etag),
schema::calendars::dsl::last_modified.eq(last_modified))) schema::calendars::dsl::last_modified.eq(last_modified),
schema::calendars::dsl::error_message.eq(None as Option<String>)))
.execute(db)?; .execute(db)?;
let mut p = Parser::new();
let mut buf = [0; 1024];
let mut events = HashMap::new();
loop {
match res.read(&mut buf)? {
len if len > 0 => {
let data = &buf[..len];
p.feed(data, |obj| {
let mut objs = vec![];
extract_vevent_objs(&mut objs, obj);
for obj in objs {
if let Some(event) = obj_to_event(id.clone(), &obj) {
events.insert(event.id.clone(), event);
} else {
let dtstart: Option<&str> = obj.get("DTSTART");
let summary: Option<&str> = obj.get("SUMMARY");
println!("cannot convert {} {:?} {:?}",
obj.name, dtstart, summary);
}
}
});
}
_ => break,
}
}
drop(res);
println!("[{}] {} events", id, events.len());
diesel::delete(schema::events::dsl::events) diesel::delete(schema::events::dsl::events)
.filter(schema::events::dsl::calendar.eq(id)) .filter(schema::events::dsl::calendar.eq(cal_id))
.execute(db)?; .execute(db)?;
for event in events.values() { for event in events.values() {
diesel::insert_into(schema::events::dsl::events) diesel::insert_into(schema::events::dsl::events)
@ -188,7 +200,7 @@ impl Resources {
.execute(db)?; .execute(db)?;
} }
Ok(false) Ok(())
})?; })?;
Ok(false) Ok(false)
@ -212,6 +224,7 @@ pub enum Error {
Db(diesel::result::Error), Db(diesel::result::Error),
Http(reqwest::Error), Http(reqwest::Error),
Io(std::io::Error), Io(std::io::Error),
Message(String),
} }
impl From<diesel::ConnectionError> for Error { impl From<diesel::ConnectionError> for Error {