Compare commits
3 Commits
35782d3617
...
6a78081469
Author | SHA1 | Date | |
---|---|---|---|
6a78081469 | |||
321217563a | |||
f656daf07a |
|
@ -51,13 +51,14 @@ impl Leaf {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
pub struct BlockList {
|
pub struct BlockList {
|
||||||
tree: Arc<RwLock<Leaf>>,
|
tree: Arc<RwLock<Leaf>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl BlockList {
|
impl BlockList {
|
||||||
pub async fn new(path: &str) -> BlockList {
|
pub async fn new(path: &str) -> BlockList {
|
||||||
let root = cave::live_file::load(path, |file| async move {
|
let root = crate::live_file::load(path, |file| async move {
|
||||||
let mut root = Leaf::Tree(HashMap::new());
|
let mut root = Leaf::Tree(HashMap::new());
|
||||||
let mut file = BufReader::new(file);
|
let mut file = BufReader::new(file);
|
||||||
let mut line = String::new();
|
let mut line = String::new();
|
|
@ -10,6 +10,8 @@ pub mod word_list;
|
||||||
pub mod db;
|
pub mod db;
|
||||||
pub mod posts_cache;
|
pub mod posts_cache;
|
||||||
pub mod activitypub;
|
pub mod activitypub;
|
||||||
|
pub mod block_list;
|
||||||
|
|
||||||
|
|
||||||
pub const PERIODS: &[u64] = &[4, 24, 7 * 24];
|
pub const PERIODS: &[u64] = &[4, 24, 7 * 24];
|
||||||
|
|
||||||
|
|
|
@ -3,10 +3,12 @@ use futures::{StreamExt, pin_mut};
|
||||||
use metrics_util::MetricKindMask;
|
use metrics_util::MetricKindMask;
|
||||||
use metrics_exporter_prometheus::PrometheusBuilder;
|
use metrics_exporter_prometheus::PrometheusBuilder;
|
||||||
use tokio::time::timeout;
|
use tokio::time::timeout;
|
||||||
use cave::config::LoadConfig;
|
use cave::{
|
||||||
|
block_list::BlockList,
|
||||||
|
config::LoadConfig,
|
||||||
|
};
|
||||||
|
|
||||||
mod config;
|
mod config;
|
||||||
mod block_list;
|
|
||||||
mod scheduler;
|
mod scheduler;
|
||||||
mod worker;
|
mod worker;
|
||||||
mod webfinger;
|
mod webfinger;
|
||||||
|
@ -43,10 +45,10 @@ async fn run() {
|
||||||
let mut store = cave::store::Store::new(16, config.redis, config.redis_password_file).await;
|
let mut store = cave::store::Store::new(16, config.redis, config.redis_password_file).await;
|
||||||
let posts_cache = cave::posts_cache::PostsCache::new(65536);
|
let posts_cache = cave::posts_cache::PostsCache::new(65536);
|
||||||
|
|
||||||
let block_list = block_list::BlockList::new(&config.blocklist).await;
|
let block_list = BlockList::new(&config.blocklist).await;
|
||||||
|
|
||||||
cave::systemd::status("Starting scheduler");
|
cave::systemd::status("Starting scheduler");
|
||||||
let mut scheduler = scheduler::Scheduler::new(block_list);
|
let mut scheduler = scheduler::Scheduler::new(block_list.clone());
|
||||||
cave::systemd::status("Loading known hosts from config");
|
cave::systemd::status("Loading known hosts from config");
|
||||||
for host in config.hosts.into_iter() {
|
for host in config.hosts.into_iter() {
|
||||||
scheduler.introduce(InstanceHost::just_host(host)).await;
|
scheduler.introduce(InstanceHost::just_host(host)).await;
|
||||||
|
@ -129,6 +131,7 @@ async fn run() {
|
||||||
store.clone(),
|
store.clone(),
|
||||||
db.clone(),
|
db.clone(),
|
||||||
posts_cache.clone(),
|
posts_cache.clone(),
|
||||||
|
block_list.clone(),
|
||||||
client.clone(),
|
client.clone(),
|
||||||
host
|
host
|
||||||
)).unwrap();
|
)).unwrap();
|
||||||
|
|
|
@ -5,7 +5,7 @@ use cave::feed::{Mention, Account};
|
||||||
use rand::{thread_rng, Rng};
|
use rand::{thread_rng, Rng};
|
||||||
use tokio::time::Instant;
|
use tokio::time::Instant;
|
||||||
|
|
||||||
use crate::block_list::BlockList;
|
use cave::block_list::BlockList;
|
||||||
|
|
||||||
const MIN_INTERVAL: Duration = Duration::from_secs(30);
|
const MIN_INTERVAL: Duration = Duration::from_secs(30);
|
||||||
const MAX_INTERVAL: Duration = Duration::from_secs(6 * 3600);
|
const MAX_INTERVAL: Duration = Duration::from_secs(6 * 3600);
|
||||||
|
|
|
@ -3,12 +3,13 @@ use std::future::Future;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::time::{Duration, Instant};
|
use std::time::{Duration, Instant};
|
||||||
use cave::{
|
use cave::{
|
||||||
|
block_list::BlockList,
|
||||||
db::Database,
|
db::Database,
|
||||||
feed::{Feed, EncodablePost, Post, StreamError},
|
feed::{Feed, EncodablePost, Post, StreamError},
|
||||||
posts_cache::PostsCache,
|
posts_cache::PostsCache,
|
||||||
store::Store,
|
store::Store,
|
||||||
};
|
};
|
||||||
use futures::{StreamExt, future};
|
use futures::{StreamExt, future, stream::iter};
|
||||||
use reqwest::StatusCode;
|
use reqwest::StatusCode;
|
||||||
use crate::scheduler::{Host, InstanceHost};
|
use crate::scheduler::{Host, InstanceHost};
|
||||||
use crate::webfinger;
|
use crate::webfinger;
|
||||||
|
@ -75,6 +76,7 @@ pub async fn run(
|
||||||
store: Store,
|
store: Store,
|
||||||
db: Database,
|
db: Database,
|
||||||
posts_cache: PostsCache,
|
posts_cache: PostsCache,
|
||||||
|
block_list: BlockList,
|
||||||
client: reqwest::Client,
|
client: reqwest::Client,
|
||||||
host: InstanceHost,
|
host: InstanceHost,
|
||||||
) {
|
) {
|
||||||
|
@ -85,12 +87,12 @@ pub async fn run(
|
||||||
let (timeline_result, stream_result) = future::join(
|
let (timeline_result, stream_result) = future::join(
|
||||||
fetch_timeline(
|
fetch_timeline(
|
||||||
message_tx.clone(), store.clone(),
|
message_tx.clone(), store.clone(),
|
||||||
&posts_cache,
|
&posts_cache, block_list.clone(),
|
||||||
&client, robots_txt.clone(), &host.host
|
&client, robots_txt.clone(), &host.host
|
||||||
),
|
),
|
||||||
open_stream(
|
open_stream(
|
||||||
message_tx.clone(), store.clone(), db.clone(),
|
message_tx.clone(), store.clone(), db.clone(),
|
||||||
&posts_cache,
|
&posts_cache, block_list.clone(),
|
||||||
&client, robots_txt, host.host.clone()
|
&client, robots_txt, host.host.clone()
|
||||||
),
|
),
|
||||||
).await;
|
).await;
|
||||||
|
@ -165,6 +167,7 @@ async fn fetch_timeline(
|
||||||
message_tx: tokio::sync::mpsc::UnboundedSender<Message>,
|
message_tx: tokio::sync::mpsc::UnboundedSender<Message>,
|
||||||
mut store: Store,
|
mut store: Store,
|
||||||
posts_cache: &PostsCache,
|
posts_cache: &PostsCache,
|
||||||
|
block_list: BlockList,
|
||||||
client: &reqwest::Client,
|
client: &reqwest::Client,
|
||||||
robots_txt: RobotsTxt,
|
robots_txt: RobotsTxt,
|
||||||
host: &Host,
|
host: &Host,
|
||||||
|
@ -188,7 +191,7 @@ async fn fetch_timeline(
|
||||||
|
|
||||||
let mean_interval = feed.mean_post_interval();
|
let mean_interval = feed.mean_post_interval();
|
||||||
|
|
||||||
let (new_post_ratio, introduce_hosts) = process_posts(&mut store, posts_cache, &host, feed.posts.into_iter()).await;
|
let (new_post_ratio, introduce_hosts) = process_posts(&mut store, posts_cache, block_list, &host, feed.posts.into_iter()).await;
|
||||||
for introduce_host in introduce_hosts.into_iter() {
|
for introduce_host in introduce_hosts.into_iter() {
|
||||||
message_tx.send(Message::IntroduceHost(introduce_host)).unwrap();
|
message_tx.send(Message::IntroduceHost(introduce_host)).unwrap();
|
||||||
}
|
}
|
||||||
|
@ -215,6 +218,7 @@ fn scan_for_hosts(introduce_hosts: &mut Vec<InstanceHost>, post: &Post) {
|
||||||
async fn process_posts(
|
async fn process_posts(
|
||||||
store: &mut Store,
|
store: &mut Store,
|
||||||
posts_cache: &PostsCache,
|
posts_cache: &PostsCache,
|
||||||
|
block_list: BlockList,
|
||||||
host: &Host,
|
host: &Host,
|
||||||
posts: impl Iterator<Item = EncodablePost>,
|
posts: impl Iterator<Item = EncodablePost>,
|
||||||
) -> (Option<f64>, Vec<InstanceHost>) {
|
) -> (Option<f64>, Vec<InstanceHost>) {
|
||||||
|
@ -234,9 +238,11 @@ async fn process_posts(
|
||||||
scan_for_hosts(&mut introduce_hosts, &reblog);
|
scan_for_hosts(&mut introduce_hosts, &reblog);
|
||||||
}
|
}
|
||||||
|
|
||||||
if let Some(_account_host) = post.account.host() {
|
if let Some(account_host) = post.account.host() {
|
||||||
|
if block_list.is_blocked(&account_host).await {
|
||||||
|
tracing::warn!("ignore post from blocked host {account_host}");
|
||||||
|
} else if store.save_post(post).await == Ok(true) {
|
||||||
// send away to redis
|
// send away to redis
|
||||||
if store.save_post(post).await == Ok(true) {
|
|
||||||
new_posts += 1;
|
new_posts += 1;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
@ -260,7 +266,8 @@ async fn process_posts(
|
||||||
|
|
||||||
// dedup introduce_hosts
|
// dedup introduce_hosts
|
||||||
let mut seen_hosts = HashSet::with_capacity(introduce_hosts.len());
|
let mut seen_hosts = HashSet::with_capacity(introduce_hosts.len());
|
||||||
let introduce_hosts = introduce_hosts.into_iter()
|
let introduce_hosts = iter(
|
||||||
|
introduce_hosts.into_iter()
|
||||||
.filter_map(|introduce_host| {
|
.filter_map(|introduce_host| {
|
||||||
if ! seen_hosts.contains(&introduce_host.host) {
|
if ! seen_hosts.contains(&introduce_host.host) {
|
||||||
seen_hosts.insert(introduce_host.host.clone());
|
seen_hosts.insert(introduce_host.host.clone());
|
||||||
|
@ -268,7 +275,16 @@ async fn process_posts(
|
||||||
} else {
|
} else {
|
||||||
None
|
None
|
||||||
}
|
}
|
||||||
}).collect();
|
})
|
||||||
|
)
|
||||||
|
.filter(|introduce_host| {
|
||||||
|
let block_list = block_list.clone();
|
||||||
|
let host = introduce_host.host.to_string();
|
||||||
|
async move {
|
||||||
|
! block_list.is_blocked(&host).await
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.collect().await;
|
||||||
|
|
||||||
(new_post_ratio, introduce_hosts)
|
(new_post_ratio, introduce_hosts)
|
||||||
}
|
}
|
||||||
|
@ -278,6 +294,7 @@ async fn open_stream(
|
||||||
store: Store,
|
store: Store,
|
||||||
db: Database,
|
db: Database,
|
||||||
posts_cache: &PostsCache,
|
posts_cache: &PostsCache,
|
||||||
|
block_list: BlockList,
|
||||||
client: &reqwest::Client,
|
client: &reqwest::Client,
|
||||||
robots_txt: RobotsTxt,
|
robots_txt: RobotsTxt,
|
||||||
host: Host,
|
host: Host,
|
||||||
|
@ -336,10 +353,11 @@ async fn open_stream(
|
||||||
let message_tx = message_tx.clone();
|
let message_tx = message_tx.clone();
|
||||||
let mut store = store.clone();
|
let mut store = store.clone();
|
||||||
let posts_cache = posts_cache.clone();
|
let posts_cache = posts_cache.clone();
|
||||||
|
let block_list = block_list.clone();
|
||||||
let host = host.clone();
|
let host = host.clone();
|
||||||
async move {
|
async move {
|
||||||
let (_, introduce_hosts) =
|
let (_, introduce_hosts) =
|
||||||
process_posts(&mut store, &posts_cache, &host, [post].into_iter()).await;
|
process_posts(&mut store, &posts_cache, block_list, &host, [post].into_iter()).await;
|
||||||
for introduce_host in introduce_hosts.into_iter() {
|
for introduce_host in introduce_hosts.into_iter() {
|
||||||
message_tx.send(Message::IntroduceHost(introduce_host)).unwrap();
|
message_tx.send(Message::IntroduceHost(introduce_host)).unwrap();
|
||||||
}
|
}
|
||||||
|
|
|
@ -55,6 +55,7 @@ let
|
||||||
redis_password_file = cfg.redis.passwordFile;
|
redis_password_file = cfg.redis.passwordFile;
|
||||||
in_topic = "relay-in";
|
in_topic = "relay-in";
|
||||||
prometheus_port = 9102;
|
prometheus_port = 9102;
|
||||||
|
blocklist = blocklistPath;
|
||||||
};
|
};
|
||||||
|
|
||||||
sieveSettings = lib.recursiveUpdate sieveDefaultSettings cfg.sieve.settings;
|
sieveSettings = lib.recursiveUpdate sieveDefaultSettings cfg.sieve.settings;
|
||||||
|
|
|
@ -9,6 +9,7 @@ pub struct Config {
|
||||||
pub in_topic: String,
|
pub in_topic: String,
|
||||||
priv_key_file: String,
|
priv_key_file: String,
|
||||||
pub prometheus_port: u16,
|
pub prometheus_port: u16,
|
||||||
|
pub blocklist: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Config {
|
impl Config {
|
||||||
|
|
|
@ -10,6 +10,7 @@ use cave::{
|
||||||
self,
|
self,
|
||||||
fetch::authorized_fetch,
|
fetch::authorized_fetch,
|
||||||
},
|
},
|
||||||
|
block_list::BlockList,
|
||||||
config::LoadConfig,
|
config::LoadConfig,
|
||||||
feed,
|
feed,
|
||||||
};
|
};
|
||||||
|
@ -43,6 +44,7 @@ async fn main() {
|
||||||
cave::init::init_logger(5557);
|
cave::init::init_logger(5557);
|
||||||
|
|
||||||
let config = config::Config::load();
|
let config = config::Config::load();
|
||||||
|
let block_list = BlockList::new(&config.blocklist).await;
|
||||||
|
|
||||||
PrometheusBuilder::new()
|
PrometheusBuilder::new()
|
||||||
.with_http_listener(([0; 8], config.prometheus_port))
|
.with_http_listener(([0; 8], config.prometheus_port))
|
||||||
|
@ -86,6 +88,7 @@ async fn main() {
|
||||||
}
|
}
|
||||||
|
|
||||||
let posts_cache = posts_cache.clone();
|
let posts_cache = posts_cache.clone();
|
||||||
|
let block_list = block_list.clone();
|
||||||
let client = client.clone();
|
let client = client.clone();
|
||||||
let mut store = store.clone();
|
let mut store = store.clone();
|
||||||
let priv_key = priv_key.clone();
|
let priv_key = priv_key.clone();
|
||||||
|
@ -137,6 +140,25 @@ async fn main() {
|
||||||
};
|
};
|
||||||
|
|
||||||
let author: activitypub::Actor = if let Some(author_url) = &post.attributed_to {
|
let author: activitypub::Actor = if let Some(author_url) = &post.attributed_to {
|
||||||
|
if let Ok(url) = Url::parse(author_url) {
|
||||||
|
let host = if let Some(host) = url.host_str() {
|
||||||
|
host
|
||||||
|
} else {
|
||||||
|
tracing::error!("No host in author {author_url}");
|
||||||
|
return;
|
||||||
|
};
|
||||||
|
if block_list.is_blocked(host).await {
|
||||||
|
tracing::warn!("Ignore blocked author {author_url}");
|
||||||
|
metrics::counter!("sieve_activity", 1, "type" => "blocked_author");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
tracing::error!("Invalid author: {author_url}");
|
||||||
|
metrics::counter!("sieve_activity", 1, "type" => "invalid_author");
|
||||||
|
return;
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
match authorized_fetch(&client, author_url, KEY_ID, &priv_key).await {
|
match authorized_fetch(&client, author_url, KEY_ID, &priv_key).await {
|
||||||
Ok(author) => {
|
Ok(author) => {
|
||||||
metrics::counter!("sieve_activity", 1, "type" => "fetch_author_ok");
|
metrics::counter!("sieve_activity", 1, "type" => "fetch_author_ok");
|
||||||
|
|
Loading…
Reference in New Issue
Block a user