butcher: remove useless tag_trimmer
This commit is contained in:
parent
a871723e9b
commit
93d01f4233
|
@ -13,7 +13,6 @@ use trend_setter::UpdateSet;
|
|||
|
||||
mod config;
|
||||
mod trend_setter;
|
||||
mod tag_trimmer;
|
||||
|
||||
async fn is_profane(profanity: &WordList, post: &Post) -> bool {
|
||||
if post.sensitive == Some(true) {
|
||||
|
@ -43,9 +42,6 @@ async fn main() {
|
|||
cave::systemd::status("Starting trend_setter");
|
||||
let trend_setter_tx = trend_setter::start(store.clone());
|
||||
|
||||
cave::systemd::status("Starting tag_trimmer");
|
||||
tag_trimmer::start(store.clone());
|
||||
|
||||
let firehose_factory = FirehoseFactory::new(config.redis);
|
||||
let firehose = firehose_factory.produce()
|
||||
.await
|
||||
|
|
|
@ -1,79 +0,0 @@
|
|||
use std::time::{Duration, Instant};
|
||||
|
||||
use futures::{
|
||||
future::join,
|
||||
StreamExt,
|
||||
};
|
||||
use tokio::time::sleep;
|
||||
use cave::store::Store;
|
||||
|
||||
const TRIM_INTERVAL: Duration = Duration::from_secs(86400);
|
||||
const TRIM_STEP_SLEEP: Duration = Duration::from_millis(30);
|
||||
|
||||
pub fn start(store: Store) {
|
||||
tokio::spawn(async move {
|
||||
loop {
|
||||
let start = Instant::now();
|
||||
run(&store).await;
|
||||
let end = Instant::now();
|
||||
tracing::info!("trimmed in {:.3?}", end - start);
|
||||
|
||||
sleep(TRIM_INTERVAL).await;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
async fn run(store: &Store) {
|
||||
let mut store1 = store.clone();
|
||||
let mut store3 = store.clone();
|
||||
|
||||
join(async {
|
||||
let store2 = store1.clone();
|
||||
store1.get_tags_global()
|
||||
.await
|
||||
.unwrap()
|
||||
.for_each(move |tag| {
|
||||
let mut store2 = store2.clone();
|
||||
async move {
|
||||
trim_tag(None, tag, &mut store2).await;
|
||||
sleep(TRIM_STEP_SLEEP).await;
|
||||
}
|
||||
})
|
||||
.await
|
||||
}, async {
|
||||
let store4 = store3.clone();
|
||||
store3.get_tags_by_language()
|
||||
.await
|
||||
.unwrap()
|
||||
.for_each(move |(language, tag)| {
|
||||
let mut store4 = store4.clone();
|
||||
async move {
|
||||
trim_tag(language, tag, &mut store4).await;
|
||||
sleep(TRIM_STEP_SLEEP).await;
|
||||
}})
|
||||
.await
|
||||
}).await;
|
||||
}
|
||||
|
||||
async fn trim_tag(language: Option<String>, tag: String, store: &mut Store) {
|
||||
let t1 = Instant::now();
|
||||
let lang = if language.is_some() { "some" } else { "any" };
|
||||
let trend_tags = store.get_trend_tags(&language, [tag].into_iter())
|
||||
.await
|
||||
.unwrap();
|
||||
let trend_tag = &trend_tags[0];
|
||||
|
||||
if trend_tag.hour_users.iter().all(|(_, users)| *users == 0) {
|
||||
// drop the whole wholly-outdated tag
|
||||
tracing::debug!("Deleting whole tag {:?} {}", language, trend_tag.name);
|
||||
store.delete_tag(&language, &trend_tag.name).await
|
||||
.expect("delete_tag");
|
||||
metrics::increment_counter!("hunter_trimmed_tags", "type" => "delete_tag", "lang" => lang);
|
||||
} else {
|
||||
store.clean_trend_tag(&language, trend_tag).await
|
||||
.expect("clean_trend_tag");
|
||||
metrics::increment_counter!("hunter_trimmed_tags", "type" => "clean_trend_tag", "lang" => lang);
|
||||
}
|
||||
let t2 = Instant::now();
|
||||
metrics::histogram!("hunter_trim_tag_seconds", t2 - t1, "lang" => lang);
|
||||
}
|
|
@ -1,79 +0,0 @@
|
|||
use std::time::{Duration, Instant};
|
||||
|
||||
use futures::{
|
||||
future::join,
|
||||
StreamExt,
|
||||
};
|
||||
use tokio::time::sleep;
|
||||
use cave::store::Store;
|
||||
|
||||
const TRIM_INTERVAL: Duration = Duration::from_secs(86400);
|
||||
const TRIM_STEP_SLEEP: Duration = Duration::from_millis(30);
|
||||
|
||||
pub fn start(store: Store) {
|
||||
tokio::spawn(async move {
|
||||
loop {
|
||||
let start = Instant::now();
|
||||
run(&store).await;
|
||||
let end = Instant::now();
|
||||
tracing::info!("trimmed in {:.3?}", end - start);
|
||||
|
||||
sleep(TRIM_INTERVAL).await;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
async fn run(store: &Store) {
|
||||
let mut store1 = store.clone();
|
||||
let mut store3 = store.clone();
|
||||
|
||||
join(async {
|
||||
let store2 = store1.clone();
|
||||
store1.get_tags_global()
|
||||
.await
|
||||
.unwrap()
|
||||
.for_each(move |tag| {
|
||||
let mut store2 = store2.clone();
|
||||
async move {
|
||||
trim_tag(None, tag, &mut store2).await;
|
||||
sleep(TRIM_STEP_SLEEP).await;
|
||||
}
|
||||
})
|
||||
.await
|
||||
}, async {
|
||||
let store4 = store3.clone();
|
||||
store3.get_tags_by_language()
|
||||
.await
|
||||
.unwrap()
|
||||
.for_each(move |(language, tag)| {
|
||||
let mut store4 = store4.clone();
|
||||
async move {
|
||||
trim_tag(language, tag, &mut store4).await;
|
||||
sleep(TRIM_STEP_SLEEP).await;
|
||||
}})
|
||||
.await
|
||||
}).await;
|
||||
}
|
||||
|
||||
async fn trim_tag(language: Option<String>, tag: String, store: &mut Store) {
|
||||
let t1 = Instant::now();
|
||||
let lang = if language.is_some() { "some" } else { "any" };
|
||||
let trend_tags = store.get_trend_tags(&language, [tag].into_iter())
|
||||
.await
|
||||
.unwrap();
|
||||
let trend_tag = &trend_tags[0];
|
||||
|
||||
if trend_tag.hour_users.iter().all(|(_, users)| *users == 0) {
|
||||
// drop the whole wholly-outdated tag
|
||||
tracing::debug!("Deleting whole tag {:?} {}", language, trend_tag.name);
|
||||
store.delete_tag(&language, &trend_tag.name).await
|
||||
.expect("delete_tag");
|
||||
metrics::increment_counter!("hunter_trimmed_tags", "type" => "delete_tag", "lang" => lang);
|
||||
} else {
|
||||
store.clean_trend_tag(&language, trend_tag).await
|
||||
.expect("clean_trend_tag");
|
||||
metrics::increment_counter!("hunter_trimmed_tags", "type" => "clean_trend_tag", "lang" => lang);
|
||||
}
|
||||
let t2 = Instant::now();
|
||||
metrics::histogram!("hunter_trim_tag_seconds", t2 - t1, "lang" => lang);
|
||||
}
|
Loading…
Reference in New Issue