From 321217563aadcaafefdbece644b71106e7896605 Mon Sep 17 00:00:00 2001 From: Astro Date: Wed, 18 Oct 2023 16:37:41 +0200 Subject: [PATCH] hunter/worker: filter authors by block_list --- cave/src/block_list.rs | 1 + hunter/src/main.rs | 3 ++- hunter/src/worker.rs | 50 ++++++++++++++++++++++++++++-------------- 3 files changed, 37 insertions(+), 17 deletions(-) diff --git a/cave/src/block_list.rs b/cave/src/block_list.rs index ed4b2e5..dcdc5c3 100644 --- a/cave/src/block_list.rs +++ b/cave/src/block_list.rs @@ -51,6 +51,7 @@ impl Leaf { } } +#[derive(Clone)] pub struct BlockList { tree: Arc>, } diff --git a/hunter/src/main.rs b/hunter/src/main.rs index 93d1afa..aec6135 100644 --- a/hunter/src/main.rs +++ b/hunter/src/main.rs @@ -48,7 +48,7 @@ async fn run() { let block_list = BlockList::new(&config.blocklist).await; cave::systemd::status("Starting scheduler"); - let mut scheduler = scheduler::Scheduler::new(block_list); + let mut scheduler = scheduler::Scheduler::new(block_list.clone()); cave::systemd::status("Loading known hosts from config"); for host in config.hosts.into_iter() { scheduler.introduce(InstanceHost::just_host(host)).await; @@ -131,6 +131,7 @@ async fn run() { store.clone(), db.clone(), posts_cache.clone(), + block_list.clone(), client.clone(), host )).unwrap(); diff --git a/hunter/src/worker.rs b/hunter/src/worker.rs index a35c374..ac90392 100644 --- a/hunter/src/worker.rs +++ b/hunter/src/worker.rs @@ -3,12 +3,13 @@ use std::future::Future; use std::sync::Arc; use std::time::{Duration, Instant}; use cave::{ + block_list::BlockList, db::Database, feed::{Feed, EncodablePost, Post, StreamError}, posts_cache::PostsCache, store::Store, }; -use futures::{StreamExt, future}; +use futures::{StreamExt, future, stream::iter}; use reqwest::StatusCode; use crate::scheduler::{Host, InstanceHost}; use crate::webfinger; @@ -75,6 +76,7 @@ pub async fn run( store: Store, db: Database, posts_cache: PostsCache, + block_list: BlockList, client: reqwest::Client, host: InstanceHost, ) { @@ -85,12 +87,12 @@ pub async fn run( let (timeline_result, stream_result) = future::join( fetch_timeline( message_tx.clone(), store.clone(), - &posts_cache, + &posts_cache, block_list.clone(), &client, robots_txt.clone(), &host.host ), open_stream( message_tx.clone(), store.clone(), db.clone(), - &posts_cache, + &posts_cache, block_list.clone(), &client, robots_txt, host.host.clone() ), ).await; @@ -165,6 +167,7 @@ async fn fetch_timeline( message_tx: tokio::sync::mpsc::UnboundedSender, mut store: Store, posts_cache: &PostsCache, + block_list: BlockList, client: &reqwest::Client, robots_txt: RobotsTxt, host: &Host, @@ -188,7 +191,7 @@ async fn fetch_timeline( let mean_interval = feed.mean_post_interval(); - let (new_post_ratio, introduce_hosts) = process_posts(&mut store, posts_cache, &host, feed.posts.into_iter()).await; + let (new_post_ratio, introduce_hosts) = process_posts(&mut store, posts_cache, block_list, &host, feed.posts.into_iter()).await; for introduce_host in introduce_hosts.into_iter() { message_tx.send(Message::IntroduceHost(introduce_host)).unwrap(); } @@ -215,6 +218,7 @@ fn scan_for_hosts(introduce_hosts: &mut Vec, post: &Post) { async fn process_posts( store: &mut Store, posts_cache: &PostsCache, + block_list: BlockList, host: &Host, posts: impl Iterator, ) -> (Option, Vec) { @@ -234,9 +238,11 @@ async fn process_posts( scan_for_hosts(&mut introduce_hosts, &reblog); } - if let Some(_account_host) = post.account.host() { - // send away to redis - if store.save_post(post).await == Ok(true) { + if let Some(account_host) = post.account.host() { + if block_list.is_blocked(&account_host).await { + tracing::warn!("ignore post from blocked host {account_host}"); + } else if store.save_post(post).await == Ok(true) { + // send away to redis new_posts += 1; } } else { @@ -260,15 +266,25 @@ async fn process_posts( // dedup introduce_hosts let mut seen_hosts = HashSet::with_capacity(introduce_hosts.len()); - let introduce_hosts = introduce_hosts.into_iter() - .filter_map(|introduce_host| { - if ! seen_hosts.contains(&introduce_host.host) { - seen_hosts.insert(introduce_host.host.clone()); - Some(introduce_host) - } else { - None + let introduce_hosts = iter( + introduce_hosts.into_iter() + .filter_map(|introduce_host| { + if ! seen_hosts.contains(&introduce_host.host) { + seen_hosts.insert(introduce_host.host.clone()); + Some(introduce_host) + } else { + None + } + }) + ) + .filter(|introduce_host| { + let block_list = block_list.clone(); + let host = introduce_host.host.to_string(); + async move { + ! block_list.is_blocked(&host).await } - }).collect(); + }) + .collect().await; (new_post_ratio, introduce_hosts) } @@ -278,6 +294,7 @@ async fn open_stream( store: Store, db: Database, posts_cache: &PostsCache, + block_list: BlockList, client: &reqwest::Client, robots_txt: RobotsTxt, host: Host, @@ -336,10 +353,11 @@ async fn open_stream( let message_tx = message_tx.clone(); let mut store = store.clone(); let posts_cache = posts_cache.clone(); + let block_list = block_list.clone(); let host = host.clone(); async move { let (_, introduce_hosts) = - process_posts(&mut store, &posts_cache, &host, [post].into_iter()).await; + process_posts(&mut store, &posts_cache, block_list, &host, [post].into_iter()).await; for introduce_host in introduce_hosts.into_iter() { message_tx.send(Message::IntroduceHost(introduce_host)).unwrap(); }