// caveman/butcher/src/main.rs
use futures::StreamExt;
use cave::{
config::LoadConfig,
firehose::FirehoseFactory,
};
use trend_setter::UpdateSet;
mod config;
mod trend_setter;
mod tag_trimmer;
/// Entry point: consumes the firehose and feeds every parsed post to the
/// store and to the `trend_setter` worker.
///
/// Startup order:
/// 1. `cave::systemd::extend_timeout`, `cave::init::exit_on_panic` and
///    `cave::init::init_logger` are called before anything else.
/// 2. The configuration is loaded and a `Store` is created from
///    `config.redis`.
/// 3. `trend_setter::start` and `tag_trimmer::start` are launched, each with
///    its own clone of the store.
/// 4. A firehose is produced from the same redis configuration, and
///    `cave::systemd::ready` is called once it is available.
///
/// After that the function processes firehose items until the stream ends.
///
/// # Panics
///
/// Panics if the firehose cannot be produced, or if sending an update set
/// to `trend_setter` fails.
#[tokio::main]
async fn main() {
    // NOTE(review): the units/meaning of the numeric arguments below are not
    // visible from this file — confirm against the `cave` crate.
    cave::systemd::extend_timeout(100_000);
    cave::init::exit_on_panic();
    cave::init::init_logger(5555);

    let config = config::Config::load();

    // `config.redis` is cloned here because it is moved into the
    // `FirehoseFactory` further down.
    let store = cave::store::Store::new(16, config.redis.clone()).await;

    cave::systemd::status("Starting trend_setter");
    let trend_setter_tx = trend_setter::start(store.clone());

    cave::systemd::status("Starting tag_trimmer");
    tag_trimmer::start(store.clone());

    let firehose_factory = FirehoseFactory::new(config.redis);
    let firehose = firehose_factory
        .produce()
        .await
        .expect("firehose")
        // Items that are `Err` are dropped silently; only `Ok` payloads
        // reach the processing loop.
        .filter_map(|item| async move { item.ok() });

    cave::systemd::ready();

    firehose
        .for_each(move |data| {
            // Each item gets its own handles so the returned future owns
            // everything it uses.
            let trend_setter_tx = trend_setter_tx.clone();
            let mut store = store.clone();

            async move {
                let post = match serde_json::from_slice(&data) {
                    Ok(post) => post,
                    Err(e) => {
                        // An unparsable item is logged and skipped. The early
                        // return also skips the watchdog call below.
                        tracing::error!("Cannot parse JSON: {:?}", e);
                        return;
                    }
                };

                store.save_post_tags(&post).await;

                let update_set = UpdateSet::from(&post);
                if !update_set.is_empty() {
                    trend_setter_tx
                        .send(update_set)
                        .await
                        .expect("send to trend_setter failed");
                }

                // Called once per successfully parsed post; presumably this
                // is the systemd watchdog keep-alive — confirm in `cave`.
                cave::systemd::watchdog();
            }
        })
        .await;
}