diff --git a/butcher/src/main.rs b/butcher/src/main.rs index 44b7306..fc9d25e 100644 --- a/butcher/src/main.rs +++ b/butcher/src/main.rs @@ -13,7 +13,6 @@ use trend_setter::UpdateSet; mod config; mod trend_setter; -mod tag_trimmer; async fn is_profane(profanity: &WordList, post: &Post) -> bool { if post.sensitive == Some(true) { @@ -43,9 +42,6 @@ async fn main() { cave::systemd::status("Starting trend_setter"); let trend_setter_tx = trend_setter::start(store.clone()); - cave::systemd::status("Starting tag_trimmer"); - tag_trimmer::start(store.clone()); - let firehose_factory = FirehoseFactory::new(config.redis); let firehose = firehose_factory.produce() .await diff --git a/butcher/src/tag_trimmer.rs b/butcher/src/tag_trimmer.rs deleted file mode 100644 index 18dadc2..0000000 --- a/butcher/src/tag_trimmer.rs +++ /dev/null @@ -1,79 +0,0 @@ -use std::time::{Duration, Instant}; - -use futures::{ - future::join, - StreamExt, -}; -use tokio::time::sleep; -use cave::store::Store; - -const TRIM_INTERVAL: Duration = Duration::from_secs(86400); -const TRIM_STEP_SLEEP: Duration = Duration::from_millis(30); - -pub fn start(store: Store) { - tokio::spawn(async move { - loop { - let start = Instant::now(); - run(&store).await; - let end = Instant::now(); - tracing::info!("trimmed in {:.3?}", end - start); - - sleep(TRIM_INTERVAL).await; - } - }); -} - -async fn run(store: &Store) { - let mut store1 = store.clone(); - let mut store3 = store.clone(); - - join(async { - let store2 = store1.clone(); - store1.get_tags_global() - .await - .unwrap() - .for_each(move |tag| { - let mut store2 = store2.clone(); - async move { - trim_tag(None, tag, &mut store2).await; - sleep(TRIM_STEP_SLEEP).await; - } - }) - .await - }, async { - let store4 = store3.clone(); - store3.get_tags_by_language() - .await - .unwrap() - .for_each(move |(language, tag)| { - let mut store4 = store4.clone(); - async move { - trim_tag(language, tag, &mut store4).await; - sleep(TRIM_STEP_SLEEP).await; - }}) - .await - }).await; -} - -async fn trim_tag(language: Option, tag: String, store: &mut Store) { - let t1 = Instant::now(); - let lang = if language.is_some() { "some" } else { "any" }; - let trend_tags = store.get_trend_tags(&language, [tag].into_iter()) - .await - .unwrap(); - let trend_tag = &trend_tags[0]; - - if trend_tag.hour_users.iter().all(|(_, users)| *users == 0) { - // drop the whole wholly-outdated tag - tracing::debug!("Deleting whole tag {:?} {}", language, trend_tag.name); - store.delete_tag(&language, &trend_tag.name).await - .expect("delete_tag"); - metrics::increment_counter!("hunter_trimmed_tags", "type" => "delete_tag", "lang" => lang); - } else { - store.clean_trend_tag(&language, trend_tag).await - .expect("clean_trend_tag"); - metrics::increment_counter!("hunter_trimmed_tags", "type" => "clean_trend_tag", "lang" => lang); - } - let t2 = Instant::now(); - metrics::histogram!("hunter_trim_tag_seconds", t2 - t1, "lang" => lang); -} diff --git a/hunter/src/tag_trimmer.rs b/hunter/src/tag_trimmer.rs deleted file mode 100644 index 18dadc2..0000000 --- a/hunter/src/tag_trimmer.rs +++ /dev/null @@ -1,79 +0,0 @@ -use std::time::{Duration, Instant}; - -use futures::{ - future::join, - StreamExt, -}; -use tokio::time::sleep; -use cave::store::Store; - -const TRIM_INTERVAL: Duration = Duration::from_secs(86400); -const TRIM_STEP_SLEEP: Duration = Duration::from_millis(30); - -pub fn start(store: Store) { - tokio::spawn(async move { - loop { - let start = Instant::now(); - run(&store).await; - let end = Instant::now(); - tracing::info!("trimmed in {:.3?}", end - start); - - sleep(TRIM_INTERVAL).await; - } - }); -} - -async fn run(store: &Store) { - let mut store1 = store.clone(); - let mut store3 = store.clone(); - - join(async { - let store2 = store1.clone(); - store1.get_tags_global() - .await - .unwrap() - .for_each(move |tag| { - let mut store2 = store2.clone(); - async move { - trim_tag(None, tag, &mut store2).await; - sleep(TRIM_STEP_SLEEP).await; - } - }) - .await - }, async { - let store4 = store3.clone(); - store3.get_tags_by_language() - .await - .unwrap() - .for_each(move |(language, tag)| { - let mut store4 = store4.clone(); - async move { - trim_tag(language, tag, &mut store4).await; - sleep(TRIM_STEP_SLEEP).await; - }}) - .await - }).await; -} - -async fn trim_tag(language: Option, tag: String, store: &mut Store) { - let t1 = Instant::now(); - let lang = if language.is_some() { "some" } else { "any" }; - let trend_tags = store.get_trend_tags(&language, [tag].into_iter()) - .await - .unwrap(); - let trend_tag = &trend_tags[0]; - - if trend_tag.hour_users.iter().all(|(_, users)| *users == 0) { - // drop the whole wholly-outdated tag - tracing::debug!("Deleting whole tag {:?} {}", language, trend_tag.name); - store.delete_tag(&language, &trend_tag.name).await - .expect("delete_tag"); - metrics::increment_counter!("hunter_trimmed_tags", "type" => "delete_tag", "lang" => lang); - } else { - store.clean_trend_tag(&language, trend_tag).await - .expect("clean_trend_tag"); - metrics::increment_counter!("hunter_trimmed_tags", "type" => "clean_trend_tag", "lang" => lang); - } - let t2 = Instant::now(); - metrics::histogram!("hunter_trim_tag_seconds", t2 - t1, "lang" => lang); -}