hunter: split into butcher

Astro 2023-01-22 00:05:10 +01:00
parent 776f3ab48c
commit f2e65fd6cd
11 changed files with 425 additions and 27 deletions

Cargo.lock generated

@ -287,6 +287,25 @@ dependencies = [
"tracing-subscriber",
]
[[package]]
name = "caveman-butcher"
version = "0.0.0"
dependencies = [
"cave",
"chrono",
"futures",
"metrics",
"metrics-exporter-prometheus",
"metrics-util",
"rand",
"reqwest",
"serde",
"serde_json",
"texting_robots",
"tokio",
"tracing",
]
[[package]]
name = "caveman-gatherer"
version = "0.0.0"

butcher/Cargo.toml Normal file

@ -0,0 +1,22 @@
[package]
name = "caveman-butcher"
version = "0.0.0"
edition = "2021"
[dependencies]
futures = "0.3"
tokio = { version = "1", features = ["full", "tracing"] }
reqwest = { version = "0.11", features = ["json", "deflate", "gzip", "trust-dns"] }
serde = { version = "1", features = ["derive"] }
# serde_yaml = "0.9"
serde_json = "1"
chrono = "0.4"
# redis = { version = "0.22", features = ["tokio-comp", "connection-manager"] }
tracing = "0.1"
# systemd = "0.10"
cave = { path = "../cave" }
rand = "0.8"
texting_robots = "0.2"
metrics = "0.20"
metrics-util = "0.14"
metrics-exporter-prometheus = "0.11"

butcher/config.yaml Normal file

@ -0,0 +1,2 @@
#redis: redis://10.233.12.2:6379/
redis: redis://127.0.0.1:6378/

butcher/src/config.rs Normal file

@ -0,0 +1,4 @@
#[derive(Debug, serde::Deserialize)]
pub struct Config {
pub redis: String,
}
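
For orientation (not part of the commit): the config.yaml shown above deserializes into this Config struct. Below is a minimal standalone sketch of loading it by hand, assuming serde_yaml is available; the butcher itself loads the file through cave::config::LoadConfig, whose implementation is outside this diff, and serde_yaml is in fact commented out of butcher/Cargo.toml.

```rust
// Standalone sketch only: read a YAML config path from argv and deserialize it.
// The real binary presumably does the equivalent via cave::config::LoadConfig.
#[derive(Debug, serde::Deserialize)]
pub struct Config {
    pub redis: String,
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let path = std::env::args().nth(1).unwrap_or_else(|| "config.yaml".into());
    let yaml = std::fs::read_to_string(&path)?;
    let config: Config = serde_yaml::from_str(&yaml)?;
    println!("redis = {}", config.redis);
    Ok(())
}
```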

butcher/src/main.rs Normal file

@ -0,0 +1,56 @@
use futures::StreamExt;
use cave::{
config::LoadConfig,
firehose::FirehoseFactory,
};
use trend_setter::UpdateSet;
mod config;
mod trend_setter;
mod tag_trimmer;
#[tokio::main]
async fn main() {
cave::systemd::extend_timeout(100_000);
cave::init::exit_on_panic();
cave::init::init_logger(5555);
let config = config::Config::load();
let store = cave::store::Store::new(16, config.redis.clone()).await;
cave::systemd::status("Starting trend_setter");
let trend_setter_tx = trend_setter::start(store.clone());
cave::systemd::status("Starting tag_trimmer");
tag_trimmer::start(store.clone());
let firehose_factory = FirehoseFactory::new(config.redis);
let firehose = firehose_factory.produce()
.await
.expect("firehose")
.filter_map(|item| async move { item.ok() });
cave::systemd::ready();
firehose.for_each(move |data| {
let trend_setter_tx = trend_setter_tx.clone();
let mut store = store.clone();
async move {
let post = match serde_json::from_slice(&data) {
Ok(post) =>
post,
Err(e) => {
tracing::error!("Cannot parse JSON: {:?}", e);
return;
},
};
store.save_post_tags(&post).await;
let update_set = UpdateSet::from(&post);
if ! update_set.is_empty() {
trend_setter_tx.send(update_set).await.unwrap();
}
}
}).await;
}

butcher/src/tag_trimmer.rs Normal file

@ -0,0 +1,79 @@
use std::time::{Duration, Instant};
use futures::{
future::join,
StreamExt,
};
use tokio::time::sleep;
use cave::store::Store;
const TRIM_INTERVAL: Duration = Duration::from_secs(86400);
const TRIM_STEP_SLEEP: Duration = Duration::from_millis(30);
pub fn start(store: Store) {
tokio::spawn(async move {
loop {
let start = Instant::now();
run(&store).await;
let end = Instant::now();
tracing::info!("trimmed in {:.3?}", end - start);
sleep(TRIM_INTERVAL).await;
}
});
}
async fn run(store: &Store) {
let mut store1 = store.clone();
let mut store3 = store.clone();
join(async {
let store2 = store1.clone();
store1.get_tags_global()
.await
.unwrap()
.for_each(move |tag| {
let mut store2 = store2.clone();
async move {
trim_tag(None, tag, &mut store2).await;
sleep(TRIM_STEP_SLEEP).await;
}
})
.await
}, async {
let store4 = store3.clone();
store3.get_tags_by_language()
.await
.unwrap()
.for_each(move |(language, tag)| {
let mut store4 = store4.clone();
async move {
trim_tag(language, tag, &mut store4).await;
sleep(TRIM_STEP_SLEEP).await;
}})
.await
}).await;
}
async fn trim_tag(language: Option<String>, tag: String, store: &mut Store) {
let t1 = Instant::now();
let lang = if language.is_some() { "some" } else { "any" };
let trend_tags = store.get_trend_tags(&language, [tag].into_iter())
.await
.unwrap();
let trend_tag = &trend_tags[0];
if trend_tag.hour_users.iter().all(|(_, users)| *users == 0) {
// drop the whole wholly-outdated tag
tracing::debug!("Deleting whole tag {:?} {}", language, trend_tag.name);
store.delete_tag(&language, &trend_tag.name).await
.expect("delete_tag");
metrics::increment_counter!("hunter_trimmed_tags", "type" => "delete_tag", "lang" => lang);
} else {
store.clean_trend_tag(&language, trend_tag).await
.expect("clean_trend_tag");
metrics::increment_counter!("hunter_trimmed_tags", "type" => "clean_trend_tag", "lang" => lang);
}
let t2 = Instant::now();
metrics::histogram!("hunter_trim_tag_seconds", t2 - t1, "lang" => lang);
}
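
A back-of-the-envelope check on the pacing above (a standalone sketch, not part of the commit; the tag count is a made-up figure): TRIM_STEP_SLEEP throttles the per-tag work, so even for a large tag set the sleeps alone stay far below the daily TRIM_INTERVAL.

```rust
use std::time::Duration;

const TRIM_INTERVAL: Duration = Duration::from_secs(86400);
const TRIM_STEP_SLEEP: Duration = Duration::from_millis(30);

fn main() {
    // Hypothetical number of distinct tags walked by one branch of the join().
    let tags: u32 = 100_000;
    // Lower bound on one pass: the per-tag sleeps alone, ignoring Redis round-trips.
    let sleep_total = TRIM_STEP_SLEEP * tags; // 3_000 s, i.e. 50 minutes
    assert!(sleep_total < TRIM_INTERVAL);
    println!("minimum pass duration from sleeps alone: {:?}", sleep_total);
}
```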

butcher/src/trend_setter.rs Normal file

@ -0,0 +1,188 @@
use std::{
borrow::Borrow,
cmp::Ordering,
collections::{HashSet, HashMap, BTreeMap},
time::{Duration, Instant},
};
use tokio::{
sync::mpsc::{channel, Sender},
time::timeout,
};
use cave::{
current_hour,
feed::Post,
store::{Store, TREND_POOL_SIZE}, trend_tag::TrendTag,
};
#[cfg(not(debug))]
const MIN_INTERVAL: Duration = Duration::from_secs(60);
#[cfg(debug)]
const MIN_INTERVAL: Duration = Duration::from_secs(5);
#[derive(Debug)]
pub struct UpdateSet {
language: Option<String>,
tags: HashSet<String>,
}
impl UpdateSet {
pub fn is_empty(&self) -> bool {
self.tags.is_empty()
}
}
impl From<&Post> for UpdateSet {
fn from(post: &Post) -> Self {
UpdateSet {
language: post.lang(),
tags: post.tags_set().keys().cloned().collect(),
}
}
}
pub type Tx = Sender<UpdateSet>;
pub fn start(mut store: Store) -> Tx {
let (tx, mut rx) = channel::<UpdateSet>(1024);
tokio::spawn(async move {
let mut queue = BTreeMap::<Instant, Option<String>>::new();
// by language
let mut buffer = HashMap::<Option<String>, HashSet<String>>::new();
fn enqueue(
language: Option<String>,
queue: &mut BTreeMap::<Instant, Option<String>>,
buffer: &mut HashMap::<Option<String>, HashSet<String>>,
tags: HashSet<String>,
) {
match buffer.entry(language.clone()) {
std::collections::hash_map::Entry::Vacant(entry) => {
entry.insert(tags);
queue.insert(Instant::now() + MIN_INTERVAL, language);
}
std::collections::hash_map::Entry::Occupied(mut entry) => {
// merge into buffered
for tag in tags.into_iter() {
entry.get_mut().insert(tag);
}
}
}
}
loop {
let mut next_run = queue.keys().cloned().next();
// tracing::trace!("next_run in {:?}", next_run - Instant::now());
if let Some(next_run_) = next_run {
let now = Instant::now();
if next_run_ <= now {
let language = queue.remove(&next_run_).unwrap();
let buffered = buffer.remove(&language).unwrap();
run(&language, buffered.clone(), &mut store).await.unwrap();
// update with next in queue
next_run = queue.keys().cloned().next();
}
}
let now = Instant::now();
let interval = next_run.map_or(
Duration::from_secs(3600),
|next_run| next_run - now
);
let _ = timeout(interval, async {
if let Some(update_set) = rx.recv().await {
enqueue(None, &mut queue, &mut buffer, update_set.tags.clone());
if let Some(language) = update_set.language {
enqueue(Some(language), &mut queue, &mut buffer, update_set.tags);
}
}
}).await;
}
});
tx
}
async fn run(
language: &Option<String>,
new: HashSet<String>,
store: &mut Store,
) -> Result<(), cave::store::Error> {
let t1 = Instant::now();
tracing::debug!("run {:?}: updated {:?}", language, new);
let old = store.get_trend_pools(language, cave::PERIODS).await?;
let all: HashSet<String> = old.iter()
.flat_map(|(_period, tags)| tags.clone())
.chain(new.into_iter())
.collect();
let mut period_scores: Vec<(u64, Vec<(f64, &TrendTag)>)> = cave::PERIODS.iter()
.map(|period| (*period, Vec::with_capacity(all.len())))
.collect();
// calculate scores for tags currently in the pool
let until = current_hour();
let trend_tags = store.get_trend_tags(language, all.into_iter()).await?;
for trend_tag in &trend_tags {
for (period, scores) in &mut period_scores {
let score = trend_tag.score(*period, until);
scores.push((score, trend_tag));
}
}
// order
for (_period, scores) in &mut period_scores {
scores.sort_by(|(score1, _), (score2, _)| {
if score1 > score2 {
Ordering::Less
} else if score1 < score2 {
Ordering::Greater
} else {
Ordering::Equal
}
});
}
// separate new, kept, old tags
let updates = old.into_iter().zip(&period_scores)
.map(|((period1, old_tags), (period2, scores))| {
assert_eq!(period1, *period2);
let old_tags = old_tags.into_iter().collect::<HashSet<String>>();
let mut keep = TREND_POOL_SIZE.min(scores.len());
// shrink sorted set of tags as long as score is 0
while keep > 0 && scores[keep - 1].0 <= 0. {
keep -= 1;
}
let remove = scores[keep..].iter()
.map(|(_score, trend_tag)| trend_tag.name.borrow())
.filter(|tag| old_tags.contains(*tag))
.collect::<Vec<&str>>();
let add = scores[..keep].iter()
.map(|(_score, trend_tag)| trend_tag.name.borrow())
.filter(|tag| ! old_tags.contains(*tag))
.collect::<Vec<&str>>();
(period1, keep, scores.len(), remove, add)
});
// print
for (period, keep, total, remove, add) in updates.clone() {
if add.len() > 0 || remove.len() > 0 {
tracing::info!("Trending in {:?} for {} of {}/{}: +{:?} -{:?}", language, period, keep, total, add, remove);
}
}
// write to redis
store.update_trend_pools(
language,
updates.clone().map(|(period, _, _, remove, _add)| (period, remove)),
updates.clone().map(|(period, _, _, _remove, add)| (period, add)),
updates.map(|(period, keep, _, _, _)| (period, keep)),
).await?;
let t2 = Instant::now();
metrics::histogram!("hunter_trend_setter_seconds", t2 - t1, "lang" => if language.is_some() { "some" } else { "any" });
Ok(())
}
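
One note on the ordering in run() above (an illustration only, not a change to the commit): the manual comparator sorts the (score, trend_tag) pairs descending by score. With f64 scores the same ordering can be expressed via partial_cmp, falling back to Equal for NaN:

```rust
use std::cmp::Ordering;

fn main() {
    // (score, tag) pairs as built in run(); sort descending by score.
    let mut scores: Vec<(f64, &str)> = vec![(0.5, "cats"), (2.0, "rust"), (1.25, "caveman")];
    scores.sort_by(|(a, _), (b, _)| b.partial_cmp(a).unwrap_or(Ordering::Equal));
    assert_eq!(scores[0].1, "rust");
    println!("{:?}", scores);
}
```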


@ -230,13 +230,12 @@ impl Store {
Err(e) =>
tracing::error!("cannot encode post: {:?}", e),
}
self.save_post_tags(&post).await;
// post was new
Ok(true)
}
async fn save_post_tags(&mut self, post: &Post) {
pub async fn save_post_tags(&mut self, post: &Post) {
if post.account.bot || post.tags.is_empty() {
// irrelevant
return;

hunter/src/main.rs

@ -9,8 +9,6 @@ mod config;
mod scheduler;
mod worker;
mod posts_cache;
mod trend_setter;
mod tag_trimmer;
use worker::Message;
@ -37,12 +35,6 @@ async fn main() {
let mut store = cave::store::Store::new(16, config.redis).await;
let posts_cache = posts_cache::PostsCache::new(65536);
cave::systemd::status("Starting trend_setter");
let trend_setter_tx = trend_setter::start(store.clone());
cave::systemd::status("Starting tag_trimmer");
tag_trimmer::start(store.clone());
cave::systemd::status("Starting scheduler");
let mut scheduler = scheduler::Scheduler::new();
cave::systemd::status("Loading known hosts from config");
@ -129,7 +121,6 @@ async fn main() {
.spawn(worker::run(
message_tx.clone(),
store.clone(),
trend_setter_tx.clone(),
posts_cache.clone(),
client.clone(),
host

hunter/src/worker.rs

@ -1,6 +1,5 @@
use std::collections::HashSet;
use std::future::Future;
use std::ops::Deref;
use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant};
use cave::{
@ -10,7 +9,6 @@ use cave::{
use futures::{StreamExt, future};
use crate::posts_cache::PostsCache;
use crate::scheduler::Host;
use crate::trend_setter::UpdateSet;
#[derive(Clone)]
pub struct RobotsTxt {
@ -70,7 +68,6 @@ pub enum Message {
pub async fn run(
message_tx: tokio::sync::mpsc::UnboundedSender<Message>,
store: Store,
trend_setter_tx: crate::trend_setter::Tx,
posts_cache: PostsCache,
client: reqwest::Client,
host: Host,
@ -82,12 +79,12 @@ pub async fn run(
let ((new_post_ratio, mut mean_interval), stream_result) = future::join(
fetch_timeline(
message_tx.clone(), store.clone(),
trend_setter_tx.clone(), &posts_cache,
&posts_cache,
&client, robots_txt.clone(), &host
),
open_stream(
message_tx.clone(), store.clone(),
trend_setter_tx.clone(), &posts_cache,
&posts_cache,
&client, robots_txt, host.clone()
),
).await;
@ -120,7 +117,6 @@ pub async fn run(
async fn fetch_timeline(
message_tx: tokio::sync::mpsc::UnboundedSender<Message>,
mut store: Store,
trend_setter_tx: crate::trend_setter::Tx,
posts_cache: &PostsCache,
client: &reqwest::Client,
robots_txt: RobotsTxt,
@ -145,7 +141,7 @@ async fn fetch_timeline(
Ok(feed) => {
let mean_interval = feed.mean_post_interval();
let (new_post_ratio, introduce_hosts) = process_posts(&mut store, &trend_setter_tx, posts_cache, &host, feed.posts.into_iter()).await;
let (new_post_ratio, introduce_hosts) = process_posts(&mut store, posts_cache, &host, feed.posts.into_iter()).await;
let _ = message_tx.send(Message::IntroduceHosts {
hosts: introduce_hosts.into_iter().collect(),
});
@ -164,7 +160,6 @@ async fn fetch_timeline(
async fn process_posts(
store: &mut Store,
trend_setter_tx: &crate::trend_setter::Tx,
posts_cache: &PostsCache,
host: &Host,
posts: impl Iterator<Item = EncodablePost>,
@ -189,14 +184,9 @@ async fn process_posts(
// check if it's an actual post, not a repost
if let Some(author_host) = post.account.host() {
let update_set = UpdateSet::from(post.deref());
// send away to redis
if store.save_post(post).await == Ok(true) {
new_posts += 1;
if ! update_set.is_empty() {
trend_setter_tx.send(update_set).await.unwrap();
}
}
// introduce instances from authors
@ -230,7 +220,6 @@ async fn process_posts(
async fn open_stream(
message_tx: tokio::sync::mpsc::UnboundedSender<Message>,
store: Store,
trend_setter_tx: crate::trend_setter::Tx,
posts_cache: &PostsCache,
client: &reqwest::Client,
robots_txt: RobotsTxt,
@ -255,12 +244,11 @@ async fn open_stream(
Ok(stream.for_each(move |post| {
let message_tx = message_tx.clone();
let mut store = store.clone();
let trend_setter_tx = trend_setter_tx.clone();
let posts_cache = posts_cache.clone();
let host = host.clone();
async move {
let (_, introduce_hosts) =
process_posts(&mut store, &trend_setter_tx, &posts_cache, &host, [post].into_iter()).await;
process_posts(&mut store, &posts_cache, &host, [post].into_iter()).await;
let _ = message_tx.send(Message::IntroduceHosts {
hosts: introduce_hosts.into_iter().collect(),
});


@ -16,6 +16,16 @@ let
builtins.toJSON hunterSettings
);
butcherDefaultSettings = {
redis = "redis://127.0.0.1:${toString cfg.redis.port}/";
};
butcherSettings = lib.recursiveUpdate butcherDefaultSettings cfg.butcher.settings;
butcherConfigFile = builtins.toFile "butcher.yaml" (
builtins.toJSON butcherSettings
);
gathererDefaultSettings = {
redis = "redis://127.0.0.1:${toString cfg.redis.port}/";
listen_port = 8000;
@ -68,6 +78,18 @@ in
default = "DEBUG";
};
butcher.enable = mkEnableOption "caveman butcher";
butcher.settings = mkOption {
type = types.anything;
default = butcherDefaultSettings;
};
butcher.logLevel = mkOption {
type = types.enum [ "ERROR" "WARN" "INFO" "DEBUG" "TRACE" ];
default = "DEBUG";
};
gatherer.enable = mkEnableOption "caveman gatherer";
gatherer.settings = mkOption {
@ -139,6 +161,34 @@ in
};
};
systemd.services.caveman-butcher = lib.mkIf cfg.butcher.enable {
wantedBy = [ "multi-user.target" ];
requires = [ "redis-caveman.service" ];
after = [ "redis-caveman.service" "network-online.target" ];
environment.RUST_LOG = "caveman=${cfg.butcher.logLevel}";
serviceConfig = {
ExecStart = "${pkgs.caveman-butcher}/bin/caveman-butcher ${butcherConfigFile}";
Type = "notify";
WatchdogSec = 600;
Restart = "always";
RestartSec = 30;
DynamicUser = true;
User = "caveman-butcher";
ProtectSystem = "strict";
ProtectHome = true;
ProtectHostname = true;
ProtectKernelLogs = true;
ProtectKernelModules = true;
ProtectKernelTunables = true;
RestrictNamespaces = true;
RestrictRealtime = true;
LockPersonality = true;
MemoryDenyWriteExecute = true;
LimitNOFile = limitNOFILE;
LimitRSS = "4G:6G";
};
};
systemd.services.caveman-gatherer = lib.mkIf cfg.gatherer.enable {
wantedBy = [ "multi-user.target" ];
requires = [ "redis-caveman.service" ];