From edfc6fe7b45009e1db0c289735c86edabbd0001e Mon Sep 17 00:00:00 2001 From: Astro Date: Thu, 3 Nov 2022 01:21:53 +0100 Subject: [PATCH] trends: init --- Cargo.lock | 110 ++++++++++++++++++++++++++++++++++++++++++++++++++ Cargo.toml | 1 + config.yaml | 2 +- src/config.rs | 2 +- src/main.rs | 16 ++++++-- src/trends.rs | 48 ++++++++++++++++++++++ src/worker.rs | 19 +++++---- 7 files changed, 182 insertions(+), 16 deletions(-) create mode 100644 src/trends.rs diff --git a/Cargo.lock b/Cargo.lock index 04263ab..a6b390c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -17,6 +17,12 @@ dependencies = [ "libc", ] +[[package]] +name = "arc-swap" +version = "1.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "983cd8b9d4b02a6dc6ffa557262eb5858a27a0038ffffe21a0f133eaa819a164" + [[package]] name = "async-compression" version = "0.3.15" @@ -30,6 +36,17 @@ dependencies = [ "tokio", ] +[[package]] +name = "async-trait" +version = "0.1.58" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e805d94e6b5001b651426cf4cd446b1ab5f319d27bab5c644f61de0a804360c" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "autocfg" version = "1.1.0" @@ -65,6 +82,7 @@ name = "caveman" version = "0.0.0" dependencies = [ "chrono", + "redis", "reqwest", "serde", "serde_yaml", @@ -108,6 +126,20 @@ dependencies = [ "unicode-width", ] +[[package]] +name = "combine" +version = "4.6.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "35ed6e9d84f0b51a7f52daf1c7d71dd136fd7a3f41a8462b8cdb8c78d920fad4" +dependencies = [ + "bytes", + "futures-core", + "memchr", + "pin-project-lite", + "tokio", + "tokio-util", +] + [[package]] name = "core-foundation" version = "0.9.3" @@ -235,6 +267,21 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "futures" +version = "0.3.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"38390104763dc37a5145a53c29c63c1290b5d316d6086ec32c293f6736051bb0" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + [[package]] name = "futures-channel" version = "0.3.25" @@ -242,6 +289,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "52ba265a92256105f45b719605a571ffe2d1f0fea3807304b522c1d778f79eed" dependencies = [ "futures-core", + "futures-sink", ] [[package]] @@ -250,6 +298,34 @@ version = "0.3.25" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "04909a7a7e4633ae6c4a9ab280aeb86da1236243a77b694a49eacd659a4bd3ac" +[[package]] +name = "futures-executor" +version = "0.3.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7acc85df6714c176ab5edf386123fafe217be88c0840ec11f199441134a074e2" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-io" +version = "0.3.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "00f5fb52a06bdcadeb54e8d3671f8888a39697dcb0b81b23b55174030427f4eb" + +[[package]] +name = "futures-macro" +version = "0.3.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bdfb8ce053d86b91919aad980c220b1fb8401a9394410e1c289ed7e66b61835d" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "futures-sink" version = "0.3.25" @@ -268,10 +344,16 @@ version = "0.3.25" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "197676987abd2f9cadff84926f410af1c183608d36641465df73ae8211dc65d6" dependencies = [ + "futures-channel", "futures-core", + "futures-io", + "futures-macro", + "futures-sink", "futures-task", + "memchr", "pin-project-lite", "pin-utils", + "slab", ] [[package]] @@ -689,6 +771,28 @@ dependencies = [ "proc-macro2", ] +[[package]] +name = "redis" +version = "0.22.1" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "513b3649f1a111c17954296e4a3b9eecb108b766c803e2b99f179ebe27005985" +dependencies = [ + "arc-swap", + "async-trait", + "bytes", + "combine", + "futures", + "futures-util", + "itoa", + "percent-encoding", + "pin-project-lite", + "ryu", + "sha1_smol", + "tokio", + "tokio-util", + "url", +] + [[package]] name = "redox_syscall" version = "0.2.16" @@ -853,6 +957,12 @@ dependencies = [ "unsafe-libyaml", ] +[[package]] +name = "sha1_smol" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ae1a47186c03a32177042e55dbc5fd5aee900b8e0069a8d70fba96a9375cd012" + [[package]] name = "signal-hook-registry" version = "1.4.0" diff --git a/Cargo.toml b/Cargo.toml index 2bfadd5..c66a905 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,3 +9,4 @@ reqwest = { version = "0.11", features = ["json", "deflate", "gzip"] } serde = { version = "1", features = ["derive"] } serde_yaml = "0.9" chrono = "0.4" +redis = { version = "0.22", features = ["tokio-comp", "connection-manager"] } diff --git a/config.yaml b/config.yaml index d37d9d1..a2fc731 100644 --- a/config.yaml +++ b/config.yaml @@ -1,4 +1,4 @@ -indexer: http://10.233.12.2:7700/ +redis: redis://10.233.12.2:6379/ hosts: - chaos.social diff --git a/src/config.rs b/src/config.rs index 8469507..d70d617 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1,6 +1,6 @@ #[derive(Debug, serde::Deserialize)] pub struct Config { - pub indexer: String, + pub redis: String, pub hosts: Vec, pub interval_after_error: u64, pub max_workers: usize, diff --git a/src/main.rs b/src/main.rs index 9eb844f..423b63b 100644 --- a/src/main.rs +++ b/src/main.rs @@ -5,6 +5,7 @@ mod config; mod world; mod feed; mod worker; +mod trends; use worker::Message; @@ -33,8 +34,11 @@ async fn main() { .build() .expect("reqwest::Client"); + let (posts_tx, posts_rx) = tokio::sync::mpsc::unbounded_channel(); + trends::spawn(&config.redis, posts_rx).await; + let mut 
workers_active = 0usize; - let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel(); + let (message_tx, mut message_rx) = tokio::sync::mpsc::unbounded_channel(); loop { // println!("{} workers active, queued {} of {}", workers_active, world.queue_len(), world.size()); let next_task = if workers_active < config.max_workers { @@ -45,7 +49,7 @@ async fn main() { match next_task { Err(duration) => { let _ = timeout(duration, async { - let message = rx.recv().await.unwrap(); + let message = message_rx.recv().await.unwrap(); match message { Message::Fetched { host, next_interval, latest_timestamp } => { workers_active -= 1; @@ -60,14 +64,18 @@ async fn main() { world.introduce(host); } } - Message::Posts { .. } => {} } }).await; } Ok(host) => { println!("Fetch {}", host); workers_active += 1; - worker::fetch_and_process(tx.clone(), client.clone(), host); + worker::fetch_and_process( + message_tx.clone(), + posts_tx.clone(), + client.clone(), + host + ); } } } diff --git a/src/trends.rs b/src/trends.rs new file mode 100644 index 0000000..7a39503 --- /dev/null +++ b/src/trends.rs @@ -0,0 +1,48 @@ +use crate::feed::Post; + +pub async fn spawn(redis_url: &str, mut post_rx: tokio::sync::mpsc::UnboundedReceiver<Post>) { + let client = redis::Client::open(redis_url) + .expect("redis::Client"); + let mut man = redis::aio::ConnectionManager::new(client).await + .expect("redis::aio::ConnectionManager"); + + tokio::spawn(async move { + while let Some(post) = post_rx.recv().await { + if post.tags.is_empty() { + continue; + } + + if let Some(timestamp) = post.timestamp() { + let post_key = format!("p:{}", post.url); + let cmd = redis::Cmd::getset(&post_key, "1"); + if cmd.query_async::<_, Option<Vec<u8>>>(&mut man).await.unwrap().is_some() { + continue; + } + + let hour = timestamp.naive_utc().timestamp() / 3600; + let mut cmd = redis::pipe(); + for tag in post.tags { + let tag_key = format!("t:{}", tag.name.to_lowercase()); + // by hour + cmd.hincr( + &tag_key, + format!("h:{}", hour), + 1 +
).ignore(); + // by spelling + cmd.hincr( + tag_key, + format!("s:{}", tag.name), + 1 + ).ignore(); + } + match cmd.query_async(&mut man).await { + Ok(()) => {} + Err(e) => { + eprintln!("redis error: {:?}", e); + } + } + } + } + }); +} diff --git a/src/worker.rs b/src/worker.rs index 6358ec2..82f11d9 100644 --- a/src/worker.rs +++ b/src/worker.rs @@ -1,7 +1,7 @@ use std::collections::HashSet; use std::time::Duration; use chrono::{DateTime, FixedOffset}; -use crate::feed; +use crate::feed::{Feed, Post}; const DEFAULT_INTERVAL: Duration = Duration::from_secs(3600); const MIN_INTERVAL: Duration = Duration::from_secs(10); @@ -15,17 +15,17 @@ pub enum Message { }, Error { host: String }, IntroduceHosts { hosts: Vec<String> }, - Posts { posts: Vec<feed::Post> }, } pub fn fetch_and_process( - tx: tokio::sync::mpsc::UnboundedSender<Message>, + message_tx: tokio::sync::mpsc::UnboundedSender<Message>, + posts_tx: tokio::sync::mpsc::UnboundedSender<Post>, client: reqwest::Client, host: String, ) { let url = format!("https://{}/api/v1/timelines/public", host); tokio::spawn(async move { - match feed::Feed::fetch(&client, &url).await { + match Feed::fetch(&client, &url).await { Ok(feed) => { // Analyze time intervals between posts to estimate when to fetch next let mut timestamps = feed.posts.iter() @@ -43,21 +43,20 @@ pub fn fetch_and_process( let latest_timestamp = timestamps.iter().last().cloned(); // introduce new hosts, validate posts - let mut posts = Vec::with_capacity(feed.posts.len()); let mut hosts = HashSet::new(); for post in feed.posts.into_iter() { if let Some(author_host) = post.account.host() { if author_host == host && post.url_host().as_ref() == Some(&host) { - posts.push(post); + // send away to redis + let _ = posts_tx.send(post); } hosts.insert(author_host); } } let hosts = hosts.into_iter().collect(); - let _ = tx.send(Message::IntroduceHosts { hosts }); - let _ = tx.send(Message::Posts { posts }); + let _ = message_tx.send(Message::IntroduceHosts { hosts }); - tx.send(Message::Fetched { +
message_tx.send(Message::Fetched { host: host.clone(), next_interval, latest_timestamp, @@ -65,7 +64,7 @@ pub fn fetch_and_process( } Err(e) => { println!("Failed fetching {}: {}", host, e); - tx.send(Message::Error { host }).unwrap(); + message_tx.send(Message::Error { host }).unwrap(); } } });