From 00ded7dc8e32741adafebc9d7b794f8792f3eedc Mon Sep 17 00:00:00 2001 From: Astro Date: Wed, 23 Nov 2022 23:59:35 +0100 Subject: [PATCH] hunter: add posts_cache --- Cargo.lock | 106 ++++++++++++++++++++++++++++++++++++++ hunter/Cargo.toml | 2 +- hunter/src/main.rs | 3 ++ hunter/src/posts_cache.rs | 50 ++++++++++++++++++ hunter/src/worker.rs | 32 ++++++++---- 5 files changed, 181 insertions(+), 12 deletions(-) create mode 100644 hunter/src/posts_cache.rs diff --git a/Cargo.lock b/Cargo.lock index 64d0b1f..b1738a0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -789,6 +789,19 @@ dependencies = [ "want", ] +[[package]] +name = "hyper-rustls" +version = "0.23.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "59df7c4e19c950e6e0e868dcc0a300b09a9b88e9ec55bd879ca819087a77355d" +dependencies = [ + "http", + "hyper", + "rustls", + "tokio", + "tokio-rustls", +] + [[package]] name = "hyper-tls" version = "0.5.0" @@ -1290,6 +1303,7 @@ dependencies = [ "http", "http-body", "hyper", + "hyper-rustls", "hyper-tls", "ipnet", "js-sys", @@ -1299,20 +1313,60 @@ dependencies = [ "once_cell", "percent-encoding", "pin-project-lite", + "rustls", + "rustls-pemfile", "serde", "serde_json", "serde_urlencoded", "tokio", "tokio-native-tls", + "tokio-rustls", "tokio-util", "tower-service", "url", "wasm-bindgen", "wasm-bindgen-futures", "web-sys", + "webpki-roots", "winreg", ] +[[package]] +name = "ring" +version = "0.16.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3053cf52e236a3ed746dfc745aa9cacf1b791d846bdaf412f60a8d7d6e17c8fc" +dependencies = [ + "cc", + "libc", + "once_cell", + "spin", + "untrusted", + "web-sys", + "winapi", +] + +[[package]] +name = "rustls" +version = "0.20.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "539a2bfe908f471bfa933876bd1eb6a19cf2176d375f82ef7f99530a40e48c2c" +dependencies = [ + "log", + "ring", + "sct", + "webpki", +] + +[[package]] +name = "rustls-pemfile" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0864aeff53f8c05aa08d86e5ef839d3dfcf07aeba2db32f12db0ef716e87bd55" +dependencies = [ + "base64", +] + [[package]] name = "ryu" version = "1.0.11" @@ -1341,6 +1395,16 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c8132065adcfd6e02db789d9285a0deb2f3fcb04002865ab67d5fb103533898" +[[package]] +name = "sct" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d53dcdb7c9f8158937a7981b48accfd39a43af418591a5d008c7b22b5e1b7ca4" +dependencies = [ + "ring", + "untrusted", +] + [[package]] name = "security-framework" version = "2.7.0" @@ -1460,6 +1524,12 @@ dependencies = [ "winapi", ] +[[package]] +name = "spin" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d" + [[package]] name = "syn" version = "1.0.103" @@ -1618,6 +1688,17 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-rustls" +version = "0.23.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c43ee83903113e03984cb9e5cebe6c04a5116269e900e3ddba8f068a62adda59" +dependencies = [ + "rustls", + "tokio", + "webpki", +] + [[package]] name = "tokio-util" version = "0.7.4" @@ -1763,6 +1844,12 @@ version = "0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c1e5fa573d8ac5f1a856f8d7be41d390ee973daf97c806b2c1a465e4e1406e68" +[[package]] +name = "untrusted" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a156c684c91ea7d62626509bce3cb4e1d9ed5c4d978f7b4352658f96a4c26b4a" + [[package]] name = "url" version = "2.3.1" @@ -1890,6 +1977,25 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "webpki" +version = "0.22.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f095d78192e208183081cc07bc5515ef55216397af48b873e5edcd72637fa1bd" +dependencies = [ + "ring", + "untrusted", +] + +[[package]] +name = "webpki-roots" +version = "0.22.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "368bfe657969fb01238bb756d351dcade285e0f6fcbd36dcb23359a5169975be" +dependencies = [ + "webpki", +] + [[package]] name = "winapi" version = "0.3.9" diff --git a/hunter/Cargo.toml b/hunter/Cargo.toml index e737d3e..7ed8b63 100644 --- a/hunter/Cargo.toml +++ b/hunter/Cargo.toml @@ -6,7 +6,7 @@ edition = "2021" [dependencies] futures = "0.3" tokio = { version = "1", features = ["full"] } -reqwest = { version = "0.11", features = ["json", "deflate", "gzip"] } +reqwest = { version = "0.11", features = ["json", "deflate", "gzip", "rustls-tls"] } serde = { version = "1", features = ["derive"] } # serde_yaml = "0.9" chrono = "0.4" diff --git a/hunter/src/main.rs b/hunter/src/main.rs index d4e26bc..0e53803 100644 --- a/hunter/src/main.rs +++ b/hunter/src/main.rs @@ -6,6 +6,7 @@ use cave::config::LoadConfig; mod config; mod scheduler; mod worker; +mod posts_cache; mod trend_setter; mod tag_trimmer; @@ -21,6 +22,7 @@ async fn main() { let config = config::Config::load(); let mut store = cave::store::Store::new(16, config.redis).await; + let posts_cache = posts_cache::PostsCache::new(65536); cave::systemd::status("Starting trend_setter"); let trend_setter_tx = trend_setter::start(store.clone()); @@ -98,6 +100,7 @@ async fn main() { message_tx.clone(), store.clone(), trend_setter_tx.clone(), + posts_cache.clone(), client.clone(), host )); diff --git a/hunter/src/posts_cache.rs b/hunter/src/posts_cache.rs new file mode 100644 index 0000000..273a19d --- /dev/null +++ b/hunter/src/posts_cache.rs @@ -0,0 +1,50 @@ +use std::{ + collections::{BTreeMap, HashSet}, + sync::{Arc, Mutex}, + time::{Instant, Duration}, +}; + +/// In-process cache avoids a round-trip to redis for each post +#[derive(Clone)] +pub struct PostsCache { + cache: Arc>>>, + ages: Arc>>>, + size: usize, +} + +impl PostsCache { + pub fn new(size: usize) -> Self { + PostsCache { + cache: Arc::new(Mutex::new(HashSet::new())), + ages: Arc::new(Mutex::new(BTreeMap::new())), + size, + } + } + + // returns true if already exists + pub fn insert(&self, k: String) -> bool { + let k = Arc::new(k); + + let mut cache = self.cache.lock().expect("cache.lock"); + if cache.contains(&k) { + return true; + } + + let mut ages = self.ages.lock().expect("ages.lock"); + + let mut now = Instant::now(); + while ages.get(&now).is_some() { + now += Duration::from_millis(1); + } + ages.insert(now, k.clone()); + cache.insert(k); + + while cache.len() > self.size { + let oldest = ages.keys().cloned().next().expect("ages first"); + let oldest_k = ages.remove(&oldest).expect("remove oldest"); + cache.remove(&oldest_k); + } + + false + } +} diff --git a/hunter/src/worker.rs b/hunter/src/worker.rs index 5c9928f..3a4803e 100644 --- a/hunter/src/worker.rs +++ b/hunter/src/worker.rs @@ -7,6 +7,7 @@ use cave::{ store::Store, }; use futures::{StreamExt, future}; +use crate::posts_cache::PostsCache; use crate::trend_setter::UpdateSet; #[derive(Clone)] @@ -66,6 +67,7 @@ pub async fn run( message_tx: tokio::sync::mpsc::UnboundedSender, store: Store, trend_setter_tx: crate::trend_setter::Tx, + posts_cache: PostsCache, client: reqwest::Client, host: String, ) { @@ -76,12 +78,12 @@ pub async fn run( let ((new_post_ratio, mut mean_interval), stream_result) = future::join( fetch_timeline( message_tx.clone(), store.clone(), - trend_setter_tx.clone(), + trend_setter_tx.clone(), &posts_cache, &client, robots_txt.clone(), &host ), open_stream( message_tx.clone(), store.clone(), - trend_setter_tx.clone(), + trend_setter_tx.clone(), &posts_cache, &client, robots_txt, host.clone() ), ).await; @@ -113,6 +115,7 @@ async fn fetch_timeline( message_tx: tokio::sync::mpsc::UnboundedSender, mut store: Store, trend_setter_tx: crate::trend_setter::Tx, + posts_cache: &PostsCache, client: &reqwest::Client, robots_txt: RobotsTxt, host: &str, @@ -129,7 +132,7 @@ async fn fetch_timeline( Ok(feed) => { let mean_interval = feed.mean_post_interval(); - let (new_post_ratio, introduce_hosts) = process_posts(&mut store, &trend_setter_tx, &host, feed.posts.into_iter()).await; + let (new_post_ratio, introduce_hosts) = process_posts(&mut store, &trend_setter_tx, posts_cache, &host, feed.posts.into_iter()).await; let _ = message_tx.send(Message::IntroduceHosts { hosts: introduce_hosts.into_iter().collect(), }); @@ -149,6 +152,7 @@ async fn fetch_timeline( async fn process_posts( store: &mut Store, trend_setter_tx: &crate::trend_setter::Tx, + posts_cache: &PostsCache, host: &str, posts: impl Iterator, ) -> (Option, HashSet) { @@ -168,13 +172,17 @@ async fn process_posts( // check if it's an actual post, not a repost if let Some(author_host) = post.account.host() { - // send away to redis - let update_set = UpdateSet::from(&post); - if store.save_post(post).await == Ok(true) { - new_posts += 1; + // potentially save a round-trip to redis with an in-process cache + if ! posts_cache.insert(post.uri.clone()) { + let update_set = UpdateSet::from(&post); + // send away to redis + if store.save_post(post).await == Ok(true) { + log::debug!("new from {} {} {}", host, if host == author_host { "==" } else { "!=" }, author_host); + new_posts += 1; - if ! update_set.is_empty() { - trend_setter_tx.send(update_set).await.unwrap(); + if ! update_set.is_empty() { + trend_setter_tx.send(update_set).await.unwrap(); + } } } @@ -202,6 +210,7 @@ async fn open_stream( message_tx: tokio::sync::mpsc::UnboundedSender, store: Store, trend_setter_tx: crate::trend_setter::Tx, + posts_cache: &PostsCache, client: &reqwest::Client, robots_txt: RobotsTxt, host: String, @@ -213,6 +222,7 @@ async fn open_stream( } // free as early as possible drop(robots_txt); + let posts_cache = posts_cache.clone(); let stream = Feed::stream(client, &url).await .map_err(|e| { @@ -223,11 +233,11 @@ async fn open_stream( let message_tx = message_tx.clone(); let mut store = store.clone(); let trend_setter_tx = trend_setter_tx.clone(); + let posts_cache = posts_cache.clone(); let host = host.clone(); async move { - log::info!("Stream {} new post: {}", host, &post.uri); let (_, introduce_hosts) = - process_posts(&mut store, &trend_setter_tx, &host, [post].into_iter()).await; + process_posts(&mut store, &trend_setter_tx, &posts_cache, &host, [post].into_iter()).await; let _ = message_tx.send(Message::IntroduceHosts { hosts: introduce_hosts.into_iter().collect(), });