use std::{ collections::HashSet, sync::Arc, time::Duration, }; use futures::{Stream, StreamExt}; use url::Url; use cave::{ activitypub::{ self, fetch::authorized_fetch, }, config::LoadConfig, feed, }; mod config; pub async fn connect_relay_in(redis_url: Url, in_topic: &str) -> Result>, redis::RedisError> { let client = redis::Client::open(redis_url)?; let mut pubsub_conn = client.get_async_connection() .await? .into_pubsub(); pubsub_conn.subscribe(in_topic) .await?; Ok(pubsub_conn.into_on_message().filter_map(|msg| async move { let data = msg.get_payload::().ok()?; let json = serde_json::from_str(&data) .ok()?; Some(json) })) } const KEY_ID: &str = "https://relay.fedi.buzz/instance/fedi.buzz#key"; #[tokio::main] async fn main() { cave::init::exit_on_panic(); cave::init::init_logger(5557); let config = config::Config::load(); let priv_key = Arc::new(config.priv_key()); let relay_in = connect_relay_in(config.redis_url(), &config.in_topic) .await .unwrap(); let client = reqwest::Client::builder() .timeout(Duration::from_secs(5)) .user_agent(concat!( env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION"), )) .pool_max_idle_per_host(1) .pool_idle_timeout(Some(Duration::from_secs(5))) .build() .unwrap(); let store = cave::store::Store::new(1, config.redis, config.redis_password_file).await; let posts_cache = cave::posts_cache::PostsCache::new(65536); let allowed_action_types: Arc> = Arc::new([ "Create", "Announce", "Update", ].into_iter().map(|s| s.to_string()).collect()); tracing::info!("ready"); cave::systemd::ready(); relay_in.for_each(|action| async { // Filter by action type if ! allowed_action_types.contains(&action.action_type) { return; } let posts_cache = posts_cache.clone(); let client = client.clone(); let mut store = store.clone(); let priv_key = priv_key.clone(); tokio::spawn(async move { // Avoid duplicate work let id = if let Some(id) = action.object_id() { let id = id.to_string(); if posts_cache.insert(id.clone()) { return; } id } else { // no object id tracing::warn!("No object id in action: {:?}", action); return; }; let object = if action.object.is_object() { // all good, can be forwarded action.object } else if let Some(id) = action.object.as_str() { // fetch info for id tracing::debug!("GET {id}"); match authorized_fetch(&client, id, KEY_ID, &priv_key).await { Ok(res) => res, Err(e) => { tracing::error!("{id} HTTP: {e:?}"); return; } } } else { tracing::warn!("Invalid object {id} in action: {:?}", action); return; }; let post: activitypub::Post = match serde_json::from_value(object) { Ok(post) => post, Err(e) => { tracing::error!("JSON of {id}: {e:?}"); return; } }; let author: activitypub::Actor = if let Some(author_url) = &post.attributed_to { match authorized_fetch(&client, author_url, KEY_ID, &priv_key).await { Ok(author) => author, Err(e) => { tracing::error!("{author_url} HTTP: {e:?}"); return; } } } else { tracing::error!("No attributedTo in {id}"); return; }; // Translate ActivityPub post to Mastodon client API post format let feed_post = post.to_feed_post(author); let event_type = if action.action_type == "Update" { // Translate more weird Mastodon client API naming "status.update" } else { "update" }.to_string(); let encodable_post = if let Ok(post) = feed::EncodablePost::from_post(event_type, feed_post) { post } else { tracing::error!("Cannot serialize post {id}"); return; }; match store.save_post(encodable_post).await { Ok(true) => tracing::info!("Post was new: {id}"), Ok(false) => tracing::info!("Post was already known: {id}"), Err(e) => tracing::error!("Error forwarding post {id}: {e:?}"), } }); }).await; }