From 844221616e42af930ae5489f7ad35c73407c628d Mon Sep 17 00:00:00 2001 From: Astro Date: Fri, 11 Oct 2019 22:15:38 +0200 Subject: [PATCH] refactor worker into worker/worker_job --- src/main.rs | 241 +++++++++++++++++++++++++++------------------------- 1 file changed, 125 insertions(+), 116 deletions(-) diff --git a/src/main.rs b/src/main.rs index 321733a..c888a90 100644 --- a/src/main.rs +++ b/src/main.rs @@ -73,127 +73,136 @@ impl Resources { queue.pop_front() } - fn worker(&self) -> Result<(), Error> { - let db = PgConnection::establish(&self.db_url)?; + fn worker_job(&self, db: &PgConnection) -> Result { + let (id, cal_opts) = match self.next() { + Some(next) => next, + None => return Ok(true), + }; - loop { - let (id, cal_opts) = match self.next() { - Some(next) => next, - None => return Ok(()), - }; - - let (etag, last_modified) = db.transaction::<_, Error, _>(|| { - let cals = calendars - .filter(schema::calendars::dsl::id.eq(id.clone())) - .limit(1) - .load::(&db)?; - match cals.first() { - None => { - let cal = NewCalendar { - id: id.clone(), - url: cal_opts.url.clone(), - last_fetch: Some(Utc::now().naive_utc()), + let (etag, last_modified) = db.transaction::<_, Error, _>(|| { + let cals = calendars + .filter(schema::calendars::dsl::id.eq(id.clone())) + .limit(1) + .load::(db)?; + match cals.first() { + None => { + let cal = NewCalendar { + id: id.clone(), + url: cal_opts.url.clone(), + last_fetch: Some(Utc::now().naive_utc()), + }; + diesel::insert_into(calendars) + .values(&cal) + .execute(db)?; + Ok((None, None)) + } + Some(cal) => { + diesel::update(calendars) + .filter(schema::calendars::dsl::id.eq(id.clone())) + .set(schema::calendars::dsl::last_fetch.eq(Utc::now().naive_utc())) + .execute(db)?; + let result = + // Use ETag/Last-Modified only if URL hasn't changed + if cal.url == cal_opts.url { + (cal.etag.clone(), cal.last_modified.clone()) + } else { + (None, None) }; - diesel::insert_into(calendars) - .values(&cal) - .execute(&db)?; - Ok((None, None)) - } - Some(cal) => { - diesel::update(calendars) - .filter(schema::calendars::dsl::id.eq(id.clone())) - .set(schema::calendars::dsl::last_fetch.eq(Utc::now().naive_utc())) - .execute(&db)?; - let result = - // Use ETag/Last-Modified only if URL hasn't changed - if cal.url == cal_opts.url { - (cal.etag.clone(), cal.last_modified.clone()) - } else { - (None, None) - }; - Ok(result) - } + Ok(result) } - })?; - - let mut req = self.http_client.get(&cal_opts.url) - .header(USER_AGENT, "Ticker/0.0.0"); - match etag { - Some(etag) => req = req.header(IF_NONE_MATCH, etag), - None => (), } - match last_modified { - Some(last_modified) => req = req.header(IF_MODIFIED_SINCE, last_modified), - None => (), - } - let mut res = req.send()?; + })?; - println!("[{}] HTTP {}", id, res.status()); - if res.status() != 200 { - let msg = format!("HTTP {}", res.status()); - diesel::update(calendars) - .filter(schema::calendars::dsl::id.eq(id.clone())) - .set((schema::calendars::dsl::last_success.eq(Utc::now().naive_utc()), - schema::calendars::dsl::error_message.eq(msg))) - .execute(&db)?; - return Ok(()) - } - - let etag = res.headers().get(ETAG) - .and_then(|v| v.to_str().ok()) - .map(|s| s.to_owned()); - let last_modified = res.headers().get(LAST_MODIFIED) - .and_then(|v| v.to_str().ok()) - .map(|s| s.to_owned()); - - db.transaction::<_, Error, _>(|| { - diesel::update(calendars) - .filter(schema::calendars::dsl::id.eq(id.clone())) - .set((schema::calendars::dsl::last_success.eq(Utc::now().naive_utc()), - schema::calendars::dsl::etag.eq(etag), - schema::calendars::dsl::last_modified.eq(last_modified))) - .execute(&db)?; - - let mut p = Parser::new(); - let mut buf = [0; 1024]; - let mut events = HashMap::new(); - loop { - match res.read(&mut buf)? { - len if len > 0 => { - let data = &buf[..len]; - p.feed(data, |obj| { - let mut objs = vec![]; - extract_vevent_objs(&mut objs, obj); - for obj in objs { - if let Some(event) = obj_to_event(id.clone(), &obj) { - events.insert(event.id.clone(), event); - } else { - let dtstart: Option<&str> = obj.get("DTSTART"); - let summary: Option<&str> = obj.get("SUMMARY"); - println!("cannot convert {} {:?} {:?}", - obj.name, dtstart, summary); - } - } - }); - } - _ => break, - } - } - drop(res); - - println!("[{}] {} events", id, events.len()); - diesel::delete(schema::events::dsl::events) - .filter(schema::events::dsl::calendar.eq(id)) - .execute(&db)?; - for event in events.values() { - diesel::insert_into(schema::events::dsl::events) - .values(event) - .execute(&db)?; - } - - Ok(()) - })?; + let mut req = self.http_client.get(&cal_opts.url) + .header(USER_AGENT, "Ticker/0.0.0"); + match etag { + Some(etag) => req = req.header(IF_NONE_MATCH, etag), + None => (), } + match last_modified { + Some(last_modified) => req = req.header(IF_MODIFIED_SINCE, last_modified), + None => (), + } + let mut res = req.send()?; + + println!("[{}] HTTP {}", id, res.status()); + if res.status() != 200 { + let msg = format!("HTTP {}", res.status()); + diesel::update(calendars) + .filter(schema::calendars::dsl::id.eq(id.clone())) + .set((schema::calendars::dsl::last_success.eq(Utc::now().naive_utc()), + schema::calendars::dsl::error_message.eq(msg))) + .execute(db)?; + return Ok(false) + } + + let etag = res.headers().get(ETAG) + .and_then(|v| v.to_str().ok()) + .map(|s| s.to_owned()); + let last_modified = res.headers().get(LAST_MODIFIED) + .and_then(|v| v.to_str().ok()) + .map(|s| s.to_owned()); + + db.transaction::<_, Error, _>(|| { + diesel::update(calendars) + .filter(schema::calendars::dsl::id.eq(id.clone())) + .set((schema::calendars::dsl::last_success.eq(Utc::now().naive_utc()), + schema::calendars::dsl::etag.eq(etag), + schema::calendars::dsl::last_modified.eq(last_modified))) + .execute(db)?; + + let mut p = Parser::new(); + let mut buf = [0; 1024]; + let mut events = HashMap::new(); + loop { + match res.read(&mut buf)? { + len if len > 0 => { + let data = &buf[..len]; + p.feed(data, |obj| { + let mut objs = vec![]; + extract_vevent_objs(&mut objs, obj); + for obj in objs { + if let Some(event) = obj_to_event(id.clone(), &obj) { + events.insert(event.id.clone(), event); + } else { + let dtstart: Option<&str> = obj.get("DTSTART"); + let summary: Option<&str> = obj.get("SUMMARY"); + println!("cannot convert {} {:?} {:?}", + obj.name, dtstart, summary); + } + } + }); + } + _ => break, + } + } + drop(res); + + println!("[{}] {} events", id, events.len()); + diesel::delete(schema::events::dsl::events) + .filter(schema::events::dsl::calendar.eq(id)) + .execute(db)?; + for event in events.values() { + diesel::insert_into(schema::events::dsl::events) + .values(event) + .execute(db)?; + } + + Ok(false) + })?; + + Ok(false) + } + + fn worker_loop(&self) -> Result<(), Error> { + let db = PgConnection::establish(&self.db_url)?; + let mut done = false; + while ! done { + done = self.worker_job(&db) + .map_err(|e| println!("{:?}", e)) + .unwrap_or(false); + } + Ok(()) } } @@ -244,7 +253,7 @@ fn main() { let cpus = num_cpus::get(); let workers: Vec<_> = (0..cpus) .map(|_| s.spawn(|_| { - res.worker() + res.worker_loop() .map_err(|e| println!("{:?}", e)) })) .collect();