trends: init

This commit is contained in:
Astro 2022-11-03 01:21:53 +01:00
parent ba4eb9cf37
commit edfc6fe7b4
7 changed files with 182 additions and 16 deletions

110
Cargo.lock generated
View File

@ -17,6 +17,12 @@ dependencies = [
"libc",
]
[[package]]
name = "arc-swap"
version = "1.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "983cd8b9d4b02a6dc6ffa557262eb5858a27a0038ffffe21a0f133eaa819a164"
[[package]]
name = "async-compression"
version = "0.3.15"
@ -30,6 +36,17 @@ dependencies = [
"tokio",
]
[[package]]
name = "async-trait"
version = "0.1.58"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e805d94e6b5001b651426cf4cd446b1ab5f319d27bab5c644f61de0a804360c"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "autocfg"
version = "1.1.0"
@ -65,6 +82,7 @@ name = "caveman"
version = "0.0.0"
dependencies = [
"chrono",
"redis",
"reqwest",
"serde",
"serde_yaml",
@ -108,6 +126,20 @@ dependencies = [
"unicode-width",
]
[[package]]
name = "combine"
version = "4.6.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "35ed6e9d84f0b51a7f52daf1c7d71dd136fd7a3f41a8462b8cdb8c78d920fad4"
dependencies = [
"bytes",
"futures-core",
"memchr",
"pin-project-lite",
"tokio",
"tokio-util",
]
[[package]]
name = "core-foundation"
version = "0.9.3"
@ -235,6 +267,21 @@ dependencies = [
"percent-encoding",
]
[[package]]
name = "futures"
version = "0.3.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "38390104763dc37a5145a53c29c63c1290b5d316d6086ec32c293f6736051bb0"
dependencies = [
"futures-channel",
"futures-core",
"futures-executor",
"futures-io",
"futures-sink",
"futures-task",
"futures-util",
]
[[package]]
name = "futures-channel"
version = "0.3.25"
@ -242,6 +289,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "52ba265a92256105f45b719605a571ffe2d1f0fea3807304b522c1d778f79eed"
dependencies = [
"futures-core",
"futures-sink",
]
[[package]]
@ -250,6 +298,34 @@ version = "0.3.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "04909a7a7e4633ae6c4a9ab280aeb86da1236243a77b694a49eacd659a4bd3ac"
[[package]]
name = "futures-executor"
version = "0.3.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7acc85df6714c176ab5edf386123fafe217be88c0840ec11f199441134a074e2"
dependencies = [
"futures-core",
"futures-task",
"futures-util",
]
[[package]]
name = "futures-io"
version = "0.3.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "00f5fb52a06bdcadeb54e8d3671f8888a39697dcb0b81b23b55174030427f4eb"
[[package]]
name = "futures-macro"
version = "0.3.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bdfb8ce053d86b91919aad980c220b1fb8401a9394410e1c289ed7e66b61835d"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "futures-sink"
version = "0.3.25"
@ -268,10 +344,16 @@ version = "0.3.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "197676987abd2f9cadff84926f410af1c183608d36641465df73ae8211dc65d6"
dependencies = [
"futures-channel",
"futures-core",
"futures-io",
"futures-macro",
"futures-sink",
"futures-task",
"memchr",
"pin-project-lite",
"pin-utils",
"slab",
]
[[package]]
@ -689,6 +771,28 @@ dependencies = [
"proc-macro2",
]
[[package]]
name = "redis"
version = "0.22.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "513b3649f1a111c17954296e4a3b9eecb108b766c803e2b99f179ebe27005985"
dependencies = [
"arc-swap",
"async-trait",
"bytes",
"combine",
"futures",
"futures-util",
"itoa",
"percent-encoding",
"pin-project-lite",
"ryu",
"sha1_smol",
"tokio",
"tokio-util",
"url",
]
[[package]]
name = "redox_syscall"
version = "0.2.16"
@ -853,6 +957,12 @@ dependencies = [
"unsafe-libyaml",
]
[[package]]
name = "sha1_smol"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ae1a47186c03a32177042e55dbc5fd5aee900b8e0069a8d70fba96a9375cd012"
[[package]]
name = "signal-hook-registry"
version = "1.4.0"

View File

@ -9,3 +9,4 @@ reqwest = { version = "0.11", features = ["json", "deflate", "gzip"] }
serde = { version = "1", features = ["derive"] }
serde_yaml = "0.9"
chrono = "0.4"
redis = { version = "0.22", features = ["tokio-comp", "connection-manager"] }

View File

@ -1,4 +1,4 @@
indexer: http://10.233.12.2:7700/
redis: redis://10.233.12.2:6379/
hosts:
- chaos.social

View File

@ -1,6 +1,6 @@
#[derive(Debug, serde::Deserialize)]
pub struct Config {
pub indexer: String,
pub redis: String,
pub hosts: Vec<String>,
pub interval_after_error: u64,
pub max_workers: usize,

View File

@ -5,6 +5,7 @@ mod config;
mod world;
mod feed;
mod worker;
mod trends;
use worker::Message;
@ -33,8 +34,11 @@ async fn main() {
.build()
.expect("reqwest::Client");
let (posts_tx, posts_rx) = tokio::sync::mpsc::unbounded_channel();
trends::spawn(&config.redis, posts_rx).await;
let mut workers_active = 0usize;
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
let (message_tx, mut message_rx) = tokio::sync::mpsc::unbounded_channel();
loop {
// println!("{} workers active, queued {} of {}", workers_active, world.queue_len(), world.size());
let next_task = if workers_active < config.max_workers {
@ -45,7 +49,7 @@ async fn main() {
match next_task {
Err(duration) => {
let _ = timeout(duration, async {
let message = rx.recv().await.unwrap();
let message = message_rx.recv().await.unwrap();
match message {
Message::Fetched { host, next_interval, latest_timestamp } => {
workers_active -= 1;
@ -60,14 +64,18 @@ async fn main() {
world.introduce(host);
}
}
Message::Posts { .. } => {}
}
}).await;
}
Ok(host) => {
println!("Fetch {}", host);
workers_active += 1;
worker::fetch_and_process(tx.clone(), client.clone(), host);
worker::fetch_and_process(
message_tx.clone(),
posts_tx.clone(),
client.clone(),
host
);
}
}
}

48
src/trends.rs Normal file
View File

@ -0,0 +1,48 @@
use crate::feed::Post;
pub async fn spawn(redis_url: &str, mut post_rx: tokio::sync::mpsc::UnboundedReceiver<Post>) {
let client = redis::Client::open(redis_url)
.expect("redis::Client");
let mut man = redis::aio::ConnectionManager::new(client).await
.expect("redis::aio::ConnectionManager");
tokio::spawn(async move {
while let Some(post) = post_rx.recv().await {
if post.tags.is_empty() {
continue;
}
if let Some(timestamp) = post.timestamp() {
let post_key = format!("p:{}", post.url);
let cmd = redis::Cmd::getset(&post_key, "1");
if cmd.query_async::<_, Option<Vec<u8>>>(&mut man).await.unwrap().is_some() {
continue;
}
let hour = timestamp.naive_utc().timestamp() / 3600;
let mut cmd = redis::pipe();
for tag in post.tags {
let tag_key = format!("t:{}", tag.name.to_lowercase());
// by hour
cmd.hincr(
&tag_key,
format!("h:{}", hour),
1
).ignore();
// by spelling
cmd.hincr(
tag_key,
format!("s:{}", tag.name),
1
).ignore();
}
match cmd.query_async(&mut man).await {
Ok(()) => {}
Err(e) => {
eprintln!("redis error: {:?}", e);
}
}
}
}
});
}

View File

@ -1,7 +1,7 @@
use std::collections::HashSet;
use std::time::Duration;
use chrono::{DateTime, FixedOffset};
use crate::feed;
use crate::feed::{Feed, Post};
const DEFAULT_INTERVAL: Duration = Duration::from_secs(3600);
const MIN_INTERVAL: Duration = Duration::from_secs(10);
@ -15,17 +15,17 @@ pub enum Message {
},
Error { host: String },
IntroduceHosts { hosts: Vec<String> },
Posts { posts: Vec<feed::Post> },
}
pub fn fetch_and_process(
tx: tokio::sync::mpsc::UnboundedSender<Message>,
message_tx: tokio::sync::mpsc::UnboundedSender<Message>,
posts_tx: tokio::sync::mpsc::UnboundedSender<Post>,
client: reqwest::Client,
host: String,
) {
let url = format!("https://{}/api/v1/timelines/public", host);
tokio::spawn(async move {
match feed::Feed::fetch(&client, &url).await {
match Feed::fetch(&client, &url).await {
Ok(feed) => {
// Analyze time intervals between posts to estimate when to fetch next
let mut timestamps = feed.posts.iter()
@ -43,21 +43,20 @@ pub fn fetch_and_process(
let latest_timestamp = timestamps.iter().last().cloned();
// introduce new hosts, validate posts
let mut posts = Vec::with_capacity(feed.posts.len());
let mut hosts = HashSet::new();
for post in feed.posts.into_iter() {
if let Some(author_host) = post.account.host() {
if author_host == host && post.url_host().as_ref() == Some(&host) {
posts.push(post);
// send away to redis
let _ = posts_tx.send(post);
}
hosts.insert(author_host);
}
}
let hosts = hosts.into_iter().collect();
let _ = tx.send(Message::IntroduceHosts { hosts });
let _ = tx.send(Message::Posts { posts });
let _ = message_tx.send(Message::IntroduceHosts { hosts });
tx.send(Message::Fetched {
message_tx.send(Message::Fetched {
host: host.clone(),
next_interval,
latest_timestamp,
@ -65,7 +64,7 @@ pub fn fetch_and_process(
}
Err(e) => {
println!("Failed fetching {}: {}", host, e);
tx.send(Message::Error { host }).unwrap();
message_tx.send(Message::Error { host }).unwrap();
}
}
});