use std::{ borrow::Borrow, cmp::Ordering, collections::{HashSet, HashMap, BTreeMap}, time::{Duration, Instant}, }; use tokio::{ sync::mpsc::{channel, Sender}, time::timeout, }; use cave::{ current_hour, feed::Post, store::{Store, TREND_POOL_SIZE}, trend_tag::TrendTag, }; #[cfg(not(debug))] const MIN_INTERVAL: Duration = Duration::from_secs(30); #[cfg(debug)] const MIN_INTERVAL: Duration = Duration::from_secs(5); #[derive(Debug)] pub struct UpdateSet { language: Option, tags: HashSet, } impl UpdateSet { pub fn is_empty(&self) -> bool { self.tags.is_empty() } } impl From<&Post> for UpdateSet { fn from(post: &Post) -> Self { UpdateSet { language: post.lang(), tags: post.tags_set().keys().cloned().collect(), } } } pub type Tx = Sender; pub fn start(mut store: Store) -> Tx { let (tx, mut rx) = channel::(1024); tokio::spawn(async move { let mut queue = BTreeMap::>::new(); // by language let mut buffer = HashMap::, HashSet>::new(); fn enqueue( language: Option, queue: &mut BTreeMap::>, buffer: &mut HashMap::, HashSet>, tags: HashSet, ) { match buffer.entry(language.clone()) { std::collections::hash_map::Entry::Vacant(entry) => { entry.insert(tags); queue.insert(Instant::now() + MIN_INTERVAL, language); } std::collections::hash_map::Entry::Occupied(mut entry) => { // merge into buffered for tag in tags.into_iter() { entry.get_mut().insert(tag); } } } } loop { let mut next_run = queue.keys().cloned().next(); if let Some(next_run_) = next_run { let now = Instant::now(); if next_run_ <= now { let language = queue.remove(&next_run_).unwrap(); let buffered = buffer.remove(&language).unwrap(); run(&language, buffered.clone(), &mut store).await.unwrap(); // update with next in queue next_run = queue.keys().cloned().next(); } else { tracing::trace!("next_run in {:?}", next_run_ - now); } } else { let languages = store.get_languages().await.unwrap(); tracing::info!("queue empty, filling from {} languages", languages.len()); for language in languages.into_iter() { enqueue(Some(language.clone()), &mut queue, &mut buffer, HashSet::new()); } } let now = Instant::now(); let interval = next_run.map_or( Duration::from_secs(3600), |next_run| next_run - now ); let _ = timeout(interval, async { if let Some(update_set) = rx.recv().await { enqueue(None, &mut queue, &mut buffer, update_set.tags.clone()); if let Some(language) = update_set.language { enqueue(Some(language), &mut queue, &mut buffer, update_set.tags); } } }).await; } }); tx } async fn run( language: &Option, new: HashSet, store: &mut Store, ) -> Result<(), cave::store::Error> { let t1 = Instant::now(); tracing::debug!("run {:?}: updated {:?}", language, new); let old = store.get_trend_pools(language, cave::PERIODS).await?; let all: HashSet = old.iter() .flat_map(|(_period, tags)| tags.clone()) .chain(new.into_iter()) .collect(); let mut period_scores: Vec<(u64, Vec<(f64, &TrendTag)>)> = cave::PERIODS.iter() .map(|period| (*period, Vec::with_capacity(all.len()))) .collect(); // calculate scores for tags currently in the pool let until = current_hour(); let trend_tags = store.get_trend_tags(language, all.into_iter()).await?; for trend_tag in &trend_tags { for (period, scores) in &mut period_scores { let score = trend_tag.score(*period, until); scores.push((score, trend_tag)); } } // order for (_period, scores) in &mut period_scores { scores.sort_by(|(score1, _), (score2, _)| { if score1 > score2 { Ordering::Less } else if score1 < score2 { Ordering::Greater } else { Ordering::Equal } }); } // separate new, kept, old tags let updates = old.into_iter().zip(&period_scores) .map(|((period1, old_tags), (period2, scores))| { assert_eq!(period1, *period2); let old_tags = old_tags.into_iter().collect::>(); let mut keep = TREND_POOL_SIZE.min(scores.len()); // shrink sorted set of tags as long as score is 0 while keep > 0 && scores[keep - 1].0 <= 0.0 { keep -= 1; } let remove = scores[keep..].iter() .map(|(_score, trend_tag)| trend_tag.name.borrow()) .filter(|tag| old_tags.contains(*tag)) .collect::>(); let add = scores[..keep].iter() .map(|(_score, trend_tag)| trend_tag.name.borrow()) .filter(|tag| ! old_tags.contains(*tag)) .collect::>(); (period1, keep, scores.len(), remove, add) }); // print for (period, keep, total, remove, add) in updates.clone() { if !add.is_empty() || !remove.is_empty() { tracing::info!("Trending in {:?} for {} of {}/{}: +{:?} -{:?}", language, period, keep, total, add, remove); } } // write to redis store.update_trend_pools( language, updates.clone().map(|(period, _, _, remove, _add)| (period, remove)), updates.clone().map(|(period, _, _, _remove, add)| (period, add)), updates.map(|(period, keep, _, _, _)| (period, keep)), ).await?; let t2 = Instant::now(); metrics::histogram!("hunter_trend_setter_seconds", t2 - t1, "lang" => if language.is_some() { "some" } else { "any" }); Ok(()) }