caveman/butcher/src/main.rs

85 lines
2.2 KiB
Rust

use std::{
sync::Arc,
ops::Deref,
};
use futures::StreamExt;
use cave::{
config::LoadConfig,
feed::Post,
firehose::FirehoseFactory,
word_list::WordList,
};
use trend_setter::UpdateSet;
mod config;
mod trend_setter;
mod tag_trimmer;
async fn is_profane(profanity: &WordList, post: &Post) -> bool {
if post.sensitive == Some(true) {
return true;
}
let tags_set = post.tags_set();
let tagged_profanity =
futures::stream::iter(
tags_set.iter()
)
.any(|(tag, _spellings)| profanity.contains(tag));
tagged_profanity.await
}
#[tokio::main]
async fn main() {
cave::init::exit_on_panic();
cave::init::init_logger(5555);
let config = config::Config::load();
let profanity = WordList::new(&config.profanity).await;
let store = cave::store::Store::new(16, config.redis.clone()).await;
cave::systemd::status("Starting trend_setter");
let trend_setter_tx = trend_setter::start(store.clone());
cave::systemd::status("Starting tag_trimmer");
tag_trimmer::start(store.clone());
let firehose_factory = FirehoseFactory::new(config.redis);
let firehose = firehose_factory.produce()
.await
.expect("firehose")
.filter_map(|item| async move { item.ok() });
cave::systemd::ready();
firehose.for_each(move |data| {
let trend_setter_tx = trend_setter_tx.clone();
let mut store = store.clone();
let profanity = profanity.clone();
tokio::spawn(async move {
let post = match serde_json::from_slice(&data) {
Ok(post) =>
post,
Err(e) => {
tracing::error!("Cannot parse JSON: {:?}", e);
return;
},
};
let post = Arc::new(post);
store.save_post_tags(&post, is_profane(&profanity, &post).await).await;
let update_set = UpdateSet::from(post.deref());
if ! update_set.is_empty() {
trend_setter_tx.send(update_set).await.unwrap();
}
cave::systemd::watchdog();
});
futures::future::ready(())
}).await;
panic!("End")
}