221 lines
7.9 KiB
Rust
221 lines
7.9 KiB
Rust
use std::{
|
|
collections::HashSet,
|
|
sync::Arc,
|
|
time::Duration,
|
|
};
|
|
use futures::{Stream, StreamExt};
|
|
use url::Url;
|
|
use cave::{
|
|
activitypub::{
|
|
self,
|
|
fetch::authorized_fetch,
|
|
},
|
|
block_list::BlockList,
|
|
config::LoadConfig,
|
|
feed,
|
|
};
|
|
use metrics_util::MetricKindMask;
|
|
use metrics_exporter_prometheus::PrometheusBuilder;
|
|
|
|
mod config;
|
|
|
|
pub async fn connect_relay_in(redis_url: Url, in_topic: &str) -> Result<impl Stream<Item = activitypub::Action<serde_json::Value>>, redis::RedisError> {
|
|
let client = redis::Client::open(redis_url)?;
|
|
let mut pubsub_conn = client.get_async_pubsub()
|
|
.await?;
|
|
|
|
pubsub_conn.subscribe(in_topic)
|
|
.await?;
|
|
|
|
Ok(pubsub_conn.into_on_message().filter_map(|msg| async move {
|
|
let data = msg.get_payload::<String>().ok()?;
|
|
let json = serde_json::from_str(&data)
|
|
.ok()?;
|
|
Some(json)
|
|
}))
|
|
}
|
|
|
|
/// Key identifier handed to `authorized_fetch` together with the configured private key.
/// NOTE(review): presumably the `keyId` announced in the HTTP signature of outgoing
/// fetches — confirm against `cave::activitypub::fetch`.
const KEY_ID: &str = "https://relay.fedi.buzz/instance/fedi.buzz#key";
|
|
|
|
#[tokio::main]
|
|
async fn main() {
|
|
cave::init::exit_on_panic();
|
|
cave::init::init_logger(5557);
|
|
|
|
let config = config::Config::load();
|
|
let block_list = BlockList::new(&config.blocklist).await;
|
|
|
|
PrometheusBuilder::new()
|
|
.with_http_listener(([0; 8], config.prometheus_port))
|
|
.add_global_label("application", env!("CARGO_PKG_NAME"))
|
|
.idle_timeout(MetricKindMask::ALL, Some(Duration::from_secs(600)))
|
|
.install()
|
|
.unwrap();
|
|
|
|
let priv_key = Arc::new(config.priv_key());
|
|
let relay_in = connect_relay_in(config.redis_url(), &config.in_topic)
|
|
.await
|
|
.unwrap();
|
|
|
|
let client = reqwest::Client::builder()
|
|
.timeout(Duration::from_secs(5))
|
|
.user_agent(concat!(
|
|
env!("CARGO_PKG_NAME"),
|
|
"/",
|
|
env!("CARGO_PKG_VERSION"),
|
|
))
|
|
.pool_max_idle_per_host(1)
|
|
.pool_idle_timeout(Some(Duration::from_secs(5)))
|
|
.build()
|
|
.unwrap();
|
|
|
|
let store = cave::store::Store::new(1, config.redis, config.redis_password_file).await;
|
|
|
|
let posts_cache = cave::posts_cache::PostsCache::new(65536);
|
|
let allowed_action_types: Arc<HashSet<String>> = Arc::new([
|
|
"Create", "Announce", "Update",
|
|
].into_iter().map(|s| s.to_string()).collect());
|
|
|
|
tracing::info!("ready");
|
|
cave::systemd::ready();
|
|
|
|
relay_in.for_each(|action| async {
|
|
// Filter by action type
|
|
if ! allowed_action_types.contains(&action.action_type) {
|
|
metrics::counter!("sieve_activity", "type" => "type_ignored")
|
|
.increment(1);
|
|
return;
|
|
}
|
|
|
|
let posts_cache = posts_cache.clone();
|
|
let block_list = block_list.clone();
|
|
let client = client.clone();
|
|
let mut store = store.clone();
|
|
let priv_key = priv_key.clone();
|
|
|
|
tokio::spawn(async move {
|
|
// Avoid duplicate work
|
|
let id = if let Some(id) = action.object_id() {
|
|
let id = id.to_string();
|
|
if posts_cache.insert(id.clone()) {
|
|
return;
|
|
}
|
|
id
|
|
} else {
|
|
// no object id
|
|
tracing::warn!("No object id in action: {:?}", action);
|
|
return;
|
|
};
|
|
|
|
let object = if action.object.is_object() {
|
|
// all good, can be forwarded
|
|
action.object
|
|
} else if let Some(id) = action.object.as_str() {
|
|
// fetch info for id
|
|
tracing::debug!("GET {id}");
|
|
match authorized_fetch(&client, id, KEY_ID, &priv_key).await {
|
|
Ok(res) => {
|
|
metrics::counter!("sieve_activity", "type" => "fetch_object_ok")
|
|
.increment(1);
|
|
res
|
|
}
|
|
Err(e) => {
|
|
tracing::error!("{id} HTTP: {e:?}");
|
|
metrics::counter!("sieve_activity", "type" => "fetch_object_error")
|
|
.increment(1);
|
|
return;
|
|
}
|
|
}
|
|
} else {
|
|
tracing::warn!("Invalid object {id} in action: {:?}", action);
|
|
return;
|
|
};
|
|
|
|
let post: activitypub::Post = match serde_json::from_value(object) {
|
|
Ok(post) =>
|
|
post,
|
|
Err(e) => {
|
|
tracing::error!("JSON of {id}: {e:?}");
|
|
metrics::counter!("sieve_activity", "type" => "json_error")
|
|
.increment(1);
|
|
return;
|
|
}
|
|
};
|
|
|
|
let author: activitypub::Actor = if let Some(author_url) = &post.attributed_to {
|
|
if let Ok(url) = Url::parse(author_url) {
|
|
let Some(host) = url.host_str() else {
|
|
tracing::error!("No host in author {author_url}");
|
|
return;
|
|
};
|
|
if block_list.is_blocked(host).await {
|
|
tracing::warn!("Ignore blocked author {author_url}");
|
|
metrics::counter!("sieve_activity", "type" => "blocked_author")
|
|
.increment(1);
|
|
return;
|
|
}
|
|
} else {
|
|
tracing::error!("Invalid author: {author_url}");
|
|
metrics::counter!("sieve_activity", "type" => "invalid_author")
|
|
.increment(1);
|
|
return;
|
|
|
|
}
|
|
|
|
match authorized_fetch(&client, author_url, KEY_ID, &priv_key).await {
|
|
Ok(author) => {
|
|
metrics::counter!("sieve_activity", "type" => "fetch_author_ok")
|
|
.increment(1);
|
|
author
|
|
}
|
|
Err(e) => {
|
|
tracing::error!("{author_url} HTTP: {e:?}");
|
|
metrics::counter!("sieve_activity", "type" => "fetch_author_error")
|
|
.increment(1);
|
|
return;
|
|
}
|
|
}
|
|
} else {
|
|
tracing::error!("No attributedTo in {id}");
|
|
metrics::counter!("sieve_activity", "type" => "no_author")
|
|
.increment(1);
|
|
return;
|
|
};
|
|
// Translate ActivityPub post to Mastodon client API post format
|
|
let feed_post = post.to_feed_post(author);
|
|
let event_type = if action.action_type == "Update" {
|
|
// Translate more weird Mastodon client API naming
|
|
"status.update"
|
|
} else {
|
|
"update"
|
|
}.to_string();
|
|
let Ok(encodable_post) = feed::EncodablePost::from_post(event_type, feed_post) else {
|
|
tracing::error!("Cannot serialize post {id}");
|
|
metrics::counter!("sieve_activity", "type" => "serialize_error")
|
|
.increment(1);
|
|
return;
|
|
};
|
|
|
|
match store.save_post(encodable_post).await {
|
|
Ok(true) => {
|
|
tracing::info!("Post was new: {id}");
|
|
metrics::counter!("sieve_activity", "type" => "post_new")
|
|
.increment(1);
|
|
cave::systemd::watchdog();
|
|
}
|
|
Ok(false) => {
|
|
tracing::info!("Post was already known: {id}");
|
|
metrics::counter!("sieve_activity", "type" => "post_known")
|
|
.increment(1);
|
|
cave::systemd::watchdog();
|
|
}
|
|
Err(e) => {
|
|
tracing::error!("Error forwarding post {id}: {e:?}");
|
|
metrics::counter!("sieve_activity", "type" => "post_error")
|
|
.increment(1);
|
|
}
|
|
}
|
|
});
|
|
}).await;
|
|
}
|