hunter/worker: filter authors by block_list
This commit is contained in:
parent
f656daf07a
commit
321217563a
|
@ -51,6 +51,7 @@ impl Leaf {
|
|||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct BlockList {
|
||||
tree: Arc<RwLock<Leaf>>,
|
||||
}
|
||||
|
|
|
@ -48,7 +48,7 @@ async fn run() {
|
|||
let block_list = BlockList::new(&config.blocklist).await;
|
||||
|
||||
cave::systemd::status("Starting scheduler");
|
||||
let mut scheduler = scheduler::Scheduler::new(block_list);
|
||||
let mut scheduler = scheduler::Scheduler::new(block_list.clone());
|
||||
cave::systemd::status("Loading known hosts from config");
|
||||
for host in config.hosts.into_iter() {
|
||||
scheduler.introduce(InstanceHost::just_host(host)).await;
|
||||
|
@ -131,6 +131,7 @@ async fn run() {
|
|||
store.clone(),
|
||||
db.clone(),
|
||||
posts_cache.clone(),
|
||||
block_list.clone(),
|
||||
client.clone(),
|
||||
host
|
||||
)).unwrap();
|
||||
|
|
|
@ -3,12 +3,13 @@ use std::future::Future;
|
|||
use std::sync::Arc;
|
||||
use std::time::{Duration, Instant};
|
||||
use cave::{
|
||||
block_list::BlockList,
|
||||
db::Database,
|
||||
feed::{Feed, EncodablePost, Post, StreamError},
|
||||
posts_cache::PostsCache,
|
||||
store::Store,
|
||||
};
|
||||
use futures::{StreamExt, future};
|
||||
use futures::{StreamExt, future, stream::iter};
|
||||
use reqwest::StatusCode;
|
||||
use crate::scheduler::{Host, InstanceHost};
|
||||
use crate::webfinger;
|
||||
|
@ -75,6 +76,7 @@ pub async fn run(
|
|||
store: Store,
|
||||
db: Database,
|
||||
posts_cache: PostsCache,
|
||||
block_list: BlockList,
|
||||
client: reqwest::Client,
|
||||
host: InstanceHost,
|
||||
) {
|
||||
|
@ -85,12 +87,12 @@ pub async fn run(
|
|||
let (timeline_result, stream_result) = future::join(
|
||||
fetch_timeline(
|
||||
message_tx.clone(), store.clone(),
|
||||
&posts_cache,
|
||||
&posts_cache, block_list.clone(),
|
||||
&client, robots_txt.clone(), &host.host
|
||||
),
|
||||
open_stream(
|
||||
message_tx.clone(), store.clone(), db.clone(),
|
||||
&posts_cache,
|
||||
&posts_cache, block_list.clone(),
|
||||
&client, robots_txt, host.host.clone()
|
||||
),
|
||||
).await;
|
||||
|
@ -165,6 +167,7 @@ async fn fetch_timeline(
|
|||
message_tx: tokio::sync::mpsc::UnboundedSender<Message>,
|
||||
mut store: Store,
|
||||
posts_cache: &PostsCache,
|
||||
block_list: BlockList,
|
||||
client: &reqwest::Client,
|
||||
robots_txt: RobotsTxt,
|
||||
host: &Host,
|
||||
|
@ -188,7 +191,7 @@ async fn fetch_timeline(
|
|||
|
||||
let mean_interval = feed.mean_post_interval();
|
||||
|
||||
let (new_post_ratio, introduce_hosts) = process_posts(&mut store, posts_cache, &host, feed.posts.into_iter()).await;
|
||||
let (new_post_ratio, introduce_hosts) = process_posts(&mut store, posts_cache, block_list, &host, feed.posts.into_iter()).await;
|
||||
for introduce_host in introduce_hosts.into_iter() {
|
||||
message_tx.send(Message::IntroduceHost(introduce_host)).unwrap();
|
||||
}
|
||||
|
@ -215,6 +218,7 @@ fn scan_for_hosts(introduce_hosts: &mut Vec<InstanceHost>, post: &Post) {
|
|||
async fn process_posts(
|
||||
store: &mut Store,
|
||||
posts_cache: &PostsCache,
|
||||
block_list: BlockList,
|
||||
host: &Host,
|
||||
posts: impl Iterator<Item = EncodablePost>,
|
||||
) -> (Option<f64>, Vec<InstanceHost>) {
|
||||
|
@ -234,9 +238,11 @@ async fn process_posts(
|
|||
scan_for_hosts(&mut introduce_hosts, &reblog);
|
||||
}
|
||||
|
||||
if let Some(_account_host) = post.account.host() {
|
||||
// send away to redis
|
||||
if store.save_post(post).await == Ok(true) {
|
||||
if let Some(account_host) = post.account.host() {
|
||||
if block_list.is_blocked(&account_host).await {
|
||||
tracing::warn!("ignore post from blocked host {account_host}");
|
||||
} else if store.save_post(post).await == Ok(true) {
|
||||
// send away to redis
|
||||
new_posts += 1;
|
||||
}
|
||||
} else {
|
||||
|
@ -260,15 +266,25 @@ async fn process_posts(
|
|||
|
||||
// dedup introduce_hosts
|
||||
let mut seen_hosts = HashSet::with_capacity(introduce_hosts.len());
|
||||
let introduce_hosts = introduce_hosts.into_iter()
|
||||
.filter_map(|introduce_host| {
|
||||
if ! seen_hosts.contains(&introduce_host.host) {
|
||||
seen_hosts.insert(introduce_host.host.clone());
|
||||
Some(introduce_host)
|
||||
} else {
|
||||
None
|
||||
let introduce_hosts = iter(
|
||||
introduce_hosts.into_iter()
|
||||
.filter_map(|introduce_host| {
|
||||
if ! seen_hosts.contains(&introduce_host.host) {
|
||||
seen_hosts.insert(introduce_host.host.clone());
|
||||
Some(introduce_host)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
})
|
||||
)
|
||||
.filter(|introduce_host| {
|
||||
let block_list = block_list.clone();
|
||||
let host = introduce_host.host.to_string();
|
||||
async move {
|
||||
! block_list.is_blocked(&host).await
|
||||
}
|
||||
}).collect();
|
||||
})
|
||||
.collect().await;
|
||||
|
||||
(new_post_ratio, introduce_hosts)
|
||||
}
|
||||
|
@ -278,6 +294,7 @@ async fn open_stream(
|
|||
store: Store,
|
||||
db: Database,
|
||||
posts_cache: &PostsCache,
|
||||
block_list: BlockList,
|
||||
client: &reqwest::Client,
|
||||
robots_txt: RobotsTxt,
|
||||
host: Host,
|
||||
|
@ -336,10 +353,11 @@ async fn open_stream(
|
|||
let message_tx = message_tx.clone();
|
||||
let mut store = store.clone();
|
||||
let posts_cache = posts_cache.clone();
|
||||
let block_list = block_list.clone();
|
||||
let host = host.clone();
|
||||
async move {
|
||||
let (_, introduce_hosts) =
|
||||
process_posts(&mut store, &posts_cache, &host, [post].into_iter()).await;
|
||||
process_posts(&mut store, &posts_cache, block_list, &host, [post].into_iter()).await;
|
||||
for introduce_host in introduce_hosts.into_iter() {
|
||||
message_tx.send(Message::IntroduceHost(introduce_host)).unwrap();
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue