caveman/butcher/src/trend_setter.rs

196 lines
6.5 KiB
Rust

use std::{
borrow::Borrow,
cmp::Ordering,
collections::{HashSet, HashMap, BTreeMap},
time::{Duration, Instant},
};
use tokio::{
sync::mpsc::{channel, Sender},
time::timeout,
};
use cave::{
current_hour,
feed::Post,
store::{Store, TREND_POOL_SIZE}, trend_tag::TrendTag,
};
/// Delay between the moment tags are first buffered for a language and the
/// moment that language is scheduled to be processed (see `enqueue` inside
/// `start`).
///
/// Unoptimized builds use a shorter delay. The selection is keyed on
/// `debug_assertions`, the cfg flag the compiler sets for such builds; a bare
/// `debug` cfg is not set by rustc or Cargo unless passed explicitly, so it
/// would never pick the short value.
#[cfg(not(debug_assertions))]
const MIN_INTERVAL: Duration = Duration::from_secs(30);
#[cfg(debug_assertions)]
const MIN_INTERVAL: Duration = Duration::from_secs(5);
/// A set of tag names plus an optional language, sent over the channel to
/// the trend-setter task.
#[derive(Debug)]
pub struct UpdateSet {
    /// Language the tags belong to, if one is known.
    language: Option<String>,
    /// Names of the tags to be (re)scored.
    tags: HashSet<String>,
}

impl UpdateSet {
    /// Returns `true` when the set carries no tags at all.
    ///
    /// The language is not taken into account.
    pub fn is_empty(&self) -> bool {
        let Self { tags, .. } = self;
        tags.is_empty()
    }
}
impl From<&Post> for UpdateSet {
fn from(post: &Post) -> Self {
UpdateSet {
language: post.lang(),
tags: post.tags_set().keys().cloned().collect(),
}
}
}
/// Sending half of the channel created by [`start`]; it carries
/// [`UpdateSet`]s to the task spawned there.
pub type Tx = Sender<UpdateSet>;
/// Spawns the trend-setter task and returns the sender used to feed it.
///
/// The task buffers incoming tags per language (plus once under `None` for
/// every update) and processes each language via `run` no earlier than
/// `MIN_INTERVAL` after its first buffered tag. Whenever the schedule is
/// empty it is refilled with every language reported by
/// `Store::get_languages`, each with an empty tag set.
///
/// When every [`Tx`] has been dropped the task keeps working through its
/// schedule but sleeps between runs instead of polling the closed channel.
///
/// Uses `tokio::spawn`, so it must be called from within a Tokio runtime.
///
/// # Panics
///
/// `start` itself does not unwrap anything, but the spawned task panics if
/// `Store::get_languages` or `run` returns an error.
pub fn start(mut store: Store) -> Tx {
    let (tx, mut rx) = channel::<UpdateSet>(1024);

    tokio::spawn(async move {
        // Due time -> language to process at that time.
        let mut queue = BTreeMap::<Instant, Option<String>>::new();
        // Tags buffered per language since that language was last processed.
        let mut buffer = HashMap::<Option<String>, HashSet<String>>::new();
        // Becomes true once `recv` reports that all senders are gone.
        let mut channel_closed = false;

        /// Buffers `tags` for `language`, scheduling the language if it had
        /// nothing buffered yet.
        fn enqueue(
            language: Option<String>,
            queue: &mut BTreeMap<Instant, Option<String>>,
            buffer: &mut HashMap<Option<String>, HashSet<String>>,
            tags: HashSet<String>,
        ) {
            match buffer.entry(language.clone()) {
                std::collections::hash_map::Entry::Vacant(entry) => {
                    entry.insert(tags);
                    // The queue is keyed by due time. If two languages got
                    // the same `Instant`, the later insert would replace the
                    // earlier one and leave its buffer entry stranded, so
                    // nudge the key until it is free.
                    let mut due = Instant::now() + MIN_INTERVAL;
                    while queue.contains_key(&due) {
                        due += Duration::from_nanos(1);
                    }
                    queue.insert(due, language);
                }
                std::collections::hash_map::Entry::Occupied(mut entry) => {
                    // Already scheduled: merge into the buffered tags.
                    entry.get_mut().extend(tags);
                }
            }
        }

        loop {
            let mut next_run = queue.keys().next().copied();
            if let Some(due) = next_run {
                let now = Instant::now();
                if due <= now {
                    let language = queue
                        .remove(&due)
                        .expect("due time was just read from the queue");
                    let buffered = buffer
                        .remove(&language)
                        .expect("every queued language has buffered tags");
                    run(&language, buffered, &mut store).await.unwrap();
                    // update with next in queue
                    next_run = queue.keys().next().copied();
                } else {
                    tracing::trace!("next_run in {:?}", due - now);
                }
            } else {
                let languages = store.get_languages().await.unwrap();
                tracing::info!("queue empty, filling from {} languages", languages.len());
                for language in languages.into_iter() {
                    enqueue(Some(language.clone()), &mut queue, &mut buffer, HashSet::new());
                }
                // NOTE(review): `next_run` is still `None` here, so the wait
                // below lasts up to an hour unless a message arrives first.
                // Confirm that this is intended.
            }

            // `run` may have taken longer than the gap to the next entry, so
            // clamp to zero rather than relying on `Instant` subtraction.
            let interval = next_run.map_or(
                Duration::from_secs(3600),
                |next_run| next_run.saturating_duration_since(Instant::now()),
            );

            if channel_closed {
                // `recv` on a closed channel returns immediately; polling it
                // again would turn this loop into a busy spin.
                tokio::time::sleep(interval).await;
                continue;
            }

            match timeout(interval, rx.recv()).await {
                Ok(Some(update_set)) => {
                    // Every update counts towards the language-independent
                    // pool (`None`) and, if known, towards its own language.
                    enqueue(None, &mut queue, &mut buffer, update_set.tags.clone());
                    if let Some(language) = update_set.language {
                        enqueue(Some(language), &mut queue, &mut buffer, update_set.tags);
                    }
                }
                Ok(None) => {
                    tracing::warn!("all senders dropped, continuing on schedule only");
                    channel_closed = true;
                }
                // Timed out: the next scheduled run is due.
                Err(_elapsed) => {}
            }
        }
    });

    tx
}
/// Recomputes the trend pools of one language.
///
/// Loads the current pools for every period in `cave::PERIODS`, scores the
/// union of their tags and the `new` tags, and for each period keeps at most
/// `TREND_POOL_SIZE` of the best-scoring tags whose score is above zero. The
/// resulting additions, removals and pool sizes are handed to
/// `Store::update_trend_pools`, and the elapsed time is recorded in the
/// `hunter_trend_setter_seconds` histogram.
///
/// `language` selects which pools are updated and is passed through to the
/// store unchanged.
///
/// # Errors
///
/// Returns the error of `Store::get_trend_pools`, `Store::get_trend_tags` or
/// `Store::update_trend_pools` if any of them fails.
///
/// # Panics
///
/// Panics if the periods returned by `Store::get_trend_pools` do not line up
/// with `cave::PERIODS`.
async fn run(
    language: &Option<String>,
    new: HashSet<String>,
    store: &mut Store,
) -> Result<(), cave::store::Error> {
    let started = Instant::now();
    tracing::debug!("run {:?}: updated {:?}", language, new);

    let old = store.get_trend_pools(language, cave::PERIODS).await?;
    // Everything that is in a pool right now plus everything that changed.
    let all: HashSet<String> = old.iter()
        .flat_map(|(_period, tags)| tags.clone())
        .chain(new.into_iter())
        .collect();

    let mut period_scores: Vec<(u64, Vec<(f64, &TrendTag)>)> = cave::PERIODS.iter()
        .map(|period| (*period, Vec::with_capacity(all.len())))
        .collect();

    // calculate scores for tags currently in the pool
    let until = current_hour();
    let trend_tags = store.get_trend_tags(language, all.into_iter()).await?;
    for trend_tag in &trend_tags {
        for (period, scores) in &mut period_scores {
            let score = trend_tag.score(*period, until);
            scores.push((score, trend_tag));
        }
    }

    // Order each period by descending score; incomparable scores (NaN)
    // are treated as equal.
    for (_period, scores) in &mut period_scores {
        scores.sort_by(|(score1, _), (score2, _)| {
            score2.partial_cmp(score1).unwrap_or(Ordering::Equal)
        });
    }

    // Separate new, kept and old tags once per period. Doing this eagerly
    // avoids recomputing the partition for every consumer below.
    let period_count = period_scores.len();
    let mut removals = Vec::with_capacity(period_count);
    let mut additions = Vec::with_capacity(period_count);
    let mut pool_sizes = Vec::with_capacity(period_count);
    for ((period1, old_tags), (period2, scores)) in old.into_iter().zip(&period_scores) {
        assert_eq!(period1, *period2);
        let old_tags = old_tags.into_iter().collect::<HashSet<String>>();

        let mut keep = TREND_POOL_SIZE.min(scores.len());
        // shrink sorted set of tags as long as score is 0
        while keep > 0 && scores[keep - 1].0 <= 0.0 {
            keep -= 1;
        }

        // Tags that fell out of the kept prefix but are still in the pool.
        let remove = scores[keep..].iter()
            .map(|(_score, trend_tag)| trend_tag.name.borrow())
            .filter(|tag| old_tags.contains(*tag))
            .collect::<Vec<&str>>();
        // Tags inside the kept prefix that are not in the pool yet.
        let add = scores[..keep].iter()
            .map(|(_score, trend_tag)| trend_tag.name.borrow())
            .filter(|tag| ! old_tags.contains(*tag))
            .collect::<Vec<&str>>();

        if !add.is_empty() || !remove.is_empty() {
            tracing::info!("Trending in {:?} for {} of {}/{}: +{:?} -{:?}", language, period1, keep, scores.len(), add, remove);
        }

        removals.push((period1, remove));
        additions.push((period1, add));
        pool_sizes.push((period1, keep));
    }

    // Persist the changes through the store.
    store.update_trend_pools(
        language,
        removals.into_iter(),
        additions.into_iter(),
        pool_sizes.into_iter(),
    ).await?;

    metrics::histogram!("hunter_trend_setter_seconds", started.elapsed(), "lang" => if language.is_some() { "some" } else { "any" });
    Ok(())
}