cave/trend_tag: switch from by_hours to hour_users algorithm

This commit is contained in:
Astro 2022-11-14 03:14:12 +01:00
parent 9e73a2eec0
commit e49ae907e5
9 changed files with 112 additions and 153 deletions

View File

@ -6,3 +6,10 @@ pub mod store;
pub mod trend_tag;
pub const PERIODS: &[u64] = &[4, 24, 7 * 24];
/// compare the selected period against how many multiples of preceding periods
pub const PERIOD_COMPARE_WINDOW: u64 = 3;
pub fn current_hour() -> u64 {
chrono::offset::Utc::now().naive_utc().timestamp() as u64 / 3600
}

View File

@ -2,11 +2,11 @@ use std::pin::Pin;
use bb8::ManageConnection;
use futures::{Future, Stream, stream::unfold, StreamExt};
use redis::{Value, RedisError, aio::ConnectionLike};
use crate::{feed::Post, trend_tag::TrendTag};
use redis::{Value, RedisError, aio::ConnectionLike, FromRedisValue};
use crate::{feed::Post, trend_tag::TrendTag, PERIOD_COMPARE_WINDOW, current_hour, PERIODS};
const POST_EXPIRE: usize = 86400;
const TAG_EXPIRE: i64 = 30 * 24;
const TAG_EXPIRE: u64 = 30 * 24;
pub const TREND_POOL_SIZE: usize = 30;
@ -147,8 +147,8 @@ impl Store {
return;
}
};
let hour = timestamp.naive_utc().timestamp() / 3600;
let until = chrono::offset::Utc::now().naive_utc().timestamp() / 3600;
let hour = timestamp.naive_utc().timestamp() as u64 / 3600;
let until = current_hour();
if hour > until {
log::warn!("future post from {}", timestamp);
return;
@ -165,12 +165,6 @@ impl Store {
let mut cmd = redis::pipe();
let mut store_tags = |spellings, tag_key, user_key| {
// by hour (TODO: remove eventually)
cmd.hincr(
&tag_key,
format!("t:{}", hour),
1
).ignore();
// by spelling
for spelling in spellings {
cmd.hincr(
@ -278,25 +272,54 @@ impl Store {
Ok(stream)
}
pub async fn get_trend_tags(
&mut self,
language: &Option<String>,
names_in: impl Iterator<Item = String>,
) -> Result<Vec<TrendTag>, RedisError> {
pub async fn clean_trend_tag(&self, language: &Option<String>, tag: &TrendTag) -> Result<(), RedisError> {
if ! tag.other.iter().any(|(name, _)| &name[..2] == "t:") {
return Ok(());
}
let mut cmd = redis::pipe();
let prefix = match language {
Some(language) => format!("l:{}:", language),
None => "g:".to_string(),
};
let names: Vec<String> = names_in.map(|name| {
cmd.hgetall(format!("{}{}", prefix, name));
for (name, _) in &tag.other {
cmd.hdel(tag_key(language, &tag.name), name)
.ignore();
}
cmd.query_async(&mut self.clone()).await
}
pub async fn get_trend_tags(
&self,
language: &Option<String>,
names: impl Iterator<Item = String>,
) -> Result<Vec<TrendTag>, RedisError> {
let until = current_hour();
let from = until - PERIODS.last().unwrap() * (1 + PERIOD_COMPARE_WINDOW);
let mut cmd = redis::pipe();
let names = names.map(|name| {
cmd.hgetall(tag_key(language, &name));
for hour in from..=until {
cmd.scard(format!("u:{}:{}:{}", language.as_ref().map_or("", |l| &l), hour, name));
}
name
}).collect();
let hashes = cmd.query_async::<_, Vec<Vec<String>>>(self).await?;
let results = names.into_iter()
.zip(hashes)
.map(|(name, hash_values)| TrendTag::from_hash(name, hash_values))
.collect();
}).collect::<Vec<String>>();
let mut values = cmd.query_async::<_, Vec<Value>>(&mut self.clone()).await?
.into_iter();
let mut results = Vec::with_capacity(names.len());
for name in names.into_iter() {
let hash_values = if let Some(Value::Bulk(hash_values)) = values.next() {
hash_values
} else {
panic!("hash_values");
};
let hash_values = hash_values.iter()
.map(|value| String::from_redis_value(value).expect("hash_values value"))
.collect();
let hour_users = (from..=until).map(|hour| {
let users = usize::from_redis_value(&values.next().unwrap()).expect("hour_users");
(hour, users)
}).collect();
results.push(TrendTag::from_hash(name.to_string(), hash_values, hour_users));
}
Ok(results)
}
@ -360,27 +383,6 @@ impl Store {
Ok(())
}
pub async fn delete_tag_hours(
&mut self,
language: &Option<String>,
tag: &str,
hours: impl Iterator<Item = u64>,
) -> Result<(), RedisError> {
let key = match language {
Some(language) => format!("l:{}:{}", language, tag),
None => format!("g:{}", tag),
};
let hash_keys = hours.map(|hour| format!("t:{}", hour))
.collect::<Vec<_>>();
if hash_keys.len() > 0 {
redis::Cmd::hdel(key, hash_keys)
.query_async(self)
.await
} else {
Ok(())
}
}
pub async fn delete_tag(
&mut self,
language: &Option<String>,
@ -396,6 +398,13 @@ impl Store {
}
}
fn tag_key(language: &Option<String>, name: &str) -> String {
match language {
Some(language) => format!("l:{}:{}", language, name),
None => format!("g:{}", name),
}
}
fn pool_key(language: &Option<String>, period: u64) -> String {
match language {
Some(language) =>

View File

@ -1,70 +1,23 @@
use std::collections::{BTreeSet, HashMap};
use redis::{
aio::ConnectionManager,
RedisError,
};
use crate::PERIOD_COMPARE_WINDOW;
const MIN_AFTER_MENTIONS: usize = 3;
#[derive(Debug)]
pub struct TrendTag {
pub name: String,
pub by_hour: Vec<(u64, usize)>,
other: Vec<(String, String)>,
pub hour_users: Vec<(u64, usize)>,
pub other: Vec<(String, String)>,
}
impl TrendTag {
pub async fn for_each<F: FnMut(Self)>(
redis_man: &mut ConnectionManager,
language: Option<String>,
mut f: F,
) -> Result<(), RedisError> {
let prefix = match language {
Some(language) => format!("l:{}:", language),
None => "g:".to_string(),
};
let mut cursor = None;
while cursor != Some(0) {
let mut cmd = redis::cmd("SCAN");
cmd.cursor_arg(cursor.unwrap_or(0))
.arg("MATCH").arg(format!("{}*", prefix))
.arg("COUNT").arg(1000);
let (next_cursor, keys) =
cmd.query_async::<_, (u64, Vec<String>)>(redis_man)
.await?;
let mut cmd = redis::pipe();
for key in &keys {
cmd.hgetall(key);
}
let others =
cmd.query_async::<_, Vec<Vec<String>>>(redis_man)
.await?;
for (key, other) in keys.iter().zip(others) {
let name = key[prefix.len()..].to_string();
let tag = Self::from_hash(name, other);
f(tag);
}
cursor = Some(next_cursor);
}
Ok(())
}
pub(crate) fn from_hash(name: String, hash_values: Vec<String>) -> Self {
let mut by_hour = Vec::with_capacity(hash_values.len() / 2);
pub(crate) fn from_hash(name: String, hash_values: Vec<String>, hour_users: Vec<(u64, usize)>) -> Self {
let mut other = Vec::with_capacity(hash_values.len() / 2);
let mut key: Option<String> = None;
for value in hash_values.into_iter() {
if let Some(key) = key.take() {
if &key[..2] == "t:" {
if let (Ok(hour), Ok(value)) = (str::parse(&key[2..]), str::parse(&value)) {
by_hour.push((hour, value));
}
} else if let Ok(value) = str::parse(&value) {
if let Ok(value) = str::parse(&value) {
other.push((key, value));
}
} else {
@ -72,12 +25,9 @@ impl TrendTag {
}
}
#[cfg(debug)]
by_hour.sort();
TrendTag {
name,
by_hour,
hour_users,
other,
}
}
@ -89,12 +39,12 @@ impl TrendTag {
}
let from = until - period;
let not_before = from - 3 * period;
let not_before = from - PERIOD_COMPARE_WINDOW * period;
let mut before_mentions = 0;
let mut before_hours = 0;
let mut after_mentions = 0;
for (hour, mentions) in self.by_hour.iter().cloned() {
for (hour, mentions) in self.hour_users.iter().cloned() {
if hour > from {
after_mentions += mentions;
} else if hour > not_before {
@ -115,11 +65,11 @@ impl TrendTag {
}
pub fn hour_scores(&self, period: u64, until: u64) -> Vec<usize> {
let hours = self.by_hour.iter().cloned()
let hours = self.hour_users.iter().cloned()
.collect::<HashMap<_, _>>();
let from = until - period;
let not_before = from - 3 * period;
let not_before = from - PERIOD_COMPARE_WINDOW * period;
(not_before + 1 ..= until).map(|hour|
*hours.get(&hour).unwrap_or(&0)
).collect()

View File

@ -4,6 +4,7 @@ use std::sync::Arc;
use redis::{
RedisError,
};
use cave::current_hour;
use cave::store::Store;
use cave::trend_tag::TrendTag;
@ -48,7 +49,7 @@ impl TrendAnalyzer {
periods: &[u64],
language: Option<String>,
) -> Result<TrendsResults, RedisError> {
let until = (chrono::offset::Utc::now().naive_utc().timestamp() / 3600) as u64;
let until = current_hour();
let mut analyzers: Vec<TrendAnalyzer> = periods.iter()
.cloned()
.map(|period| TrendAnalyzer {

View File

@ -8,8 +8,11 @@ futures = "0.3"
tokio = { version = "1", features = ["full"] }
reqwest = { version = "0.11", features = ["json", "deflate", "gzip"] }
serde = { version = "1", features = ["derive"] }
# serde_yaml = "0.9"
chrono = "0.4"
# redis = { version = "0.22", features = ["tokio-comp", "connection-manager"] }
log = "0.4"
# systemd = "0.10"
cave = { path = "../cave" }
rand = "0.8"
texting_robots = "0.2"

View File

@ -2,28 +2,28 @@ redis: redis://10.233.12.2:6379/
hosts:
- chaos.social
- dresden.network
- mastodon.social
- social.librem.one
- fosstodon.org
- nixnet.social
- mastodon.technology
- o3o.ca
- troet.cafe
- mastodon.top
- mastodon.cloud
- pawoo.net
- mamot.fr
- switter.at
- mastodon.online
- bonn.social
- social.bund.de
- gruene.social
- mstdn.io
- octodon.social
- fosstodon.social
- freeradical.zone
- ubuntu.social
- uwu.social
# - dresden.network
# - mastodon.social
# - social.librem.one
# - fosstodon.org
# - nixnet.social
# - mastodon.technology
# - o3o.ca
# - troet.cafe
# - mastodon.top
# - mastodon.cloud
# - pawoo.net
# - mamot.fr
# - switter.at
# - mastodon.online
# - bonn.social
# - social.bund.de
# - gruene.social
# - mstdn.io
# - octodon.social
# - fosstodon.social
# - freeradical.zone
# - ubuntu.social
# - uwu.social
max_workers: 16
max_workers: 8

View File

@ -32,7 +32,7 @@ async fn main() {
for host in config.hosts.into_iter() {
scheduler.introduce(host).await;
}
{
if false {
cave::systemd::status("Loading known hosts from redis");
let hosts = store.get_hosts()
.await.expect("get_hosts");

View File

@ -58,25 +58,13 @@ async fn trim_tag(language: Option<String>, tag: String, store: &mut Store) {
.unwrap();
let trend_tag = &trend_tags[0];
let until = (chrono::offset::Utc::now().naive_utc().timestamp() / 3600) as u64;
let not_before = until - 8 * 7 * 24;
if trend_tag.by_hour.iter().any(|(hour, _)| *hour > not_before) {
// keep the partially-outdated tag
if trend_tag.by_hour.iter().any(|(hour, _)| *hour <= not_before) {
log::debug!("Deleting old hours from {:?} {}", language, trend_tag.name);
store.delete_tag_hours(
&language,
&trend_tag.name,
trend_tag.by_hour.iter()
.map(|(hour, _)| *hour)
.filter(|hour| *hour <= not_before)
).await.expect("delete_tag_hours");
}
} else {
// drop the whole wholly-outdated/empty tag
if trend_tag.hour_users.iter().all(|(_, users)| *users == 0) {
// drop the whole wholly-outdated tag
log::debug!("Deleting whole tag {:?} {}", language, trend_tag.name);
store.delete_tag(&language, &trend_tag.name).await
.expect("delete_tag");
} else {
store.clean_trend_tag(&language, trend_tag).await
.expect("clean_trend_tag");
}
}

View File

@ -9,6 +9,7 @@ use tokio::{
time::timeout,
};
use cave::{
current_hour,
feed::Post,
store::{Store, TREND_POOL_SIZE}, trend_tag::TrendTag,
};
@ -120,7 +121,7 @@ async fn run(
.collect();
// calculate scores for tags currently in the pool
let until = (chrono::offset::Utc::now().naive_utc().timestamp() / 3600) as u64;
let until = current_hour();
let trend_tags = store.get_trend_tags(language, all.into_iter()).await?;
for trend_tag in &trend_tags {
for (period, scores) in &mut period_scores {