Compare commits

...

3 Commits

8 changed files with 70 additions and 22 deletions

View File

@@ -51,13 +51,14 @@ impl Leaf {
     }
 }
 
+#[derive(Clone)]
 pub struct BlockList {
     tree: Arc<RwLock<Leaf>>,
 }
 
 impl BlockList {
     pub async fn new(path: &str) -> BlockList {
-        let root = cave::live_file::load(path, |file| async move {
+        let root = crate::live_file::load(path, |file| async move {
             let mut root = Leaf::Tree(HashMap::new());
             let mut file = BufReader::new(file);
             let mut line = String::new();
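Note: the new #[derive(Clone)] is cheap because BlockList's only field is an Arc<RwLock<Leaf>>: cloning bumps a reference count instead of copying the tree, so every clone handed out in the files below (scheduler, workers, sieve) reads the same live-reloaded data. A minimal sketch of those semantics, assuming tokio's RwLock (consistent with the async is_blocked calls below) and a HashMap standing in for the Leaf tree:

    use std::{collections::HashMap, sync::Arc};
    use tokio::sync::RwLock;

    #[derive(Clone)]
    pub struct BlockList {
        // Stand-in for the real Leaf tree; only the Arc matters here.
        tree: Arc<RwLock<HashMap<String, ()>>>,
    }

    impl BlockList {
        pub async fn is_blocked(&self, host: &str) -> bool {
            self.tree.read().await.contains_key(host)
        }
    }

    #[tokio::main]
    async fn main() {
        let a = BlockList { tree: Arc::new(RwLock::new(HashMap::new())) };
        let b = a.clone();
        // A write through one handle is visible through the other.
        a.tree.write().await.insert("spam.example".to_string(), ());
        assert!(b.is_blocked("spam.example").await);
    }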

View File

@@ -10,6 +10,8 @@ pub mod word_list;
 pub mod db;
 pub mod posts_cache;
 pub mod activitypub;
+pub mod block_list;
+
 pub const PERIODS: &[u64] = &[4, 24, 7 * 24];

View File

@@ -3,10 +3,12 @@ use futures::{StreamExt, pin_mut};
 use metrics_util::MetricKindMask;
 use metrics_exporter_prometheus::PrometheusBuilder;
 use tokio::time::timeout;
-use cave::config::LoadConfig;
+use cave::{
+    block_list::BlockList,
+    config::LoadConfig,
+};
 
 mod config;
-mod block_list;
 mod scheduler;
 mod worker;
 mod webfinger;
@@ -43,10 +45,10 @@ async fn run() {
     let mut store = cave::store::Store::new(16, config.redis, config.redis_password_file).await;
     let posts_cache = cave::posts_cache::PostsCache::new(65536);
-    let block_list = block_list::BlockList::new(&config.blocklist).await;
+    let block_list = BlockList::new(&config.blocklist).await;
 
     cave::systemd::status("Starting scheduler");
-    let mut scheduler = scheduler::Scheduler::new(block_list);
+    let mut scheduler = scheduler::Scheduler::new(block_list.clone());
 
     cave::systemd::status("Loading known hosts from config");
     for host in config.hosts.into_iter() {
         scheduler.introduce(InstanceHost::just_host(host)).await;
@@ -129,6 +131,7 @@ async fn run() {
                 store.clone(),
                 db.clone(),
                 posts_cache.clone(),
+                block_list.clone(),
                 client.clone(),
                 host
             )).unwrap();
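Note: Scheduler::new(block_list) becomes Scheduler::new(block_list.clone()) because the same handle is now also cloned into every spawned worker (the added block_list.clone() above); moving it into the scheduler would leave nothing to pass on. A sketch of the sharing pattern, with a hypothetical run_worker standing in for the real worker entry point:

    let block_list = BlockList::new(&config.blocklist).await;
    let mut scheduler = scheduler::Scheduler::new(block_list.clone());

    for host in hosts {
        let block_list = block_list.clone();   // refcount bump, not a deep copy
        tokio::spawn(async move {
            run_worker(block_list, host).await;
        });
    }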

View File

@@ -5,7 +5,7 @@ use cave::feed::{Mention, Account};
 use rand::{thread_rng, Rng};
 use tokio::time::Instant;
-use crate::block_list::BlockList;
+use cave::block_list::BlockList;
 
 const MIN_INTERVAL: Duration = Duration::from_secs(30);
 const MAX_INTERVAL: Duration = Duration::from_secs(6 * 3600);

View File

@@ -3,12 +3,13 @@ use std::future::Future;
 use std::sync::Arc;
 use std::time::{Duration, Instant};
 use cave::{
+    block_list::BlockList,
     db::Database,
     feed::{Feed, EncodablePost, Post, StreamError},
     posts_cache::PostsCache,
     store::Store,
 };
-use futures::{StreamExt, future};
+use futures::{StreamExt, future, stream::iter};
 use reqwest::StatusCode;
 use crate::scheduler::{Host, InstanceHost};
 use crate::webfinger;
@@ -75,6 +76,7 @@ pub async fn run(
     store: Store,
     db: Database,
     posts_cache: PostsCache,
+    block_list: BlockList,
     client: reqwest::Client,
     host: InstanceHost,
 ) {
@@ -85,12 +87,12 @@ pub async fn run(
     let (timeline_result, stream_result) = future::join(
         fetch_timeline(
             message_tx.clone(), store.clone(),
-            &posts_cache,
+            &posts_cache, block_list.clone(),
             &client, robots_txt.clone(), &host.host
         ),
         open_stream(
             message_tx.clone(), store.clone(), db.clone(),
-            &posts_cache,
+            &posts_cache, block_list.clone(),
             &client, robots_txt, host.host.clone()
         ),
     ).await;
@@ -165,6 +167,7 @@ async fn fetch_timeline(
     message_tx: tokio::sync::mpsc::UnboundedSender<Message>,
     mut store: Store,
     posts_cache: &PostsCache,
+    block_list: BlockList,
     client: &reqwest::Client,
     robots_txt: RobotsTxt,
     host: &Host,
@@ -188,7 +191,7 @@ async fn fetch_timeline(
             let mean_interval = feed.mean_post_interval();
-            let (new_post_ratio, introduce_hosts) = process_posts(&mut store, posts_cache, &host, feed.posts.into_iter()).await;
+            let (new_post_ratio, introduce_hosts) = process_posts(&mut store, posts_cache, block_list, &host, feed.posts.into_iter()).await;
             for introduce_host in introduce_hosts.into_iter() {
                 message_tx.send(Message::IntroduceHost(introduce_host)).unwrap();
             }
@@ -215,6 +218,7 @@ fn scan_for_hosts(introduce_hosts: &mut Vec<InstanceHost>, post: &Post) {
 async fn process_posts(
     store: &mut Store,
     posts_cache: &PostsCache,
+    block_list: BlockList,
     host: &Host,
     posts: impl Iterator<Item = EncodablePost>,
 ) -> (Option<f64>, Vec<InstanceHost>) {
@@ -234,9 +238,11 @@ async fn process_posts(
             scan_for_hosts(&mut introduce_hosts, &reblog);
         }
 
-        if let Some(_account_host) = post.account.host() {
+        if let Some(account_host) = post.account.host() {
+            if block_list.is_blocked(&account_host).await {
+                tracing::warn!("ignore post from blocked host {account_host}");
+            } else if store.save_post(post).await == Ok(true) {
                 // send away to redis
-            if store.save_post(post).await == Ok(true) {
                 new_posts += 1;
             }
         } else {
@@ -260,7 +266,8 @@ async fn process_posts(
     // dedup introduce_hosts
     let mut seen_hosts = HashSet::with_capacity(introduce_hosts.len());
-    let introduce_hosts = introduce_hosts.into_iter()
+    let introduce_hosts = iter(
+        introduce_hosts.into_iter()
         .filter_map(|introduce_host| {
             if ! seen_hosts.contains(&introduce_host.host) {
                 seen_hosts.insert(introduce_host.host.clone());
@@ -268,7 +275,16 @@ async fn process_posts(
             } else {
                 None
             }
-        }).collect();
+        })
+    )
+        .filter(|introduce_host| {
+            let block_list = block_list.clone();
+            let host = introduce_host.host.to_string();
+            async move {
+                ! block_list.is_blocked(&host).await
+            }
+        })
+        .collect().await;
 
     (new_post_ratio, introduce_hosts)
 }
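Note: the dedup block switches from Iterator::collect to futures::stream::iter(...).filter(...).collect().await because the blocklist lookup is async: Iterator::filter cannot await, while StreamExt::filter takes a closure returning a Future. A self-contained sketch of the same pattern, with a dummy async lookup standing in for BlockList::is_blocked:

    use futures::{stream::iter, StreamExt};

    // Stand-in for BlockList::is_blocked.
    async fn is_blocked(host: &str) -> bool {
        host.ends_with(".invalid")
    }

    async fn keep_allowed(hosts: Vec<String>) -> Vec<String> {
        iter(hosts.into_iter())
            .filter(|host| {
                // The closure gets `&String`; clone so the future owns its data.
                let host = host.clone();
                async move { !is_blocked(&host).await }
            })
            .collect()
            .await
    }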
@@ -278,6 +294,7 @@ async fn open_stream(
     store: Store,
     db: Database,
     posts_cache: &PostsCache,
+    block_list: BlockList,
     client: &reqwest::Client,
     robots_txt: RobotsTxt,
     host: Host,
@@ -336,10 +353,11 @@ async fn open_stream(
             let message_tx = message_tx.clone();
             let mut store = store.clone();
             let posts_cache = posts_cache.clone();
+            let block_list = block_list.clone();
             let host = host.clone();
             async move {
                 let (_, introduce_hosts) =
-                    process_posts(&mut store, &posts_cache, &host, [post].into_iter()).await;
+                    process_posts(&mut store, &posts_cache, block_list, &host, [post].into_iter()).await;
                 for introduce_host in introduce_hosts.into_iter() {
                     message_tx.send(Message::IntroduceHost(introduce_host)).unwrap();
                 }

View File

@@ -55,6 +55,7 @@ let
     redis_password_file = cfg.redis.passwordFile;
     in_topic = "relay-in";
     prometheus_port = 9102;
+    blocklist = blocklistPath;
   };
 
   sieveSettings = lib.recursiveUpdate sieveDefaultSettings cfg.sieve.settings;

View File

@@ -9,6 +9,7 @@ pub struct Config {
     pub in_topic: String,
     priv_key_file: String,
     pub prometheus_port: u16,
+    pub blocklist: String,
 }
 
 impl Config {

View File

@@ -10,6 +10,7 @@ use cave::{
         self,
         fetch::authorized_fetch,
     },
+    block_list::BlockList,
     config::LoadConfig,
     feed,
 };
@@ -43,6 +44,7 @@ async fn main() {
     cave::init::init_logger(5557);
 
     let config = config::Config::load();
+    let block_list = BlockList::new(&config.blocklist).await;
 
     PrometheusBuilder::new()
         .with_http_listener(([0; 8], config.prometheus_port))
@@ -86,6 +88,7 @@ async fn main() {
             }
             let posts_cache = posts_cache.clone();
+            let block_list = block_list.clone();
             let client = client.clone();
             let mut store = store.clone();
             let priv_key = priv_key.clone();
@@ -137,6 +140,25 @@ async fn main() {
             };
 
             let author: activitypub::Actor = if let Some(author_url) = &post.attributed_to {
+                if let Ok(url) = Url::parse(author_url) {
+                    let host = if let Some(host) = url.host_str() {
+                        host
+                    } else {
+                        tracing::error!("No host in author {author_url}");
+                        return;
+                    };
+                    if block_list.is_blocked(host).await {
+                        tracing::warn!("Ignore blocked author {author_url}");
+                        metrics::counter!("sieve_activity", 1, "type" => "blocked_author");
+                        return;
+                    }
+                } else {
+                    tracing::error!("Invalid author: {author_url}");
+                    metrics::counter!("sieve_activity", 1, "type" => "invalid_author");
+                    return;
+                }
+
                 match authorized_fetch(&client, author_url, KEY_ID, &priv_key).await {
                     Ok(author) => {
                         metrics::counter!("sieve_activity", 1, "type" => "fetch_author_ok");
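Note: the added guard rejects blocked or malformed authors before the comparatively expensive authorized_fetch, recording each outcome in the sieve_activity counter. The same logic could be factored into a small helper; a sketch, assuming BlockList::is_blocked as used above (the helper name is hypothetical):

    use cave::block_list::BlockList;
    use url::Url;

    /// Returns false for unparseable author URLs, URLs without a host,
    /// and hosts found in the block list.
    async fn author_allowed(block_list: &BlockList, author_url: &str) -> bool {
        match Url::parse(author_url) {
            Ok(url) => match url.host_str() {
                Some(host) => !block_list.is_blocked(host).await,
                None => false,
            },
            Err(_) => false,
        }
    }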