caveman/src/trends.rs

60 lines
2.0 KiB
Rust

use crate::feed::Post;
/// Connects to Redis and spawns a background task that counts hashtag usage.
///
/// Each [`Post`] received on `post_rx` is handled as follows:
///
/// 1. Posts from accounts flagged as `bot`, posts without tags, and posts for
///    which `timestamp()` returns `None` are skipped.
/// 2. `GETSET p:<url> "1"` marks the post; if the key already held a value the
///    post is skipped so it is not counted twice.
/// 3. Posts for which `url_host()` returns `None` are skipped (the `p:` marker
///    from step 2 has already been written at that point).
/// 4. For every tag, three fields of the hash `t:<lowercased tag name>` are
///    incremented by one in a single pipeline: `t:<hour>`, `s:<tag spelling>`
///    and `h:<host>`.
///
/// Redis errors while processing a post are logged to stderr and the task
/// moves on to the next post. The task ends once the channel is closed and
/// drained. The task's join handle is dropped, so it runs detached.
///
/// # Panics
///
/// Panics if the Redis client cannot be created from `redis_url` or if the
/// connection manager cannot be set up.
pub async fn spawn(redis_url: &str, mut post_rx: tokio::sync::mpsc::UnboundedReceiver<Post>) {
    let client = redis::Client::open(redis_url)
        .expect("redis::Client");
    let mut man = redis::aio::ConnectionManager::new(client).await
        .expect("redis::aio::ConnectionManager");

    tokio::spawn(async move {
        while let Some(post) = post_rx.recv().await {
            if post.account.bot || post.tags.is_empty() {
                continue;
            }
            let timestamp = match post.timestamp() {
                Some(timestamp) => timestamp,
                None => continue,
            };

            // Mark the post as seen; a previous value means it was already counted.
            let seen_key = format!("p:{}", post.url);
            let previous = redis::Cmd::getset(&seen_key, "1")
                .query_async::<_, Option<Vec<u8>>>(&mut man)
                .await;
            match previous {
                Ok(None) => {}
                Ok(Some(_)) => continue,
                Err(e) => {
                    // Do not panic here: a panic would end this detached task
                    // and silently stop all further counting. Skip the post
                    // instead of risking a double count.
                    eprintln!("redis error: {:?}", e);
                    continue;
                }
            }

            let host = match post.url_host() {
                Some(host) => host,
                None => continue,
            };

            // Hour bucket: the post's UTC timestamp divided by 3600.
            let hour = timestamp.naive_utc().timestamp() / 3600;

            let mut pipeline = redis::pipe();
            for tag in post.tags {
                // All spellings of a tag share one hash, keyed by the lowercased name.
                let tag_key = format!("t:{}", tag.name.to_lowercase());
                // by hour
                pipeline.hincr(&tag_key, format!("t:{}", hour), 1).ignore();
                // by spelling
                pipeline.hincr(&tag_key, format!("s:{}", tag.name), 1).ignore();
                // by instance
                pipeline.hincr(tag_key, format!("h:{}", host), 1).ignore();
            }

            let outcome: redis::RedisResult<()> = pipeline.query_async(&mut man).await;
            if let Err(e) = outcome {
                eprintln!("redis error: {:?}", e);
            }
        }
    });
}