refactor worker into worker/worker_job
This commit is contained in:
parent
54dc6db900
commit
844221616e
241
src/main.rs
241
src/main.rs
|
@ -73,127 +73,136 @@ impl Resources {
|
|||
queue.pop_front()
|
||||
}
|
||||
|
||||
fn worker(&self) -> Result<(), Error> {
|
||||
let db = PgConnection::establish(&self.db_url)?;
|
||||
fn worker_job(&self, db: &PgConnection) -> Result<bool, Error> {
|
||||
let (id, cal_opts) = match self.next() {
|
||||
Some(next) => next,
|
||||
None => return Ok(true),
|
||||
};
|
||||
|
||||
loop {
|
||||
let (id, cal_opts) = match self.next() {
|
||||
Some(next) => next,
|
||||
None => return Ok(()),
|
||||
};
|
||||
|
||||
let (etag, last_modified) = db.transaction::<_, Error, _>(|| {
|
||||
let cals = calendars
|
||||
.filter(schema::calendars::dsl::id.eq(id.clone()))
|
||||
.limit(1)
|
||||
.load::<Calendar>(&db)?;
|
||||
match cals.first() {
|
||||
None => {
|
||||
let cal = NewCalendar {
|
||||
id: id.clone(),
|
||||
url: cal_opts.url.clone(),
|
||||
last_fetch: Some(Utc::now().naive_utc()),
|
||||
let (etag, last_modified) = db.transaction::<_, Error, _>(|| {
|
||||
let cals = calendars
|
||||
.filter(schema::calendars::dsl::id.eq(id.clone()))
|
||||
.limit(1)
|
||||
.load::<Calendar>(db)?;
|
||||
match cals.first() {
|
||||
None => {
|
||||
let cal = NewCalendar {
|
||||
id: id.clone(),
|
||||
url: cal_opts.url.clone(),
|
||||
last_fetch: Some(Utc::now().naive_utc()),
|
||||
};
|
||||
diesel::insert_into(calendars)
|
||||
.values(&cal)
|
||||
.execute(db)?;
|
||||
Ok((None, None))
|
||||
}
|
||||
Some(cal) => {
|
||||
diesel::update(calendars)
|
||||
.filter(schema::calendars::dsl::id.eq(id.clone()))
|
||||
.set(schema::calendars::dsl::last_fetch.eq(Utc::now().naive_utc()))
|
||||
.execute(db)?;
|
||||
let result =
|
||||
// Use ETag/Last-Modified only if URL hasn't changed
|
||||
if cal.url == cal_opts.url {
|
||||
(cal.etag.clone(), cal.last_modified.clone())
|
||||
} else {
|
||||
(None, None)
|
||||
};
|
||||
diesel::insert_into(calendars)
|
||||
.values(&cal)
|
||||
.execute(&db)?;
|
||||
Ok((None, None))
|
||||
}
|
||||
Some(cal) => {
|
||||
diesel::update(calendars)
|
||||
.filter(schema::calendars::dsl::id.eq(id.clone()))
|
||||
.set(schema::calendars::dsl::last_fetch.eq(Utc::now().naive_utc()))
|
||||
.execute(&db)?;
|
||||
let result =
|
||||
// Use ETag/Last-Modified only if URL hasn't changed
|
||||
if cal.url == cal_opts.url {
|
||||
(cal.etag.clone(), cal.last_modified.clone())
|
||||
} else {
|
||||
(None, None)
|
||||
};
|
||||
Ok(result)
|
||||
}
|
||||
Ok(result)
|
||||
}
|
||||
})?;
|
||||
|
||||
let mut req = self.http_client.get(&cal_opts.url)
|
||||
.header(USER_AGENT, "Ticker/0.0.0");
|
||||
match etag {
|
||||
Some(etag) => req = req.header(IF_NONE_MATCH, etag),
|
||||
None => (),
|
||||
}
|
||||
match last_modified {
|
||||
Some(last_modified) => req = req.header(IF_MODIFIED_SINCE, last_modified),
|
||||
None => (),
|
||||
}
|
||||
let mut res = req.send()?;
|
||||
})?;
|
||||
|
||||
println!("[{}] HTTP {}", id, res.status());
|
||||
if res.status() != 200 {
|
||||
let msg = format!("HTTP {}", res.status());
|
||||
diesel::update(calendars)
|
||||
.filter(schema::calendars::dsl::id.eq(id.clone()))
|
||||
.set((schema::calendars::dsl::last_success.eq(Utc::now().naive_utc()),
|
||||
schema::calendars::dsl::error_message.eq(msg)))
|
||||
.execute(&db)?;
|
||||
return Ok(())
|
||||
}
|
||||
|
||||
let etag = res.headers().get(ETAG)
|
||||
.and_then(|v| v.to_str().ok())
|
||||
.map(|s| s.to_owned());
|
||||
let last_modified = res.headers().get(LAST_MODIFIED)
|
||||
.and_then(|v| v.to_str().ok())
|
||||
.map(|s| s.to_owned());
|
||||
|
||||
db.transaction::<_, Error, _>(|| {
|
||||
diesel::update(calendars)
|
||||
.filter(schema::calendars::dsl::id.eq(id.clone()))
|
||||
.set((schema::calendars::dsl::last_success.eq(Utc::now().naive_utc()),
|
||||
schema::calendars::dsl::etag.eq(etag),
|
||||
schema::calendars::dsl::last_modified.eq(last_modified)))
|
||||
.execute(&db)?;
|
||||
|
||||
let mut p = Parser::new();
|
||||
let mut buf = [0; 1024];
|
||||
let mut events = HashMap::new();
|
||||
loop {
|
||||
match res.read(&mut buf)? {
|
||||
len if len > 0 => {
|
||||
let data = &buf[..len];
|
||||
p.feed(data, |obj| {
|
||||
let mut objs = vec![];
|
||||
extract_vevent_objs(&mut objs, obj);
|
||||
for obj in objs {
|
||||
if let Some(event) = obj_to_event(id.clone(), &obj) {
|
||||
events.insert(event.id.clone(), event);
|
||||
} else {
|
||||
let dtstart: Option<&str> = obj.get("DTSTART");
|
||||
let summary: Option<&str> = obj.get("SUMMARY");
|
||||
println!("cannot convert {} {:?} {:?}",
|
||||
obj.name, dtstart, summary);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
_ => break,
|
||||
}
|
||||
}
|
||||
drop(res);
|
||||
|
||||
println!("[{}] {} events", id, events.len());
|
||||
diesel::delete(schema::events::dsl::events)
|
||||
.filter(schema::events::dsl::calendar.eq(id))
|
||||
.execute(&db)?;
|
||||
for event in events.values() {
|
||||
diesel::insert_into(schema::events::dsl::events)
|
||||
.values(event)
|
||||
.execute(&db)?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
})?;
|
||||
let mut req = self.http_client.get(&cal_opts.url)
|
||||
.header(USER_AGENT, "Ticker/0.0.0");
|
||||
match etag {
|
||||
Some(etag) => req = req.header(IF_NONE_MATCH, etag),
|
||||
None => (),
|
||||
}
|
||||
match last_modified {
|
||||
Some(last_modified) => req = req.header(IF_MODIFIED_SINCE, last_modified),
|
||||
None => (),
|
||||
}
|
||||
let mut res = req.send()?;
|
||||
|
||||
println!("[{}] HTTP {}", id, res.status());
|
||||
if res.status() != 200 {
|
||||
let msg = format!("HTTP {}", res.status());
|
||||
diesel::update(calendars)
|
||||
.filter(schema::calendars::dsl::id.eq(id.clone()))
|
||||
.set((schema::calendars::dsl::last_success.eq(Utc::now().naive_utc()),
|
||||
schema::calendars::dsl::error_message.eq(msg)))
|
||||
.execute(db)?;
|
||||
return Ok(false)
|
||||
}
|
||||
|
||||
let etag = res.headers().get(ETAG)
|
||||
.and_then(|v| v.to_str().ok())
|
||||
.map(|s| s.to_owned());
|
||||
let last_modified = res.headers().get(LAST_MODIFIED)
|
||||
.and_then(|v| v.to_str().ok())
|
||||
.map(|s| s.to_owned());
|
||||
|
||||
db.transaction::<_, Error, _>(|| {
|
||||
diesel::update(calendars)
|
||||
.filter(schema::calendars::dsl::id.eq(id.clone()))
|
||||
.set((schema::calendars::dsl::last_success.eq(Utc::now().naive_utc()),
|
||||
schema::calendars::dsl::etag.eq(etag),
|
||||
schema::calendars::dsl::last_modified.eq(last_modified)))
|
||||
.execute(db)?;
|
||||
|
||||
let mut p = Parser::new();
|
||||
let mut buf = [0; 1024];
|
||||
let mut events = HashMap::new();
|
||||
loop {
|
||||
match res.read(&mut buf)? {
|
||||
len if len > 0 => {
|
||||
let data = &buf[..len];
|
||||
p.feed(data, |obj| {
|
||||
let mut objs = vec![];
|
||||
extract_vevent_objs(&mut objs, obj);
|
||||
for obj in objs {
|
||||
if let Some(event) = obj_to_event(id.clone(), &obj) {
|
||||
events.insert(event.id.clone(), event);
|
||||
} else {
|
||||
let dtstart: Option<&str> = obj.get("DTSTART");
|
||||
let summary: Option<&str> = obj.get("SUMMARY");
|
||||
println!("cannot convert {} {:?} {:?}",
|
||||
obj.name, dtstart, summary);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
_ => break,
|
||||
}
|
||||
}
|
||||
drop(res);
|
||||
|
||||
println!("[{}] {} events", id, events.len());
|
||||
diesel::delete(schema::events::dsl::events)
|
||||
.filter(schema::events::dsl::calendar.eq(id))
|
||||
.execute(db)?;
|
||||
for event in events.values() {
|
||||
diesel::insert_into(schema::events::dsl::events)
|
||||
.values(event)
|
||||
.execute(db)?;
|
||||
}
|
||||
|
||||
Ok(false)
|
||||
})?;
|
||||
|
||||
Ok(false)
|
||||
}
|
||||
|
||||
fn worker_loop(&self) -> Result<(), Error> {
|
||||
let db = PgConnection::establish(&self.db_url)?;
|
||||
let mut done = false;
|
||||
while ! done {
|
||||
done = self.worker_job(&db)
|
||||
.map_err(|e| println!("{:?}", e))
|
||||
.unwrap_or(false);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -244,7 +253,7 @@ fn main() {
|
|||
let cpus = num_cpus::get();
|
||||
let workers: Vec<_> = (0..cpus)
|
||||
.map(|_| s.spawn(|_| {
|
||||
res.worker()
|
||||
res.worker_loop()
|
||||
.map_err(|e| println!("{:?}", e))
|
||||
}))
|
||||
.collect();
|
||||
|
|
Loading…
Reference in New Issue