hunter: add posts_cache

This commit is contained in:
Astro 2022-11-23 23:59:35 +01:00
parent b4ee13f46d
commit 00ded7dc8e
5 changed files with 181 additions and 12 deletions

106
Cargo.lock generated
View File

@ -789,6 +789,19 @@ dependencies = [
"want",
]
[[package]]
name = "hyper-rustls"
version = "0.23.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "59df7c4e19c950e6e0e868dcc0a300b09a9b88e9ec55bd879ca819087a77355d"
dependencies = [
"http",
"hyper",
"rustls",
"tokio",
"tokio-rustls",
]
[[package]]
name = "hyper-tls"
version = "0.5.0"
@ -1290,6 +1303,7 @@ dependencies = [
"http",
"http-body",
"hyper",
"hyper-rustls",
"hyper-tls",
"ipnet",
"js-sys",
@ -1299,20 +1313,60 @@ dependencies = [
"once_cell",
"percent-encoding",
"pin-project-lite",
"rustls",
"rustls-pemfile",
"serde",
"serde_json",
"serde_urlencoded",
"tokio",
"tokio-native-tls",
"tokio-rustls",
"tokio-util",
"tower-service",
"url",
"wasm-bindgen",
"wasm-bindgen-futures",
"web-sys",
"webpki-roots",
"winreg",
]
[[package]]
name = "ring"
version = "0.16.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3053cf52e236a3ed746dfc745aa9cacf1b791d846bdaf412f60a8d7d6e17c8fc"
dependencies = [
"cc",
"libc",
"once_cell",
"spin",
"untrusted",
"web-sys",
"winapi",
]
[[package]]
name = "rustls"
version = "0.20.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "539a2bfe908f471bfa933876bd1eb6a19cf2176d375f82ef7f99530a40e48c2c"
dependencies = [
"log",
"ring",
"sct",
"webpki",
]
[[package]]
name = "rustls-pemfile"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0864aeff53f8c05aa08d86e5ef839d3dfcf07aeba2db32f12db0ef716e87bd55"
dependencies = [
"base64",
]
[[package]]
name = "ryu"
version = "1.0.11"
@ -1341,6 +1395,16 @@ version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c8132065adcfd6e02db789d9285a0deb2f3fcb04002865ab67d5fb103533898"
[[package]]
name = "sct"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d53dcdb7c9f8158937a7981b48accfd39a43af418591a5d008c7b22b5e1b7ca4"
dependencies = [
"ring",
"untrusted",
]
[[package]]
name = "security-framework"
version = "2.7.0"
@ -1460,6 +1524,12 @@ dependencies = [
"winapi",
]
[[package]]
name = "spin"
version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d"
[[package]]
name = "syn"
version = "1.0.103"
@ -1618,6 +1688,17 @@ dependencies = [
"tokio",
]
[[package]]
name = "tokio-rustls"
version = "0.23.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c43ee83903113e03984cb9e5cebe6c04a5116269e900e3ddba8f068a62adda59"
dependencies = [
"rustls",
"tokio",
"webpki",
]
[[package]]
name = "tokio-util"
version = "0.7.4"
@ -1763,6 +1844,12 @@ version = "0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c1e5fa573d8ac5f1a856f8d7be41d390ee973daf97c806b2c1a465e4e1406e68"
[[package]]
name = "untrusted"
version = "0.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a156c684c91ea7d62626509bce3cb4e1d9ed5c4d978f7b4352658f96a4c26b4a"
[[package]]
name = "url"
version = "2.3.1"
@ -1890,6 +1977,25 @@ dependencies = [
"wasm-bindgen",
]
[[package]]
name = "webpki"
version = "0.22.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f095d78192e208183081cc07bc5515ef55216397af48b873e5edcd72637fa1bd"
dependencies = [
"ring",
"untrusted",
]
[[package]]
name = "webpki-roots"
version = "0.22.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "368bfe657969fb01238bb756d351dcade285e0f6fcbd36dcb23359a5169975be"
dependencies = [
"webpki",
]
[[package]]
name = "winapi"
version = "0.3.9"

View File

@ -6,7 +6,7 @@ edition = "2021"
[dependencies]
futures = "0.3"
tokio = { version = "1", features = ["full"] }
reqwest = { version = "0.11", features = ["json", "deflate", "gzip"] }
reqwest = { version = "0.11", features = ["json", "deflate", "gzip", "rustls-tls"] }
serde = { version = "1", features = ["derive"] }
# serde_yaml = "0.9"
chrono = "0.4"

View File

@ -6,6 +6,7 @@ use cave::config::LoadConfig;
mod config;
mod scheduler;
mod worker;
mod posts_cache;
mod trend_setter;
mod tag_trimmer;
@ -21,6 +22,7 @@ async fn main() {
let config = config::Config::load();
let mut store = cave::store::Store::new(16, config.redis).await;
let posts_cache = posts_cache::PostsCache::new(65536);
cave::systemd::status("Starting trend_setter");
let trend_setter_tx = trend_setter::start(store.clone());
@ -98,6 +100,7 @@ async fn main() {
message_tx.clone(),
store.clone(),
trend_setter_tx.clone(),
posts_cache.clone(),
client.clone(),
host
));

50
hunter/src/posts_cache.rs Normal file
View File

@ -0,0 +1,50 @@
use std::{
collections::{BTreeMap, HashSet},
sync::{Arc, Mutex},
time::{Instant, Duration},
};
/// In-process cache of recently seen post keys.
///
/// Avoids a round-trip to redis for each post that has already been seen.
/// Clones share the same underlying state, because the collections live
/// behind `Arc`s.
///
/// Eviction is first-in-first-out by insertion time: inserting a key that is
/// already cached does not refresh its age.
#[derive(Clone)]
pub struct PostsCache {
    /// Keys currently held, used for the membership test.
    cache: Arc<Mutex<HashSet<Arc<String>>>>,
    /// The same keys ordered by insertion time, used to find the oldest one.
    ages: Arc<Mutex<BTreeMap<Instant, Arc<String>>>>,
    /// Maximum number of keys kept after an insert completes.
    size: usize,
}

impl PostsCache {
    /// Creates an empty cache that keeps at most `size` keys.
    pub fn new(size: usize) -> Self {
        PostsCache {
            cache: Arc::new(Mutex::new(HashSet::new())),
            ages: Arc::new(Mutex::new(BTreeMap::new())),
            size,
        }
    }

    /// Records `k` as seen.
    ///
    /// Returns `true` if `k` was already cached and `false` if it was newly
    /// added. After adding, the oldest keys are evicted until at most `size`
    /// keys remain.
    ///
    /// # Panics
    ///
    /// Panics if one of the internal mutexes is poisoned.
    pub fn insert(&self, k: String) -> bool {
        // Locks are always taken in the order `cache`, then `ages`.
        let mut cache = self.cache.lock().expect("cache.lock");
        // Look up by `&String` so the hit path does not allocate an `Arc`.
        if cache.contains(&k) {
            return true;
        }

        let k = Arc::new(k);
        let mut ages = self.ages.lock().expect("ages.lock");
        // The timestamp is the map key, so it must be unique: on a collision,
        // nudge it forward until a free slot is found.
        let mut now = Instant::now();
        while ages.contains_key(&now) {
            now += Duration::from_millis(1);
        }
        ages.insert(now, Arc::clone(&k));
        cache.insert(k);

        // Drop the oldest entries until the cache fits its size limit again.
        while cache.len() > self.size {
            let oldest = ages.keys().next().copied().expect("ages first");
            let oldest_k = ages.remove(&oldest).expect("remove oldest");
            cache.remove(&oldest_k);
        }

        false
    }
}

View File

@ -7,6 +7,7 @@ use cave::{
store::Store,
};
use futures::{StreamExt, future};
use crate::posts_cache::PostsCache;
use crate::trend_setter::UpdateSet;
#[derive(Clone)]
@ -66,6 +67,7 @@ pub async fn run(
message_tx: tokio::sync::mpsc::UnboundedSender<Message>,
store: Store,
trend_setter_tx: crate::trend_setter::Tx,
posts_cache: PostsCache,
client: reqwest::Client,
host: String,
) {
@ -76,12 +78,12 @@ pub async fn run(
let ((new_post_ratio, mut mean_interval), stream_result) = future::join(
fetch_timeline(
message_tx.clone(), store.clone(),
trend_setter_tx.clone(),
trend_setter_tx.clone(), &posts_cache,
&client, robots_txt.clone(), &host
),
open_stream(
message_tx.clone(), store.clone(),
trend_setter_tx.clone(),
trend_setter_tx.clone(), &posts_cache,
&client, robots_txt, host.clone()
),
).await;
@ -113,6 +115,7 @@ async fn fetch_timeline(
message_tx: tokio::sync::mpsc::UnboundedSender<Message>,
mut store: Store,
trend_setter_tx: crate::trend_setter::Tx,
posts_cache: &PostsCache,
client: &reqwest::Client,
robots_txt: RobotsTxt,
host: &str,
@ -129,7 +132,7 @@ async fn fetch_timeline(
Ok(feed) => {
let mean_interval = feed.mean_post_interval();
let (new_post_ratio, introduce_hosts) = process_posts(&mut store, &trend_setter_tx, &host, feed.posts.into_iter()).await;
let (new_post_ratio, introduce_hosts) = process_posts(&mut store, &trend_setter_tx, posts_cache, &host, feed.posts.into_iter()).await;
let _ = message_tx.send(Message::IntroduceHosts {
hosts: introduce_hosts.into_iter().collect(),
});
@ -149,6 +152,7 @@ async fn fetch_timeline(
async fn process_posts(
store: &mut Store,
trend_setter_tx: &crate::trend_setter::Tx,
posts_cache: &PostsCache,
host: &str,
posts: impl Iterator<Item = Post>,
) -> (Option<f64>, HashSet<String>) {
@ -168,13 +172,17 @@ async fn process_posts(
// check if it's an actual post, not a repost
if let Some(author_host) = post.account.host() {
// send away to redis
let update_set = UpdateSet::from(&post);
if store.save_post(post).await == Ok(true) {
new_posts += 1;
// potentially save a round-trip to redis with an in-process cache
if ! posts_cache.insert(post.uri.clone()) {
let update_set = UpdateSet::from(&post);
// send away to redis
if store.save_post(post).await == Ok(true) {
log::debug!("new from {} {} {}", host, if host == author_host { "==" } else { "!=" }, author_host);
new_posts += 1;
if ! update_set.is_empty() {
trend_setter_tx.send(update_set).await.unwrap();
if ! update_set.is_empty() {
trend_setter_tx.send(update_set).await.unwrap();
}
}
}
@ -202,6 +210,7 @@ async fn open_stream(
message_tx: tokio::sync::mpsc::UnboundedSender<Message>,
store: Store,
trend_setter_tx: crate::trend_setter::Tx,
posts_cache: &PostsCache,
client: &reqwest::Client,
robots_txt: RobotsTxt,
host: String,
@ -213,6 +222,7 @@ async fn open_stream(
}
// free as early as possible
drop(robots_txt);
let posts_cache = posts_cache.clone();
let stream = Feed::stream(client, &url).await
.map_err(|e| {
@ -223,11 +233,11 @@ async fn open_stream(
let message_tx = message_tx.clone();
let mut store = store.clone();
let trend_setter_tx = trend_setter_tx.clone();
let posts_cache = posts_cache.clone();
let host = host.clone();
async move {
log::info!("Stream {} new post: {}", host, &post.uri);
let (_, introduce_hosts) =
process_posts(&mut store, &trend_setter_tx, &host, [post].into_iter()).await;
process_posts(&mut store, &trend_setter_tx, &posts_cache, &host, [post].into_iter()).await;
let _ = message_tx.send(Message::IntroduceHosts {
hosts: introduce_hosts.into_iter().collect(),
});