diff --git a/Cargo.lock b/Cargo.lock
index dbfa7d3..b2ce625 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -287,6 +287,25 @@ dependencies = [
  "tracing-subscriber",
 ]
 
+[[package]]
+name = "caveman-butcher"
+version = "0.0.0"
+dependencies = [
+ "cave",
+ "chrono",
+ "futures",
+ "metrics",
+ "metrics-exporter-prometheus",
+ "metrics-util",
+ "rand",
+ "reqwest",
+ "serde",
+ "serde_json",
+ "texting_robots",
+ "tokio",
+ "tracing",
+]
+
 [[package]]
 name = "caveman-gatherer"
 version = "0.0.0"
diff --git a/butcher/Cargo.toml b/butcher/Cargo.toml
new file mode 100644
index 0000000..4484875
--- /dev/null
+++ b/butcher/Cargo.toml
@@ -0,0 +1,22 @@
+[package]
+name = "caveman-butcher"
+version = "0.0.0"
+edition = "2021"
+
+[dependencies]
+futures = "0.3"
+tokio = { version = "1", features = ["full", "tracing"] }
+reqwest = { version = "0.11", features = ["json", "deflate", "gzip", "trust-dns"] }
+serde = { version = "1", features = ["derive"] }
+# serde_yaml = "0.9"
+serde_json = "1"
+chrono = "0.4"
+# redis = { version = "0.22", features = ["tokio-comp", "connection-manager"] }
+tracing = "0.1"
+# systemd = "0.10"
+cave = { path = "../cave" }
+rand = "0.8"
+texting_robots = "0.2"
+metrics = "0.20"
+metrics-util = "0.14"
+metrics-exporter-prometheus = "0.11"
diff --git a/butcher/config.yaml b/butcher/config.yaml
new file mode 100644
index 0000000..bdc9129
--- /dev/null
+++ b/butcher/config.yaml
@@ -0,0 +1,2 @@
+#redis: redis://10.233.12.2:6379/
+redis: redis://127.0.0.1:6378/
diff --git a/butcher/src/config.rs b/butcher/src/config.rs
new file mode 100644
index 0000000..5e4e419
--- /dev/null
+++ b/butcher/src/config.rs
@@ -0,0 +1,6 @@
+/// Settings for the butcher service (deserialized with serde).
+#[derive(Debug, serde::Deserialize)]
+pub struct Config {
+    /// Redis connection URL, e.g. `redis://127.0.0.1:6378/`.
+    pub redis: String,
+}
diff --git a/butcher/src/main.rs b/butcher/src/main.rs
new file mode 100644
index 0000000..7b935b3
--- /dev/null
+++ b/butcher/src/main.rs
@@ -0,0 +1,59 @@
+use futures::StreamExt;
+use cave::{
+    config::LoadConfig,
+    firehose::FirehoseFactory,
+};
+use trend_setter::UpdateSet;
+
+mod config;
+mod trend_setter;
+mod tag_trimmer;
+
+/// Starts the trend setter and the tag trimmer, then consumes the firehose:
+/// every post that parses as JSON has its tags saved and, if it carries any
+/// tags, is forwarded to the trend setter.
+#[tokio::main]
+async fn main() {
+    cave::systemd::extend_timeout(100_000);
+
+    cave::init::exit_on_panic();
+    cave::init::init_logger(5555);
+
+    let config = config::Config::load();
+
+    let store = cave::store::Store::new(16, config.redis.clone()).await;
+
+    cave::systemd::status("Starting trend_setter");
+    let trend_setter_tx = trend_setter::start(store.clone());
+
+    cave::systemd::status("Starting tag_trimmer");
+    tag_trimmer::start(store.clone());
+
+    let firehose_factory = FirehoseFactory::new(config.redis);
+    let firehose = firehose_factory.produce()
+        .await
+        .expect("firehose")
+        .filter_map(|item| async move { item.ok() });
+    cave::systemd::ready();
+
+    firehose.for_each(move |data| {
+        let trend_setter_tx = trend_setter_tx.clone();
+        let mut store = store.clone();
+        async move {
+            let post = match serde_json::from_slice(&data) {
+                Ok(post) =>
+                    post,
+                Err(e) => {
+                    tracing::error!("Cannot parse JSON: {:?}", e);
+                    return;
+                },
+            };
+            store.save_post_tags(&post).await;
+
+            let update_set = UpdateSet::from(&post);
+            if ! update_set.is_empty() {
+                trend_setter_tx.send(update_set).await.unwrap();
+            }
+        }
+    }).await;
+}
diff --git a/butcher/src/tag_trimmer.rs b/butcher/src/tag_trimmer.rs
new file mode 100644
index 0000000..18dadc2
--- /dev/null
+++ b/butcher/src/tag_trimmer.rs
@@ -0,0 +1,92 @@
+use std::time::{Duration, Instant};
+
+use futures::{
+    future::join,
+    StreamExt,
+};
+use tokio::time::sleep;
+use cave::store::Store;
+
+/// Pause between two full trimming passes.
+const TRIM_INTERVAL: Duration = Duration::from_secs(86400);
+/// Pause after each trimmed tag.
+const TRIM_STEP_SLEEP: Duration = Duration::from_millis(30);
+
+/// Spawns a background task that trims all tags once per `TRIM_INTERVAL`.
+pub fn start(store: Store) {
+    tokio::spawn(async move {
+        loop {
+            let started = Instant::now();
+            run(&store).await;
+            tracing::info!("trimmed in {:.3?}", started.elapsed());
+
+            sleep(TRIM_INTERVAL).await;
+        }
+    });
+}
+
+/// Trims the global tags and the per-language tags concurrently.
+async fn run(store: &Store) {
+    let mut store1 = store.clone();
+    let mut store3 = store.clone();
+
+    join(async {
+        let store2 = store1.clone();
+        store1.get_tags_global()
+            .await
+            .unwrap()
+            .for_each(move |tag| {
+                let mut store2 = store2.clone();
+                async move {
+                    trim_tag(None, tag, &mut store2).await;
+                    sleep(TRIM_STEP_SLEEP).await;
+                }
+            })
+            .await
+    }, async {
+        let store4 = store3.clone();
+        store3.get_tags_by_language()
+            .await
+            .unwrap()
+            .for_each(move |(language, tag)| {
+                let mut store4 = store4.clone();
+                async move {
+                    trim_tag(language, tag, &mut store4).await;
+                    sleep(TRIM_STEP_SLEEP).await;
+                }
+            })
+            .await
+    }).await;
+}
+
+/// Deletes `tag` when none of its hours has users left, otherwise cleans it.
+///
+/// `language` is `None` for the global (language-independent) tag data.
+async fn trim_tag(language: Option<String>, tag: String, store: &mut Store) {
+    let t1 = Instant::now();
+    let lang = if language.is_some() { "some" } else { "any" };
+    let trend_tags = store.get_trend_tags(&language, [tag].into_iter())
+        .await
+        .unwrap();
+    // Skip instead of panicking on an index if the store returns nothing.
+    let trend_tag = match trend_tags.first() {
+        Some(trend_tag) => trend_tag,
+        None => {
+            tracing::warn!("No trend data for tag in {:?}", language);
+            return;
+        }
+    };
+
+    if trend_tag.hour_users.iter().all(|(_, users)| *users == 0) {
+        // drop the whole wholly-outdated tag
+        tracing::debug!("Deleting whole tag {:?} {}", language, trend_tag.name);
+        store.delete_tag(&language, &trend_tag.name).await
+            .expect("delete_tag");
+        metrics::increment_counter!("hunter_trimmed_tags", "type" => "delete_tag", "lang" => lang);
+    } else {
+        store.clean_trend_tag(&language, trend_tag).await
+            .expect("clean_trend_tag");
+        metrics::increment_counter!("hunter_trimmed_tags", "type" => "clean_trend_tag", "lang" => lang);
+    }
+    metrics::histogram!("hunter_trim_tag_seconds", t1.elapsed(), "lang" => lang);
+}
diff --git a/butcher/src/trend_setter.rs b/butcher/src/trend_setter.rs
new file mode 100644
index 0000000..5636aee
--- /dev/null
+++ b/butcher/src/trend_setter.rs
@@ -0,0 +1,203 @@
+use std::{
+    borrow::Borrow,
+    cmp::Ordering,
+    collections::{HashSet, HashMap, BTreeMap},
+    time::{Duration, Instant},
+};
+use tokio::{
+    sync::mpsc::{channel, Sender},
+    time::timeout,
+};
+use cave::{
+    current_hour,
+    feed::Post,
+    store::{Store, TREND_POOL_SIZE}, trend_tag::TrendTag,
+};
+
+// `debug_assertions` is the cfg rustc sets for debug builds; a bare `debug`
+// cfg is never set, which made the short interval below unreachable.
+/// Minimum delay between the first buffered update and the recomputation.
+#[cfg(not(debug_assertions))]
+const MIN_INTERVAL: Duration = Duration::from_secs(60);
+/// Shorter delay for debug builds.
+#[cfg(debug_assertions)]
+const MIN_INTERVAL: Duration = Duration::from_secs(5);
+
+/// Tags of one post together with the post's language, if any.
+#[derive(Debug)]
+pub struct UpdateSet {
+    language: Option<String>,
+    tags: HashSet<String>,
+}
+
+impl UpdateSet {
+    /// Returns `true` when there are no tags to update.
+    pub fn is_empty(&self) -> bool {
+        self.tags.is_empty()
+    }
+}
+
+impl From<&Post> for UpdateSet {
+    fn from(post: &Post) -> Self {
+        UpdateSet {
+            language: post.lang(),
+            tags: post.tags_set().keys().cloned().collect(),
+        }
+    }
+}
+
+/// Sending half of the channel that feeds the trend setter task.
+pub type Tx = Sender<UpdateSet>;
+
+/// Spawns the trend setter task and returns the sender used to feed it.
+///
+/// Incoming tags are buffered per language (and under `None` regardless of
+/// language) and handed to `run` once `MIN_INTERVAL` has passed since the
+/// first buffered update.
+pub fn start(mut store: Store) -> Tx {
+    let (tx, mut rx) = channel::<UpdateSet>(1024);
+
+    tokio::spawn(async move {
+        let mut queue = BTreeMap::<Instant, Option<String>>::new();
+        // by language
+        let mut buffer = HashMap::<Option<String>, HashSet<String>>::new();
+        fn enqueue(
+            language: Option<String>,
+            queue: &mut BTreeMap::<Instant, Option<String>>,
+            buffer: &mut HashMap::<Option<String>, HashSet<String>>,
+            tags: HashSet<String>,
+        ) {
+            match buffer.entry(language.clone()) {
+                std::collections::hash_map::Entry::Vacant(entry) => {
+                    entry.insert(tags);
+                    // The due time is the queue key: step past taken slots so
+                    // that an already scheduled language is not overwritten
+                    // (its buffer entry would then never be flushed).
+                    let mut due = Instant::now() + MIN_INTERVAL;
+                    while queue.contains_key(&due) {
+                        due += Duration::from_nanos(1);
+                    }
+                    queue.insert(due, language);
+                }
+                std::collections::hash_map::Entry::Occupied(mut entry) => {
+                    // merge into buffered
+                    entry.get_mut().extend(tags);
+                }
+            }
+        }
+
+        loop {
+            let mut next_run = queue.keys().cloned().next();
+            // tracing::trace!("next_run in {:?}", next_run - Instant::now());
+            if let Some(next_run_) = next_run {
+                let now = Instant::now();
+                if next_run_ <= now {
+                    let language = queue.remove(&next_run_).unwrap();
+                    let buffered = buffer.remove(&language).unwrap();
+
+                    run(&language, buffered, &mut store).await.unwrap();
+
+                    // update with next in queue
+                    next_run = queue.keys().cloned().next();
+                }
+            }
+
+            let now = Instant::now();
+            // saturate to zero when the next entry is already overdue
+            let interval = next_run.map_or(
+                Duration::from_secs(3600),
+                |next_run| next_run.saturating_duration_since(now)
+            );
+            let _ = timeout(interval, async {
+                if let Some(update_set) = rx.recv().await {
+                    enqueue(None, &mut queue, &mut buffer, update_set.tags.clone());
+                    if let Some(language) = update_set.language {
+                        enqueue(Some(language), &mut queue, &mut buffer, update_set.tags);
+                    }
+                }
+            }).await;
+        }
+    });
+
+    tx
+}
+
+/// Recomputes the trend pools of `language` after the tags in `new` changed.
+///
+/// Scores the pooled tags plus `new` for every period in `cave::PERIODS` and
+/// writes the resulting additions, removals and pool sizes back to the store.
+async fn run(
+    language: &Option<String>,
+    new: HashSet<String>,
+    store: &mut Store,
+) -> Result<(), cave::store::Error> {
+    let t1 = Instant::now();
+    tracing::debug!("run {:?}: updated {:?}", language, new);
+    let old = store.get_trend_pools(language, cave::PERIODS).await?;
+    let all: HashSet<String> = old.iter()
+        .flat_map(|(_period, tags)| tags.clone())
+        .chain(new.into_iter())
+        .collect();
+
+    let mut period_scores: Vec<(u64, Vec<(f64, &TrendTag)>)> = cave::PERIODS.iter()
+        .map(|period| (*period, Vec::with_capacity(all.len())))
+        .collect();
+
+    // calculate scores for tags currently in the pool
+    let until = current_hour();
+    let trend_tags = store.get_trend_tags(language, all.into_iter()).await?;
+    for trend_tag in &trend_tags {
+        for (period, scores) in &mut period_scores {
+            let score = trend_tag.score(*period, until);
+            scores.push((score, trend_tag));
+        }
+    }
+    // order: highest score first; incomparable (NaN) scores count as equal
+    for (_period, scores) in &mut period_scores {
+        scores.sort_by(|(score1, _), (score2, _)| {
+            score2.partial_cmp(score1).unwrap_or(Ordering::Equal)
+        });
+    }
+
+    // separate new, kept, old tags
+    let updates = old.into_iter().zip(&period_scores)
+        .map(|((period1, old_tags), (period2, scores))| {
+            assert_eq!(period1, *period2);
+
+            let old_tags = old_tags.into_iter().collect::<HashSet<String>>();
+            let mut keep = TREND_POOL_SIZE.min(scores.len());
+            // shrink sorted set of tags as long as score is 0
+            while keep > 0 && scores[keep - 1].0 <= 0. {
+                keep -= 1;
+            }
+            let remove = scores[keep..].iter()
+                .map(|(_score, trend_tag)| trend_tag.name.borrow())
+                .filter(|tag| old_tags.contains(*tag))
+                .collect::<Vec<&str>>();
+            let add = scores[..keep].iter()
+                .map(|(_score, trend_tag)| trend_tag.name.borrow())
+                .filter(|tag| ! old_tags.contains(*tag))
+                .collect::<Vec<&str>>();
+
+            (period1, keep, scores.len(), remove, add)
+        });
+
+    // print
+    for (period, keep, total, remove, add) in updates.clone() {
+        if !add.is_empty() || !remove.is_empty() {
+            tracing::info!("Trending in {:?} for {} of {}/{}: +{:?} -{:?}", language, period, keep, total, add, remove);
+        }
+    }
+
+    // write to redis
+    store.update_trend_pools(
+        language,
+        updates.clone().map(|(period, _, _, remove, _add)| (period, remove)),
+        updates.clone().map(|(period, _, _, _remove, add)| (period, add)),
+        updates.map(|(period, keep, _, _, _)| (period, keep)),
+    ).await?;
+
+    metrics::histogram!("hunter_trend_setter_seconds", t1.elapsed(), "lang" => if language.is_some() { "some" } else { "any" });
+
+    Ok(())
+}
diff --git a/cave/src/store.rs b/cave/src/store.rs
index 1da7d47..e158a84 100644
--- a/cave/src/store.rs
+++ b/cave/src/store.rs
@@ -230,13 +230,12 @@ impl Store {
             Err(e) => tracing::error!("cannot encode post: {:?}", e),
         }
 
-        self.save_post_tags(&post).await;
 
         // post was new
         Ok(true)
     }
 
-    async fn save_post_tags(&mut self, post: &Post) {
+    pub async fn save_post_tags(&mut self, post: &Post) {
         if post.account.bot || post.tags.is_empty() {
             // irrelevant
             return;
diff --git a/hunter/src/main.rs
b/hunter/src/main.rs index cef60cf..f893945 100644 --- a/hunter/src/main.rs +++ b/hunter/src/main.rs @@ -9,8 +9,6 @@ mod config; mod scheduler; mod worker; mod posts_cache; -mod trend_setter; -mod tag_trimmer; use worker::Message; @@ -37,12 +35,6 @@ async fn main() { let mut store = cave::store::Store::new(16, config.redis).await; let posts_cache = posts_cache::PostsCache::new(65536); - cave::systemd::status("Starting trend_setter"); - let trend_setter_tx = trend_setter::start(store.clone()); - - cave::systemd::status("Starting tag_trimmer"); - tag_trimmer::start(store.clone()); - cave::systemd::status("Starting scheduler"); let mut scheduler = scheduler::Scheduler::new(); cave::systemd::status("Loading known hosts from config"); @@ -129,7 +121,6 @@ async fn main() { .spawn(worker::run( message_tx.clone(), store.clone(), - trend_setter_tx.clone(), posts_cache.clone(), client.clone(), host diff --git a/hunter/src/worker.rs b/hunter/src/worker.rs index 5a38c05..f68565a 100644 --- a/hunter/src/worker.rs +++ b/hunter/src/worker.rs @@ -1,6 +1,5 @@ use std::collections::HashSet; use std::future::Future; -use std::ops::Deref; use std::sync::{Arc, RwLock}; use std::time::{Duration, Instant}; use cave::{ @@ -10,7 +9,6 @@ use cave::{ use futures::{StreamExt, future}; use crate::posts_cache::PostsCache; use crate::scheduler::Host; -use crate::trend_setter::UpdateSet; #[derive(Clone)] pub struct RobotsTxt { @@ -70,7 +68,6 @@ pub enum Message { pub async fn run( message_tx: tokio::sync::mpsc::UnboundedSender, store: Store, - trend_setter_tx: crate::trend_setter::Tx, posts_cache: PostsCache, client: reqwest::Client, host: Host, @@ -82,12 +79,12 @@ pub async fn run( let ((new_post_ratio, mut mean_interval), stream_result) = future::join( fetch_timeline( message_tx.clone(), store.clone(), - trend_setter_tx.clone(), &posts_cache, + &posts_cache, &client, robots_txt.clone(), &host ), open_stream( message_tx.clone(), store.clone(), - trend_setter_tx.clone(), &posts_cache, + 
&posts_cache, &client, robots_txt, host.clone() ), ).await; @@ -120,7 +117,6 @@ pub async fn run( async fn fetch_timeline( message_tx: tokio::sync::mpsc::UnboundedSender, mut store: Store, - trend_setter_tx: crate::trend_setter::Tx, posts_cache: &PostsCache, client: &reqwest::Client, robots_txt: RobotsTxt, @@ -145,7 +141,7 @@ async fn fetch_timeline( Ok(feed) => { let mean_interval = feed.mean_post_interval(); - let (new_post_ratio, introduce_hosts) = process_posts(&mut store, &trend_setter_tx, posts_cache, &host, feed.posts.into_iter()).await; + let (new_post_ratio, introduce_hosts) = process_posts(&mut store, posts_cache, &host, feed.posts.into_iter()).await; let _ = message_tx.send(Message::IntroduceHosts { hosts: introduce_hosts.into_iter().collect(), }); @@ -164,7 +160,6 @@ async fn fetch_timeline( async fn process_posts( store: &mut Store, - trend_setter_tx: &crate::trend_setter::Tx, posts_cache: &PostsCache, host: &Host, posts: impl Iterator, @@ -189,14 +184,9 @@ async fn process_posts( // check if it's an actual post, not a repost if let Some(author_host) = post.account.host() { - let update_set = UpdateSet::from(post.deref()); // send away to redis if store.save_post(post).await == Ok(true) { new_posts += 1; - - if ! 
update_set.is_empty() {
-                    trend_setter_tx.send(update_set).await.unwrap();
-                }
             }
 
             // introduce instances from authors
@@ -230,7 +220,6 @@ async fn process_posts(
 async fn open_stream(
     message_tx: tokio::sync::mpsc::UnboundedSender<Message>,
     store: Store,
-    trend_setter_tx: crate::trend_setter::Tx,
     posts_cache: &PostsCache,
     client: &reqwest::Client,
     robots_txt: RobotsTxt,
@@ -255,12 +244,11 @@ async fn open_stream(
     Ok(stream.for_each(move |post| {
         let message_tx = message_tx.clone();
         let mut store = store.clone();
-        let trend_setter_tx = trend_setter_tx.clone();
         let posts_cache = posts_cache.clone();
         let host = host.clone();
         async move {
             let (_, introduce_hosts) =
-                process_posts(&mut store, &trend_setter_tx, &posts_cache, &host, [post].into_iter()).await;
+                process_posts(&mut store, &posts_cache, &host, [post].into_iter()).await;
             let _ = message_tx.send(Message::IntroduceHosts {
                 hosts: introduce_hosts.into_iter().collect(),
             });
diff --git a/nixos-module.nix b/nixos-module.nix
index 2c07f0d..01488d3 100644
--- a/nixos-module.nix
+++ b/nixos-module.nix
@@ -16,6 +16,16 @@ let
     builtins.toJSON hunterSettings
   );
 
+  butcherDefaultSettings = {
+    redis = "redis://127.0.0.1:${toString cfg.redis.port}/";
+  };
+
+  butcherSettings = lib.recursiveUpdate butcherDefaultSettings cfg.butcher.settings;
+
+  butcherConfigFile = builtins.toFile "butcher.yaml" (
+    builtins.toJSON butcherSettings
+  );
+
   gathererDefaultSettings = {
     redis = "redis://127.0.0.1:${toString cfg.redis.port}/";
     listen_port = 8000;
@@ -68,6 +78,18 @@ in
       default = "DEBUG";
     };
 
+    butcher.enable = mkEnableOption "caveman butcher";
+
+    butcher.settings = mkOption {
+      type = types.anything;
+      default = butcherDefaultSettings;
+    };
+
+    butcher.logLevel = mkOption {
+      type = types.enum [ "ERROR" "WARN" "INFO" "DEBUG" "TRACE" ];
+      default = "DEBUG";
+    };
+
     gatherer.enable = mkEnableOption "caveman gatherer";
 
     gatherer.settings = mkOption {
@@ -139,6 +161,38 @@ in
       };
     };
 
+    # Trend computation and tag trimming, split out of the hunter service.
+    systemd.services.caveman-butcher = lib.mkIf cfg.butcher.enable {
+      wantedBy = [ "multi-user.target" ];
+      requires = [ "redis-caveman.service" ];
+      after = [ "redis-caveman.service" "network-online.target" ];
+      environment.RUST_LOG = "caveman=${cfg.butcher.logLevel}";
+      serviceConfig = {
+        ExecStart = "${pkgs.caveman-butcher}/bin/caveman-butcher ${butcherConfigFile}";
+        Type = "notify";
+        # NOTE(review): butcher's main.rs shows no watchdog keep-alive; confirm
+        # cave::systemd sends WATCHDOG=1, or systemd restarts the unit every 600s.
+        WatchdogSec = 600;
+        Restart = "always";
+        RestartSec = 30;
+        DynamicUser = true;
+        User = "caveman-butcher";
+        ProtectSystem = "strict";
+        ProtectHome = true;
+        ProtectHostname = true;
+        ProtectKernelLogs = true;
+        ProtectKernelModules = true;
+        ProtectKernelTunables = true;
+        RestrictNamespaces = true;
+        RestrictRealtime = true;
+        LockPersonality = true;
+        MemoryDenyWriteExecute = true;
+        # systemd keys are case-sensitive: the directive is LimitNOFILE
+        LimitNOFILE = limitNOFILE;
+        LimitRSS = "4G:6G";
+      };
+    };
+
     systemd.services.caveman-gatherer = lib.mkIf cfg.gatherer.enable {
       wantedBy = [ "multi-user.target" ];
       requires = [ "redis-caveman.service" ];