hunter/trend_setter: implement maintaining the pools

This commit is contained in:
Astro 2022-11-09 18:11:02 +01:00
parent a98a669086
commit 0c23fad76d
6 changed files with 191 additions and 39 deletions

View File

@ -4,3 +4,5 @@ pub mod config;
pub mod feed;
pub mod store;
pub mod trend_tag;
pub const PERIODS: &[u64] = &[4, 24, 7 * 24];

View File

@ -1,9 +1,16 @@
use crate::feed::Post;
use std::collections::HashSet;
use redis::{Value, RedisError};
use crate::{feed::Post, trend_tag::TrendTag};
const POST_EXPIRE: usize = 86400;
const TAG_EXPIRE: i64 = 30 * 24;
const LANGUAGE_EXPIRE: usize = 86400;
pub const TREND_POOL_SIZE: usize = 30;
pub type Error = RedisError;
#[derive(Clone)]
pub struct Store {
man: redis::aio::ConnectionManager,
@ -19,18 +26,17 @@ impl Store {
Self { man }
}
pub async fn save_post(&mut self, post: Post) -> bool {
pub async fn save_post(&mut self, post: Post) -> Result<bool, RedisError> {
let post_key = format!("p:{}", post.uri);
let check = redis::pipe()
.getset(&post_key, "1")
.expire(post_key, POST_EXPIRE)
.ignore()
.query_async::<_, redis::Value>(&mut self.man)
.await
.unwrap();
if check != redis::Value::Bulk(vec![redis::Value::Nil]) {
.query_async::<_, Value>(&mut self.man)
.await?;
if check != Value::Bulk(vec![Value::Nil]) {
// post is not new
return false;
return Ok(false);
}
log::info!("New post ({}{} tags): {}",
if post.account.bot { "bot, " } else { "" },
@ -39,7 +45,7 @@ impl Store {
self.save_post_tags(post).await;
// post was new
true
Ok(true)
}
async fn save_post_tags(&mut self, post: Post) {
@ -126,31 +132,102 @@ impl Store {
}
}
pub async fn save_host(&mut self, host: &str) {
pub async fn save_host(&mut self, host: &str) -> Result<(), RedisError> {
redis::Cmd::set(format!("h:{}", host), "1")
.query_async::<_, ()>(&mut self.man)
.await
.unwrap();
}
pub async fn get_hosts(&mut self) -> Vec<String> {
pub async fn get_hosts(&mut self) -> Result<Vec<String>, RedisError> {
let mut results = vec![];
self.scan("h:*", |key| results.push(key[2..].to_string())).await;
results
self.scan("h:*", |key| results.push(key[2..].to_string())).await?;
Ok(results)
}
async fn scan<F: FnMut(String)>(
&mut self,
pattern: &str,
mut f: F,
) {
) -> Result<(), RedisError> {
let mut cmd = redis::cmd("SCAN");
cmd.cursor_arg(0).arg("MATCH").arg(pattern);
let mut iter = cmd.iter_async::<String>(&mut self.man)
.await
.unwrap();
.await?;
while let Some(key) = iter.next_item().await {
f(key);
}
Ok(())
}
/// Fetch the [`TrendTag`] hash for every requested tag name.
///
/// `language == Some(..)` reads the per-language hashes (`l:<lang>:<tag>`),
/// `None` reads the global ones (`g:<tag>`). All HGETALLs are batched into
/// a single pipeline round-trip; results are returned in the same order the
/// names were drawn from the set.
pub async fn get_trend_tags(
    &mut self,
    language: &Option<String>,
    names_in: HashSet<&String>,
) -> Result<Vec<TrendTag>, RedisError> {
    let prefix = language.as_ref().map_or_else(
        || "g:".to_string(),
        |language| format!("l:{}:", language),
    );
    // Remember the iteration order so replies can be zipped back to names.
    let names: Vec<&String> = names_in.into_iter().collect();
    let mut pipe = redis::pipe();
    for name in &names {
        pipe.hgetall(format!("{}{}", prefix, name));
    }
    let hashes = pipe.query_async::<_, Vec<Vec<String>>>(&mut self.man).await?;
    Ok(
        names.into_iter()
            .zip(hashes)
            .map(|(name, hash_values)| TrendTag::from_hash(name.clone(), hash_values))
            .collect()
    )
}
/// Read the current trend-pool members for each period.
///
/// Issues one SMEMBERS per period, batched into a single pipeline, and
/// pairs each period with its member set in the order given.
pub async fn get_trend_pools(
    &mut self,
    language: &Option<String>,
    periods: &[u64],
) -> Result<Vec<(u64, Vec<String>)>, RedisError> {
    let mut pipe = redis::pipe();
    for &period in periods {
        pipe.smembers(pool_key(language, period));
    }
    let members = pipe.query_async::<_, Vec<Vec<String>>>(&mut self.man).await?;
    Ok(periods.iter().copied().zip(members).collect())
}
/// Apply membership changes to the trend pools.
///
/// Each item is `(language, period, tags)`; `remove` entries become SREM
/// commands, `add` entries become SADD commands. Everything is batched into
/// one pipeline; empty tag lists are skipped entirely.
pub async fn update_trend_pools(
    &mut self,
    remove: impl Iterator<Item = (&Option<String>, u64, Vec<&str>)>,
    add: impl Iterator<Item = (&Option<String>, u64, Vec<&str>)>,
) -> Result<(), RedisError> {
    let mut pipe = redis::pipe();
    for (language, period, tags) in remove {
        if tags.is_empty() {
            continue;
        }
        pipe.srem(pool_key(language, period), tags).ignore();
    }
    for (language, period, tags) in add {
        if tags.is_empty() {
            continue;
        }
        pipe.sadd(pool_key(language, period), tags).ignore();
    }
    pipe.query_async(&mut self.man).await?;
    Ok(())
}
}
/// Redis key of the trend pool for `period` hours, optionally scoped to a
/// language: `q:<period>:<language>` or the global `q:<period>`.
fn pool_key(language: &Option<String>, period: u64) -> String {
    if let Some(language) = language {
        format!("q:{}:{}", period, language)
    } else {
        format!("q:{}", period)
    }
}

View File

@ -1,5 +1,4 @@
use std::collections::BTreeSet;
use std::sync::Arc;
use redis::{
aio::ConnectionManager,
RedisError,
@ -9,7 +8,7 @@ const MIN_AFTER_MENTIONS: usize = 3;
#[derive(Debug)]
pub struct TrendTag {
pub name: Arc<String>,
pub name: String,
by_hour: Vec<(u64, usize)>,
other: Vec<(String, String)>,
}
@ -54,7 +53,7 @@ impl TrendTag {
Ok(())
}
fn from_hash(name: String, hash_values: Vec<String>) -> Self {
pub(crate) fn from_hash(name: String, hash_values: Vec<String>) -> Self {
let mut by_hour = Vec::with_capacity(hash_values.len() / 2);
let mut other = Vec::with_capacity(hash_values.len() / 2);
@ -77,7 +76,7 @@ impl TrendTag {
by_hour.sort();
TrendTag {
name: Arc::new(name),
name,
by_hour,
other,
}

View File

@ -5,6 +5,7 @@ use cave::config::LoadConfig;
mod config;
mod scheduler;
mod worker;
mod trend_setter;
use worker::Message;

View File

@ -1,6 +1,8 @@
use std::{
time::{Duration, Instant},
borrow::Borrow,
cmp::Ordering,
collections::{HashSet, HashMap, BTreeMap},
time::{Duration, Instant},
};
use tokio::{
sync::mpsc::{channel, Sender},
@ -8,11 +10,11 @@ use tokio::{
};
use cave::{
feed::Post,
store::Store,
store::{Store, TREND_POOL_SIZE}, trend_tag::TrendTag,
};
// const MIN_FLUSH_INTERVAL: Duration = Duration::from_secs(10);
const MIN_FLUSH_INTERVAL: Duration = Duration::from_secs(5);
// const MIN_INTERVAL: Duration = Duration::from_secs(10);
const MIN_INTERVAL: Duration = Duration::from_secs(5);
#[derive(Debug)]
pub struct UpdateSet {
@ -37,7 +39,7 @@ impl From<&Post> for UpdateSet {
pub type Tx = Sender<UpdateSet>;
pub fn start(store: Store) -> Tx {
pub fn start(mut store: Store) -> Tx {
let (tx, mut rx) = channel::<UpdateSet>(1024);
tokio::spawn(async move {
@ -53,7 +55,7 @@ pub fn start(store: Store) -> Tx {
match buffer.entry(language.clone()) {
std::collections::hash_map::Entry::Vacant(entry) => {
entry.insert(tags);
queue.insert(Instant::now() + MIN_FLUSH_INTERVAL, language);
queue.insert(Instant::now() + MIN_INTERVAL, language);
}
std::collections::hash_map::Entry::Occupied(mut entry) => {
// merge into buffered
@ -65,27 +67,27 @@ pub fn start(store: Store) -> Tx {
}
loop {
let mut next_flush = queue.keys().cloned().next();
// log::trace!("next_flush in {:?}", next_flush - Instant::now());
if let Some(next_flush_) = next_flush {
let mut next_run = queue.keys().cloned().next();
// log::trace!("next_run in {:?}", next_run - Instant::now());
if let Some(next_run_) = next_run {
let now = Instant::now();
if next_flush_ <= now {
let language = queue.remove(&next_flush_).unwrap();
if next_run_ <= now {
let language = queue.remove(&next_run_).unwrap();
let buffered = buffer.remove(&language).unwrap();
// TODO: flush
log::debug!("flush {:?}: {:?}", language, buffered);
log::debug!("run {:?}: {:?}", language, buffered);
run(&language, &buffered, &mut store).await.unwrap();
// update with next in queue
next_flush = queue.keys().cloned().next();
next_run = queue.keys().cloned().next();
}
}
let now = Instant::now();
let interval = next_flush.map_or(
let interval = next_run.map_or(
Duration::from_secs(3600),
|next_flush| next_flush - now
|next_run| next_run - now
);
log::trace!("interval {:?}", interval);
let _ = timeout(interval, async {
if let Some(update_set) = rx.recv().await {
enqueue(None, &mut queue, &mut buffer, update_set.tags.clone());
@ -99,3 +101,74 @@ pub fn start(store: Store) -> Tx {
tx
}
/// Recalculate the trend pools for one language bucket.
///
/// Scores every tag that is either already pooled or freshly seen (`new`),
/// keeps the top `TREND_POOL_SIZE` per period, and writes the membership
/// delta back to the store.
async fn run(
    language: &Option<String>,
    new: &HashSet<String>,
    store: &mut Store,
) -> Result<(), cave::store::Error> {
    let old = store.get_trend_pools(language, cave::PERIODS).await?;
    // Candidate set: everything currently pooled for any period, plus new tags.
    let all: HashSet<&String> = old.iter()
        .flat_map(|(_period, tags)| tags)
        .chain(new)
        .collect();
    let mut period_scores: Vec<(u64, Vec<(f64, &TrendTag)>)> = cave::PERIODS.iter()
        .map(|period| (*period, Vec::with_capacity(all.len())))
        .collect();

    // Score each candidate for each period, relative to the current hour.
    let until = (chrono::offset::Utc::now().naive_utc().timestamp() / 3600) as u64;
    let trend_tags = store.get_trend_tags(language, all).await?;
    for trend_tag in &trend_tags {
        for (period, scores) in &mut period_scores {
            scores.push((trend_tag.score(*period, until), trend_tag));
        }
    }
    // Highest score first; an incomparable (NaN) pair sorts as Equal, which
    // keeps the comparator total like the original three-way comparison.
    for (_period, scores) in &mut period_scores {
        scores.sort_by(|(score1, _), (score2, _)|
            score2.partial_cmp(score1).unwrap_or(Ordering::Equal)
        );
    }

    // Compute the per-period membership delta. Collect eagerly: the lazy
    // iterator was previously consumed twice by update_trend_pools, which
    // re-ran this closure (and duplicated the log output) for each pass.
    let updates: Vec<(u64, Vec<&str>, Vec<&str>)> = old.into_iter()
        .zip(&period_scores)
        .map(|((period1, old_tags), (period2, scores))| {
            // Pools and scores are both built from cave::PERIODS in order.
            assert_eq!(period1, *period2);
            let old_tags: HashSet<String> = old_tags.into_iter().collect();
            let keep = TREND_POOL_SIZE.min(scores.len());
            // Top-scored tags not yet in the pool.
            let add: Vec<&str> = scores[..keep].iter()
                .map(|(_score, trend_tag)| trend_tag.name.borrow())
                .filter(|tag: &&str| !old_tags.contains(*tag))
                .collect();
            if !add.is_empty() {
                log::info!("Now trending in {:?} for {}: {:?}", language, period1, add);
            }
            // Pooled tags that fell below the cut-off.
            let remove: Vec<&str> = scores[keep..].iter()
                .map(|(_score, trend_tag)| trend_tag.name.borrow())
                .filter(|tag: &&str| old_tags.contains(*tag))
                .collect();
            if !remove.is_empty() {
                log::info!("No longer trending in {:?} for {}: {:?}", language, period1, remove);
            }
            (period1, remove, add)
        })
        .collect();

    store.update_trend_pools(
        updates.iter().map(|(period, remove, _add)| (language, *period, remove.clone())),
        updates.iter().map(|(period, _remove, add)| (language, *period, add.clone())),
    ).await?;
    Ok(())
}

View File

@ -58,7 +58,7 @@ pub fn fetch_and_process(
if let Some(author_host) = post.account.host() {
// send away to redis
let update_set = UpdateSet::from(&post);
if store.save_post(post).await {
if store.save_post(post).await == Ok(true) {
new_posts += 1;
if ! update_set.is_empty() {
@ -85,7 +85,7 @@ pub fn fetch_and_process(
let _ = message_tx.send(Message::IntroduceHosts { hosts });
// successfully fetched, save for future run
store.save_host(&host).await;
store.save_host(&host).await.unwrap();
message_tx.send(Message::Fetched {
host: host.clone(),