use std::{ collections::HashSet, sync::Arc, time::Duration, }; use futures::{Stream, StreamExt}; use url::Url; use cave::{ activitypub::{ self, fetch::authorized_fetch, }, block_list::BlockList, config::LoadConfig, feed, }; use metrics_util::MetricKindMask; use metrics_exporter_prometheus::PrometheusBuilder; mod config; pub async fn connect_relay_in(redis_url: Url, in_topic: &str) -> Result>, redis::RedisError> { let client = redis::Client::open(redis_url)?; let mut pubsub_conn = client.get_async_connection() .await? .into_pubsub(); pubsub_conn.subscribe(in_topic) .await?; Ok(pubsub_conn.into_on_message().filter_map(|msg| async move { let data = msg.get_payload::().ok()?; let json = serde_json::from_str(&data) .ok()?; Some(json) })) } const KEY_ID: &str = "https://relay.fedi.buzz/instance/fedi.buzz#key"; #[tokio::main] async fn main() { cave::init::exit_on_panic(); cave::init::init_logger(5557); let config = config::Config::load(); let block_list = BlockList::new(&config.blocklist).await; PrometheusBuilder::new() .with_http_listener(([0; 8], config.prometheus_port)) .add_global_label("application", env!("CARGO_PKG_NAME")) .idle_timeout(MetricKindMask::ALL, Some(Duration::from_secs(600))) .install() .unwrap(); let priv_key = Arc::new(config.priv_key()); let relay_in = connect_relay_in(config.redis_url(), &config.in_topic) .await .unwrap(); let client = reqwest::Client::builder() .timeout(Duration::from_secs(5)) .user_agent(concat!( env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION"), )) .pool_max_idle_per_host(1) .pool_idle_timeout(Some(Duration::from_secs(5))) .build() .unwrap(); let store = cave::store::Store::new(1, config.redis, config.redis_password_file).await; let posts_cache = cave::posts_cache::PostsCache::new(65536); let allowed_action_types: Arc> = Arc::new([ "Create", "Announce", "Update", ].into_iter().map(|s| s.to_string()).collect()); tracing::info!("ready"); cave::systemd::ready(); relay_in.for_each(|action| async { // Filter by action type if ! allowed_action_types.contains(&action.action_type) { metrics::counter!("sieve_activity", 1, "type" => "type_ignored"); return; } let posts_cache = posts_cache.clone(); let block_list = block_list.clone(); let client = client.clone(); let mut store = store.clone(); let priv_key = priv_key.clone(); tokio::spawn(async move { // Avoid duplicate work let id = if let Some(id) = action.object_id() { let id = id.to_string(); if posts_cache.insert(id.clone()) { return; } id } else { // no object id tracing::warn!("No object id in action: {:?}", action); return; }; let object = if action.object.is_object() { // all good, can be forwarded action.object } else if let Some(id) = action.object.as_str() { // fetch info for id tracing::debug!("GET {id}"); match authorized_fetch(&client, id, KEY_ID, &priv_key).await { Ok(res) => { metrics::counter!("sieve_activity", 1, "type" => "fetch_object_ok"); res } Err(e) => { tracing::error!("{id} HTTP: {e:?}"); metrics::counter!("sieve_activity", 1, "type" => "fetch_object_error"); return; } } } else { tracing::warn!("Invalid object {id} in action: {:?}", action); return; }; let post: activitypub::Post = match serde_json::from_value(object) { Ok(post) => post, Err(e) => { tracing::error!("JSON of {id}: {e:?}"); metrics::counter!("sieve_activity", 1, "type" => "json_error"); return; } }; let author: activitypub::Actor = if let Some(author_url) = &post.attributed_to { if let Ok(url) = Url::parse(author_url) { let Some(host) = url.host_str() else { tracing::error!("No host in author {author_url}"); return; }; if block_list.is_blocked(host).await { tracing::warn!("Ignore blocked author {author_url}"); metrics::counter!("sieve_activity", 1, "type" => "blocked_author"); return; } } else { tracing::error!("Invalid author: {author_url}"); metrics::counter!("sieve_activity", 1, "type" => "invalid_author"); return; } match authorized_fetch(&client, author_url, KEY_ID, &priv_key).await { Ok(author) => { metrics::counter!("sieve_activity", 1, "type" => "fetch_author_ok"); author } Err(e) => { tracing::error!("{author_url} HTTP: {e:?}"); metrics::counter!("sieve_activity", 1, "type" => "fetch_author_error"); return; } } } else { tracing::error!("No attributedTo in {id}"); metrics::counter!("sieve_activity", 1, "type" => "no_author"); return; }; // Translate ActivityPub post to Mastodon client API post format let feed_post = post.to_feed_post(author); let event_type = if action.action_type == "Update" { // Translate more weird Mastodon client API naming "status.update" } else { "update" }.to_string(); let Ok(encodable_post) = feed::EncodablePost::from_post(event_type, feed_post) else { tracing::error!("Cannot serialize post {id}"); metrics::counter!("sieve_activity", 1, "type" => "serialize_error"); return; }; match store.save_post(encodable_post).await { Ok(true) => { tracing::info!("Post was new: {id}"); metrics::counter!("sieve_activity", 1, "type" => "post_new"); cave::systemd::watchdog(); } Ok(false) => { tracing::info!("Post was already known: {id}"); metrics::counter!("sieve_activity", 1, "type" => "post_known"); cave::systemd::watchdog(); } Err(e) => { tracing::error!("Error forwarding post {id}: {e:?}"); metrics::counter!("sieve_activity", 1, "type" => "post_error"); } } }); }).await; }