From c26469b778105f124a45a56f3711c439ea77c489 Mon Sep 17 00:00:00 2001 From: Astro Date: Sun, 15 Oct 2023 23:02:34 +0200 Subject: [PATCH] sieve: init --- Cargo.toml | 1 + buzzback/src/main.rs | 2 +- cave/src/activitypub/mod.rs | 86 +++++++++++++++++-- cave/src/feed.rs | 12 ++- sieve/Cargo.toml | 18 ++++ sieve/config.yaml | 4 + sieve/src/config.rs | 29 +++++++ sieve/src/main.rs | 162 ++++++++++++++++++++++++++++++++++++ 8 files changed, 306 insertions(+), 8 deletions(-) create mode 100644 sieve/Cargo.toml create mode 100644 sieve/config.yaml create mode 100644 sieve/src/config.rs create mode 100644 sieve/src/main.rs diff --git a/Cargo.toml b/Cargo.toml index 1b06128..f4ec328 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,6 +6,7 @@ members = [ "gatherer", "smokestack", "buzzback", + "sieve", ] resolver = "2" diff --git a/buzzback/src/main.rs b/buzzback/src/main.rs index 15b692b..f922be7 100644 --- a/buzzback/src/main.rs +++ b/buzzback/src/main.rs @@ -20,7 +20,7 @@ async fn follow_back( urlencoding::encode(&follow.actor), ); let action = activitypub::Action { - jsonld_context: serde_json::Value::String("https://www.w3.org/ns/activitystreams".to_string()), + jsonld_context: Some(serde_json::Value::String("https://www.w3.org/ns/activitystreams".to_string())), action_type: "Follow".to_string(), id: follow_id, to: None, diff --git a/cave/src/activitypub/mod.rs b/cave/src/activitypub/mod.rs index 619ecec..cb696ff 100644 --- a/cave/src/activitypub/mod.rs +++ b/cave/src/activitypub/mod.rs @@ -4,23 +4,25 @@ pub mod fetch; mod error; pub use error::Error; +use std::collections::HashMap; use serde::{Deserialize, Serialize}; #[derive(Debug, Clone, Serialize, Deserialize)] pub struct Actor { #[serde(rename = "@context")] - pub jsonld_context: serde_json::Value, + pub jsonld_context: Option, #[serde(rename = "type")] pub actor_type: String, pub id: String, - pub name: Option, + pub url: Option, + pub name: String, pub icon: Option, pub inbox: String, pub outbox: String, #[serde(rename = "publicKey")] pub public_key: ActorPublicKey, #[serde(rename = "preferredUsername")] - pub preferred_username: Option, + pub preferred_username: String, } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -35,20 +37,92 @@ pub struct ActorPublicKey { #[derive(Debug, Clone, Serialize, Deserialize)] pub struct Action { #[serde(rename = "@context")] - pub jsonld_context: serde_json::Value, + pub jsonld_context: Option, #[serde(rename = "type")] pub action_type: String, pub id: String, pub actor: String, pub to: Option, - pub object: Option, + pub object: O, +} + +impl Action { + pub fn object_id(&self) -> Option<&str> { + if let Some(id) = self.object.as_str() { + Some(id) + } else if let Some(object) = self.object.as_object() { + object.get("id").and_then(|id| id.as_str()) + } else { + None + } + } +} + +// hack for misskey +fn media_default_content_type() -> String { + "image/jpeg".to_string() } #[derive(Debug, Clone, Serialize, Deserialize)] pub struct Media { #[serde(rename = "type")] pub media_type: String, - #[serde(rename = "mediaType")] + #[serde(rename = "mediaType", default = "media_default_content_type")] pub content_type: String, pub url: String, } + +/// This is unfortunately strictly separate from the Client/Server +/// data schema in `cave::feed::Post` +#[derive(Debug, serde::Serialize, serde::Deserialize)] +pub struct Post { + pub published: String, + pub id: String, + pub url: Option, + #[serde(rename = "attributedTo")] + pub attributed_to: Option, + #[serde(default = "String::new")] + pub content: String, + #[serde(rename = "contentMap", default = "HashMap::new")] + pub content_map: HashMap, + #[serde(default)] + pub tag: Vec, + pub sensitive: Option, + #[serde(rename = "attachment", default = "Vec::new")] + pub attachments: Vec, +} + +impl Post { + pub fn language(&self) -> Option<&str> { + self.content_map.keys() + .next().map(|s| s.as_str()) + } + + /// Translate ActivityPub post to Mastodon client API post format + pub fn to_feed_post(self, actor: Actor) -> super::feed::Post { + let language = self.language() + .map(|s| s.to_string()); + super::feed::Post { + created_at: self.published, + uri: self.url.unwrap_or(self.id), + content: self.content, + account: super::feed::Account { + username: actor.preferred_username, + display_name: actor.name, + url: actor.url.unwrap_or(actor.id), + bot: actor.actor_type != "Person", + }, + tags: self.tag, + sensitive: self.sensitive, + mentions: vec![], + language, + media_attachments: self.attachments.into_iter().map(|media| { + super::feed::MediaAttachment { + media_type: media.media_type, + remote_url: Some(media.url), + } + }).collect(), + reblog: None, + } + } +} diff --git a/cave/src/feed.rs b/cave/src/feed.rs index 04c6077..285b8dd 100644 --- a/cave/src/feed.rs +++ b/cave/src/feed.rs @@ -70,7 +70,7 @@ pub struct Post { pub account: Account, #[serde(default)] pub tags: Vec, - pub application: Option, + // pub application: Option, pub sensitive: Option, #[serde(default)] pub mentions: Vec, @@ -147,6 +147,7 @@ enum EncodedPost { Stolen, } +// TODO: eliminate /// Wraps a `Post` along with a serializable form that is most close /// to the original incoming data #[derive(Debug)] @@ -164,6 +165,15 @@ impl Deref for EncodablePost { } impl EncodablePost { + pub fn from_post(event_type: String, post: Post) -> Result { + let bytes = serde_json::to_vec(&post)?; + Ok(EncodablePost { + event_type, + post, + encoded: EncodedPost::Bytes(bytes), + }) + } + pub fn from_value(event_type: String, value: serde_json::Value) -> Result { let post = serde_json::from_value(value.clone())?; Ok(EncodablePost { diff --git a/sieve/Cargo.toml b/sieve/Cargo.toml new file mode 100644 index 0000000..ba4ad6b --- /dev/null +++ b/sieve/Cargo.toml @@ -0,0 +1,18 @@ +[package] +name = "caveman-sieve" +description = "Relay deduplicator and enricher" +version = "0.0.0" +edition = "2021" + +[dependencies] +futures = "0.3" +tokio = { version = "1", features = ["full"] } +serde = { version = "1", features = ["derive"] } +serde_json = "1" +tracing = "0.1" +cave = { path = "../cave" } +url = "2" +redis = { version = "0.23", features = ["tokio-comp", "connection-manager"] } +reqwest = { version = "0.11" } +http = "*" +sigh = "*" diff --git a/sieve/config.yaml b/sieve/config.yaml new file mode 100644 index 0000000..0e8efba --- /dev/null +++ b/sieve/config.yaml @@ -0,0 +1,4 @@ +redis: "redis://fedi.buzz:6379/" +redis_password_file: "redis_password.txt" +in_topic: "relay-in" +priv_key_file: private-key.pem diff --git a/sieve/src/config.rs b/sieve/src/config.rs new file mode 100644 index 0000000..157641b --- /dev/null +++ b/sieve/src/config.rs @@ -0,0 +1,29 @@ +use serde::Deserialize; +use sigh::{PrivateKey, Key}; +use url::Url; + +#[derive(Debug, Deserialize)] +pub struct Config { + pub redis: String, + pub redis_password_file: String, + pub in_topic: String, + priv_key_file: String, +} + +impl Config { + pub fn redis_url(&self) -> Url { + let redis_password = std::fs::read_to_string(&self.redis_password_file) + .expect("redis_password_file"); + let mut redis_url = Url::parse(&self.redis) + .expect("redis_url"); + redis_url.set_password(Some(&redis_password)).unwrap(); + redis_url + } + + pub fn priv_key(&self) -> PrivateKey { + let data = std::fs::read_to_string(&self.priv_key_file) + .expect("read priv_key_file"); + PrivateKey::from_pem(data.as_bytes()) + .expect("priv_key") + } +} diff --git a/sieve/src/main.rs b/sieve/src/main.rs new file mode 100644 index 0000000..5c09246 --- /dev/null +++ b/sieve/src/main.rs @@ -0,0 +1,162 @@ +use std::{ + collections::HashSet, + sync::Arc, + time::Duration, +}; +use futures::{Stream, StreamExt}; +use url::Url; +use cave::{ + activitypub::{ + self, + fetch::authorized_fetch, + }, + config::LoadConfig, + feed, +}; + +mod config; + +pub async fn connect_relay_in(redis_url: Url, in_topic: &str) -> Result>, redis::RedisError> { + let client = redis::Client::open(redis_url)?; + let mut pubsub_conn = client.get_async_connection() + .await? + .into_pubsub(); + + pubsub_conn.subscribe(in_topic) + .await?; + + Ok(pubsub_conn.into_on_message().filter_map(|msg| async move { + let data = msg.get_payload::().ok()?; + let json = serde_json::from_str(&data) + .ok()?; + Some(json) + })) +} + +const KEY_ID: &str = "https://relay.fedi.buzz/instance/fedi.buzz#key"; + +#[tokio::main] +async fn main() { + cave::init::exit_on_panic(); + cave::init::init_logger(5557); + + let config = config::Config::load(); + let priv_key = Arc::new(config.priv_key()); + let relay_in = connect_relay_in(config.redis_url(), &config.in_topic) + .await + .unwrap(); + + let client = reqwest::Client::builder() + .timeout(Duration::from_secs(5)) + .user_agent(concat!( + env!("CARGO_PKG_NAME"), + "/", + env!("CARGO_PKG_VERSION"), + )) + .pool_max_idle_per_host(1) + .pool_idle_timeout(Some(Duration::from_secs(5))) + .build() + .unwrap(); + + let store = cave::store::Store::new(1, config.redis, config.redis_password_file).await; + + let posts_cache = cave::posts_cache::PostsCache::new(65536); + let allowed_action_types: Arc> = Arc::new([ + "Create", "Announce", "Update", + ].into_iter().map(|s| s.to_string()).collect()); + + tracing::info!("ready"); + cave::systemd::ready(); + + relay_in.for_each(|action| async { + // Filter by action type + if ! allowed_action_types.contains(&action.action_type) { + return; + } + + let posts_cache = posts_cache.clone(); + let client = client.clone(); + let mut store = store.clone(); + let priv_key = priv_key.clone(); + + tokio::spawn(async move { + // Avoid duplicate work + let id = if let Some(id) = action.object_id() { + let id = id.to_string(); + if posts_cache.insert(id.clone()) { + return; + } + id + } else { + // no object id + tracing::warn!("No object id in action: {:?}", action); + return; + }; + + let object = if action.object.is_object() { + // all good, can be forwarded + action.object + } else if let Some(id) = action.object.as_str() { + // fetch info for id + tracing::debug!("GET {id}"); + match authorized_fetch(&client, id, KEY_ID, &priv_key).await { + Ok(res) => + res, + Err(e) => { + tracing::error!("{id} HTTP: {e:?}"); + return; + } + } + } else { + tracing::warn!("Invalid object {id} in action: {:?}", action); + return; + }; + + let post: activitypub::Post = match serde_json::from_value(object) { + Ok(post) => + post, + Err(e) => { + tracing::error!("JSON of {id}: {e:?}"); + return; + } + }; + + let author: activitypub::Actor = if let Some(author_url) = &post.attributed_to { + match authorized_fetch(&client, author_url, KEY_ID, &priv_key).await { + Ok(author) => + author, + Err(e) => { + tracing::error!("{author_url} HTTP: {e:?}"); + return; + } + } + } else { + tracing::error!("No attributedTo in {id}"); + return; + }; + // Translate ActivityPub post to Mastodon client API post format + let feed_post = post.to_feed_post(author); + let event_type = if action.action_type == "Update" { + // Translate more weird Mastodon client API naming + "status.update" + } else { + "update" + }.to_string(); + let encodable_post = if let Ok(post) = feed::EncodablePost::from_post(event_type, feed_post) { + post + } else { + tracing::error!("Cannot serialize post {id}"); + return; + }; + + match store.save_post(encodable_post).await { + Ok(true) => + tracing::info!("Post was new: {id}"), + Ok(false) => + tracing::info!("Post was already known: {id}"), + Err(e) => + tracing::error!("Error forwarding post {id}: {e:?}"), + } + }); + }).await; +}