gatherer: add PoC

This commit is contained in:
Astro 2022-11-05 20:04:31 +01:00
parent 6ac923f270
commit c3039f96b6
5 changed files with 1291 additions and 0 deletions

1054
gatherer/Cargo.lock generated Normal file

File diff suppressed because it is too large Load Diff

14
gatherer/Cargo.toml Normal file
View File

@@ -0,0 +1,14 @@
[package]
name = "caveman-gatherer"
version = "0.0.0"
edition = "2021"
[dependencies]
tokio = { version = "1", features = ["full"] }
serde = { version = "1", features = ["derive"] }
serde_yaml = "0.9"
chrono = "0.4"
redis = { version = "0.22", features = ["tokio-comp", "connection-manager"] }
log = "0.4"
env_logger = "0.9"
systemd = "0.10"

17
gatherer/src/main.rs Normal file
View File

@@ -0,0 +1,17 @@
mod tag;
mod trends;
#[tokio::main]
async fn main() {
let redis_client = redis::Client::open("redis://10.233.12.2:6379/")
// let redis_client = redis::Client::open("redis://localhost:6378/")
.expect("redis::Client");
let mut redis_man = redis::aio::ConnectionManager::new(redis_client).await
.expect("redis::aio::ConnectionManager");
//let r = trends::TrendAnalyzer::run(&mut redis_man, 20, &[4, 25, 24 * 7], None)
let r = trends::TrendAnalyzer::run(&mut redis_man, 20, &[4], None)
.await
.unwrap();
dbg!(r);
}

112
gatherer/src/tag.rs Normal file
View File

@@ -0,0 +1,112 @@
use std::rc::Rc;
use redis::{
aio::ConnectionManager,
RedisError,
};
const MIN_AFTER_MENTIONS: usize = 3;
#[derive(Debug)]
pub struct Tag {
pub name: Rc<String>,
by_hour: Vec<(u64, usize)>,
other: Vec<(String, String)>,
// by_host
// by_spelling
}
impl Tag {
pub async fn for_each<F: FnMut(Self)>(
redis_man: &mut ConnectionManager,
language: Option<String>,
mut f: F,
) -> Result<(), RedisError> {
let prefix = match language {
Some(language) => format!("l:{}:", language),
None => "g:".to_string(),
};
let mut cursor = None;
while cursor != Some(0) {
let mut cmd = redis::cmd("SCAN");
cmd.cursor_arg(cursor.unwrap_or(0))
.arg("MATCH").arg(format!("{}*", prefix))
.arg("COUNT").arg(1000);
let (next_cursor, keys) =
cmd.query_async::<_, (u64, Vec<String>)>(redis_man)
.await?;
let mut cmd = redis::pipe();
for key in &keys {
cmd.hgetall(key);
}
let others =
cmd.query_async::<_, Vec<Vec<String>>>(redis_man)
.await?;
for (key, other) in keys.iter().zip(others) {
let name = key[prefix.len()..].to_string();
let tag = Self::from_hash(name, other);
f(tag);
}
cursor = Some(next_cursor);
}
Ok(())
}
fn from_hash(name: String, hash_values: Vec<String>) -> Self {
let mut by_hour = Vec::with_capacity(hash_values.len() / 2);
let mut other = Vec::with_capacity(hash_values.len() / 2);
let mut key: Option<String> = None;
for value in hash_values.into_iter() {
if let Some(key) = key.take() {
if &key[..2] == "t:" {
if let (Ok(hour), Ok(value)) = (str::parse(&key[2..]), str::parse(&value)) {
by_hour.push((hour, value));
}
} else if let Ok(value) = str::parse(&value) {
other.push((key, value));
}
} else {
key = Some(value);
}
}
#[cfg(debug)]
by_hour.sort();
Tag {
name: Rc::new(name),
by_hour,
other,
}
}
pub fn score(&self, period: u64, until: u64) -> f64 {
let from = until - period;
let mut before_mentions = 0;
let mut before_hours = 0;
let mut after_mentions = 0;
for (hour, mentions) in self.by_hour.iter().cloned() {
if hour <= from {
before_mentions += mentions;
before_hours += 1;
} else {
after_mentions += mentions;
}
}
if after_mentions < MIN_AFTER_MENTIONS * (period as usize) {
return 0.;
}
let before = if before_hours > 0 && before_mentions > 0 {
(before_mentions as f64) / (before_hours as f64)
} else { 0.1 };
let after = (after_mentions as f64) / (period as f64);
after / before
}
}

94
gatherer/src/trends.rs Normal file
View File

@@ -0,0 +1,94 @@
use std::cmp::Ordering;
use std::collections::BTreeMap;
use std::rc::Rc;
use redis::{
aio::ConnectionManager,
RedisError,
};
use crate::tag::Tag;
#[derive(Debug, Clone, PartialEq, PartialOrd)]
struct ScoreKey {
score: f64,
tag: Rc<String>,
}
impl Eq for ScoreKey {
}
impl Ord for ScoreKey {
fn cmp(&self, other: &Self) -> Ordering {
if self.score == other.score {
self.tag.as_ref().cmp(other.tag.as_ref())
} else if self.score < other.score {
Ordering::Less
} else {
Ordering::Greater
}
}
}
#[derive(Debug)]
pub struct TrendAnalyzer {
/// in hours
period: u64,
/// *now* in hours
until: u64,
size: usize,
/// key contains name to avoid collision by just score
results: BTreeMap<ScoreKey, Rc<Tag>>,
score_threshold: Option<f64>,
}
impl TrendAnalyzer {
pub async fn run(
redis_man: &mut ConnectionManager,
size: usize,
periods: &[u64],
language: Option<String>,
) -> Result<Vec<Self>, RedisError> {
let until = (chrono::offset::Utc::now().naive_utc().timestamp() / 3600) as u64;
let mut analyzers: Vec<TrendAnalyzer> = periods.iter()
.cloned()
.map(|period| TrendAnalyzer {
period,
until,
size,
results: BTreeMap::new(),
score_threshold: None,
}).collect();
Tag::for_each(redis_man, language, |tag| {
let tag = Rc::new(tag);
for analyzer in &mut analyzers {
analyzer.process_tag(&tag);
}
}).await?;
Ok(analyzers)
}
pub fn process_tag(&mut self, tag: &Rc<Tag>) {
let score = tag.score(self.period, self.until);
if score <= 1.0 {
// discard downwards trends
return;
}
if self.score_threshold.map_or(false, |score_threshold| score < score_threshold) {
// score is below self.result[..self.size].score
return;
}
self.results.insert(ScoreKey { score, tag: tag.name.clone() }, tag.clone());
let mut least = self.results.keys().cloned().next().unwrap();
if self.results.len() > self.size {
self.results.remove(&least);
least = self.results.keys().cloned().next().unwrap().clone();
}
self.score_threshold = Some(least.score);
}
}