Compare commits
7 Commits
a6a6afbf33
...
60478dbf27
Author | SHA1 | Date |
---|---|---|
Astro | 60478dbf27 | |
Astro | f4dbad281c | |
Astro | c26469b778 | |
Astro | d3a2f9e017 | |
Astro | 1cb80c01e7 | |
Astro | 8f56f53200 | |
Astro | fe0d19fbc6 |
|
@ -332,9 +332,8 @@ checksum = "7f30e7476521f6f8af1a1c4c0b8cc94f0bee37d91763d0ca2665f299b6cd8aec"
|
||||||
name = "buzzback"
|
name = "buzzback"
|
||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
|
"cave",
|
||||||
"http",
|
"http",
|
||||||
"http_digest_headers",
|
|
||||||
"httpdate",
|
|
||||||
"reqwest",
|
"reqwest",
|
||||||
"serde",
|
"serde",
|
||||||
"serde_json",
|
"serde_json",
|
||||||
|
@ -368,13 +367,19 @@ dependencies = [
|
||||||
"chrono",
|
"chrono",
|
||||||
"eventsource-stream",
|
"eventsource-stream",
|
||||||
"futures",
|
"futures",
|
||||||
|
"http",
|
||||||
|
"http_digest_headers",
|
||||||
|
"httpdate",
|
||||||
"inotify",
|
"inotify",
|
||||||
|
"metrics",
|
||||||
"redis",
|
"redis",
|
||||||
"reqwest",
|
"reqwest",
|
||||||
"serde",
|
"serde",
|
||||||
"serde_json",
|
"serde_json",
|
||||||
"serde_yaml",
|
"serde_yaml",
|
||||||
|
"sigh",
|
||||||
"systemd",
|
"systemd",
|
||||||
|
"thiserror",
|
||||||
"tokio",
|
"tokio",
|
||||||
"tokio-postgres",
|
"tokio-postgres",
|
||||||
"tracing",
|
"tracing",
|
||||||
|
@ -449,6 +454,26 @@ dependencies = [
|
||||||
"urlencoding 1.3.3",
|
"urlencoding 1.3.3",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "caveman-sieve"
|
||||||
|
version = "0.0.0"
|
||||||
|
dependencies = [
|
||||||
|
"cave",
|
||||||
|
"futures",
|
||||||
|
"http",
|
||||||
|
"metrics",
|
||||||
|
"metrics-exporter-prometheus",
|
||||||
|
"metrics-util",
|
||||||
|
"redis",
|
||||||
|
"reqwest",
|
||||||
|
"serde",
|
||||||
|
"serde_json",
|
||||||
|
"sigh",
|
||||||
|
"tokio",
|
||||||
|
"tracing",
|
||||||
|
"url",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "caveman-smokestack"
|
name = "caveman-smokestack"
|
||||||
version = "0.0.0"
|
version = "0.0.0"
|
||||||
|
|
|
@ -6,6 +6,7 @@ members = [
|
||||||
"gatherer",
|
"gatherer",
|
||||||
"smokestack",
|
"smokestack",
|
||||||
"buzzback",
|
"buzzback",
|
||||||
|
"sieve",
|
||||||
]
|
]
|
||||||
resolver = "2"
|
resolver = "2"
|
||||||
|
|
||||||
|
|
|
@ -12,10 +12,9 @@ serde = { version = "1", features = ["serde_derive"] }
|
||||||
serde_json = "1"
|
serde_json = "1"
|
||||||
serde_yaml = "0.9"
|
serde_yaml = "0.9"
|
||||||
reqwest = { version = "0.11", features = ["json", "stream"] }
|
reqwest = { version = "0.11", features = ["json", "stream"] }
|
||||||
sigh = "1.0"
|
sigh = "1"
|
||||||
http_digest_headers = { version="0.1.0", default-features = false, features = ["use_openssl"] }
|
|
||||||
thiserror = "1"
|
thiserror = "1"
|
||||||
http = "0.2"
|
http = "0.2"
|
||||||
tokio-postgres = "0.7"
|
tokio-postgres = "0.7"
|
||||||
urlencoding = "2"
|
urlencoding = "2"
|
||||||
httpdate = "1"
|
cave = { path = "../cave" }
|
||||||
|
|
|
@ -1,48 +0,0 @@
|
||||||
use serde::{Deserialize, Serialize};
|
|
||||||
|
|
||||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
|
||||||
pub struct Actor {
|
|
||||||
#[serde(rename = "@context")]
|
|
||||||
pub jsonld_context: serde_json::Value,
|
|
||||||
#[serde(rename = "type")]
|
|
||||||
pub actor_type: String,
|
|
||||||
pub id: String,
|
|
||||||
pub name: Option<String>,
|
|
||||||
pub icon: Option<Media>,
|
|
||||||
pub inbox: String,
|
|
||||||
pub outbox: String,
|
|
||||||
#[serde(rename = "publicKey")]
|
|
||||||
pub public_key: ActorPublicKey,
|
|
||||||
#[serde(rename = "preferredUsername")]
|
|
||||||
pub preferred_username: Option<String>,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
|
||||||
pub struct ActorPublicKey {
|
|
||||||
pub id: String,
|
|
||||||
pub owner: Option<String>,
|
|
||||||
#[serde(rename = "publicKeyPem")]
|
|
||||||
pub pem: String,
|
|
||||||
}
|
|
||||||
|
|
||||||
/// `ActivityPub` "activity"
|
|
||||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
|
||||||
pub struct Action<O> {
|
|
||||||
#[serde(rename = "@context")]
|
|
||||||
pub jsonld_context: serde_json::Value,
|
|
||||||
#[serde(rename = "type")]
|
|
||||||
pub action_type: String,
|
|
||||||
pub id: String,
|
|
||||||
pub actor: String,
|
|
||||||
pub to: Option<serde_json::Value>,
|
|
||||||
pub object: Option<O>,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
|
||||||
pub struct Media {
|
|
||||||
#[serde(rename = "type")]
|
|
||||||
pub media_type: String,
|
|
||||||
#[serde(rename = "mediaType")]
|
|
||||||
pub content_type: String,
|
|
||||||
pub url: String,
|
|
||||||
}
|
|
|
@ -1,7 +1,5 @@
|
||||||
#[derive(Debug, thiserror::Error)]
|
#[derive(Debug, thiserror::Error)]
|
||||||
pub enum Error {
|
pub enum Error {
|
||||||
#[error("HTTP Digest generation error")]
|
|
||||||
Digest,
|
|
||||||
#[error("JSON encoding error")]
|
#[error("JSON encoding error")]
|
||||||
Json(#[from] serde_json::Error),
|
Json(#[from] serde_json::Error),
|
||||||
#[error("Signature error")]
|
#[error("Signature error")]
|
||||||
|
@ -10,8 +8,4 @@ pub enum Error {
|
||||||
HttpReq(#[from] http::Error),
|
HttpReq(#[from] http::Error),
|
||||||
#[error("HTTP client error")]
|
#[error("HTTP client error")]
|
||||||
Http(#[from] reqwest::Error),
|
Http(#[from] reqwest::Error),
|
||||||
#[error("Invalid URI")]
|
|
||||||
InvalidUri,
|
|
||||||
#[error("Error response from remote")]
|
|
||||||
Response(String),
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -2,13 +2,11 @@ use sigh::PrivateKey;
|
||||||
use std::{sync::Arc, time::Duration};
|
use std::{sync::Arc, time::Duration};
|
||||||
use std::{panic, process};
|
use std::{panic, process};
|
||||||
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
|
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
|
||||||
|
use cave::activitypub;
|
||||||
|
|
||||||
mod error;
|
mod error;
|
||||||
mod config;
|
mod config;
|
||||||
mod db;
|
mod db;
|
||||||
mod activitypub;
|
|
||||||
mod digest;
|
|
||||||
mod send;
|
|
||||||
|
|
||||||
async fn follow_back(
|
async fn follow_back(
|
||||||
client: &reqwest::Client, hostname: &str,
|
client: &reqwest::Client, hostname: &str,
|
||||||
|
@ -22,7 +20,7 @@ async fn follow_back(
|
||||||
urlencoding::encode(&follow.actor),
|
urlencoding::encode(&follow.actor),
|
||||||
);
|
);
|
||||||
let action = activitypub::Action {
|
let action = activitypub::Action {
|
||||||
jsonld_context: serde_json::Value::String("https://www.w3.org/ns/activitystreams".to_string()),
|
jsonld_context: Some(serde_json::Value::String("https://www.w3.org/ns/activitystreams".to_string())),
|
||||||
action_type: "Follow".to_string(),
|
action_type: "Follow".to_string(),
|
||||||
id: follow_id,
|
id: follow_id,
|
||||||
to: None,
|
to: None,
|
||||||
|
@ -30,7 +28,7 @@ async fn follow_back(
|
||||||
object: Some(&follow.id),
|
object: Some(&follow.id),
|
||||||
};
|
};
|
||||||
let key_id = format!("{}#key", follow.actor);
|
let key_id = format!("{}#key", follow.actor);
|
||||||
let result = send::send(
|
let result = activitypub::send::send(
|
||||||
client, &follow.inbox,
|
client, &follow.inbox,
|
||||||
&key_id, &priv_key,
|
&key_id, &priv_key,
|
||||||
&action,
|
&action,
|
||||||
|
|
|
@ -21,3 +21,9 @@ tracing-subscriber = { version = "0.3", features = ["env-filter"] }
|
||||||
inotify = "0.10"
|
inotify = "0.10"
|
||||||
tokio-postgres = "0.7"
|
tokio-postgres = "0.7"
|
||||||
url = "2"
|
url = "2"
|
||||||
|
metrics = "0.20"
|
||||||
|
sigh = "1"
|
||||||
|
http_digest_headers = { version="0.1.0", default-features = false, features = ["use_openssl"] }
|
||||||
|
http = "0.2"
|
||||||
|
thiserror = "1"
|
||||||
|
httpdate = "1"
|
||||||
|
|
|
@ -0,0 +1,19 @@
|
||||||
|
#[derive(Debug, thiserror::Error)]
|
||||||
|
pub enum Error {
|
||||||
|
#[error("HTTP Digest generation error")]
|
||||||
|
Digest,
|
||||||
|
#[error("JSON encoding error")]
|
||||||
|
Json(#[from] serde_json::Error),
|
||||||
|
#[error("Signature error")]
|
||||||
|
Signature(#[from] sigh::Error),
|
||||||
|
#[error("Signature verification failure")]
|
||||||
|
SignatureFail,
|
||||||
|
#[error("HTTP request error")]
|
||||||
|
HttpReq(#[from] http::Error),
|
||||||
|
#[error("HTTP client error")]
|
||||||
|
Http(#[from] reqwest::Error),
|
||||||
|
#[error("Invalid URI")]
|
||||||
|
InvalidUri,
|
||||||
|
#[error("Error response from remote")]
|
||||||
|
Response(String),
|
||||||
|
}
|
|
@ -0,0 +1,39 @@
|
||||||
|
use std::time::SystemTime;
|
||||||
|
use http::StatusCode;
|
||||||
|
use serde::de::DeserializeOwned;
|
||||||
|
use sigh::{PrivateKey, SigningConfig, alg::RsaSha256};
|
||||||
|
use super::{digest, error::Error};
|
||||||
|
|
||||||
|
pub async fn authorized_fetch<T>(
|
||||||
|
client: &reqwest::Client,
|
||||||
|
uri: &str,
|
||||||
|
key_id: &str,
|
||||||
|
private_key: &PrivateKey,
|
||||||
|
) -> Result<T, Error>
|
||||||
|
where
|
||||||
|
T: DeserializeOwned,
|
||||||
|
{
|
||||||
|
let url = reqwest::Url::parse(uri)
|
||||||
|
.map_err(|_| Error::InvalidUri)?;
|
||||||
|
let host = format!("{}", url.host().ok_or(Error::InvalidUri)?);
|
||||||
|
let digest_header = digest::generate_header(&[])
|
||||||
|
.expect("digest::generate_header");
|
||||||
|
let mut req = http::Request::builder()
|
||||||
|
.uri(uri)
|
||||||
|
.header("host", &host)
|
||||||
|
.header("content-type", "application/activity+json")
|
||||||
|
.header("date", httpdate::fmt_http_date(SystemTime::now()))
|
||||||
|
.header("accept", "application/activity+json, application/ld+json")
|
||||||
|
.header("digest", digest_header)
|
||||||
|
.body(vec![])?;
|
||||||
|
SigningConfig::new(RsaSha256, private_key, key_id)
|
||||||
|
.sign(&mut req)?;
|
||||||
|
let req: reqwest::Request = req.try_into()?;
|
||||||
|
let res = client.execute(req)
|
||||||
|
.await?;
|
||||||
|
if res.status() >= StatusCode::OK && res.status() < StatusCode::MULTIPLE_CHOICES {
|
||||||
|
Ok(res.json().await?)
|
||||||
|
} else {
|
||||||
|
Err(Error::Response(res.text().await?))
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,128 @@
|
||||||
|
pub mod digest;
|
||||||
|
pub mod send;
|
||||||
|
pub mod fetch;
|
||||||
|
mod error;
|
||||||
|
pub use error::Error;
|
||||||
|
|
||||||
|
use std::collections::HashMap;
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
|
pub struct Actor {
|
||||||
|
#[serde(rename = "@context")]
|
||||||
|
pub jsonld_context: Option<serde_json::Value>,
|
||||||
|
#[serde(rename = "type")]
|
||||||
|
pub actor_type: String,
|
||||||
|
pub id: String,
|
||||||
|
pub url: Option<String>,
|
||||||
|
pub name: String,
|
||||||
|
pub icon: Option<Media>,
|
||||||
|
pub inbox: String,
|
||||||
|
pub outbox: String,
|
||||||
|
#[serde(rename = "publicKey")]
|
||||||
|
pub public_key: ActorPublicKey,
|
||||||
|
#[serde(rename = "preferredUsername")]
|
||||||
|
pub preferred_username: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
|
pub struct ActorPublicKey {
|
||||||
|
pub id: String,
|
||||||
|
pub owner: Option<String>,
|
||||||
|
#[serde(rename = "publicKeyPem")]
|
||||||
|
pub pem: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// `ActivityPub` "activity"
|
||||||
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
|
pub struct Action<O> {
|
||||||
|
#[serde(rename = "@context")]
|
||||||
|
pub jsonld_context: Option<serde_json::Value>,
|
||||||
|
#[serde(rename = "type")]
|
||||||
|
pub action_type: String,
|
||||||
|
pub id: String,
|
||||||
|
pub actor: String,
|
||||||
|
pub to: Option<serde_json::Value>,
|
||||||
|
pub object: O,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Action<serde_json::Value> {
|
||||||
|
pub fn object_id(&self) -> Option<&str> {
|
||||||
|
if let Some(id) = self.object.as_str() {
|
||||||
|
Some(id)
|
||||||
|
} else if let Some(object) = self.object.as_object() {
|
||||||
|
object.get("id").and_then(|id| id.as_str())
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// hack for misskey
|
||||||
|
fn media_default_content_type() -> String {
|
||||||
|
"image/jpeg".to_string()
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
|
pub struct Media {
|
||||||
|
#[serde(rename = "type")]
|
||||||
|
pub media_type: String,
|
||||||
|
#[serde(rename = "mediaType", default = "media_default_content_type")]
|
||||||
|
pub content_type: String,
|
||||||
|
pub url: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// This is unfortunately strictly separate from the Client/Server
|
||||||
|
/// data schema in `cave::feed::Post`
|
||||||
|
#[derive(Debug, serde::Serialize, serde::Deserialize)]
|
||||||
|
pub struct Post {
|
||||||
|
pub published: String,
|
||||||
|
pub id: String,
|
||||||
|
pub url: Option<String>,
|
||||||
|
#[serde(rename = "attributedTo")]
|
||||||
|
pub attributed_to: Option<String>,
|
||||||
|
#[serde(default = "String::new")]
|
||||||
|
pub content: String,
|
||||||
|
#[serde(rename = "contentMap", default = "HashMap::new")]
|
||||||
|
pub content_map: HashMap<String, String>,
|
||||||
|
#[serde(default)]
|
||||||
|
pub tag: Vec<super::feed::Tag>,
|
||||||
|
pub sensitive: Option<bool>,
|
||||||
|
#[serde(rename = "attachment", default = "Vec::new")]
|
||||||
|
pub attachments: Vec<Media>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Post {
|
||||||
|
pub fn language(&self) -> Option<&str> {
|
||||||
|
self.content_map.keys()
|
||||||
|
.next().map(|s| s.as_str())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Translate ActivityPub post to Mastodon client API post format
|
||||||
|
pub fn to_feed_post(self, actor: Actor) -> super::feed::Post {
|
||||||
|
let language = self.language()
|
||||||
|
.map(|s| s.to_string());
|
||||||
|
super::feed::Post {
|
||||||
|
created_at: self.published,
|
||||||
|
uri: self.url.unwrap_or(self.id),
|
||||||
|
content: self.content,
|
||||||
|
account: super::feed::Account {
|
||||||
|
username: actor.preferred_username,
|
||||||
|
display_name: actor.name,
|
||||||
|
url: actor.url.unwrap_or(actor.id),
|
||||||
|
bot: actor.actor_type != "Person",
|
||||||
|
},
|
||||||
|
tags: self.tag,
|
||||||
|
sensitive: self.sensitive,
|
||||||
|
mentions: vec![],
|
||||||
|
language,
|
||||||
|
media_attachments: self.attachments.into_iter().map(|media| {
|
||||||
|
super::feed::MediaAttachment {
|
||||||
|
media_type: media.media_type,
|
||||||
|
remote_url: Some(media.url),
|
||||||
|
}
|
||||||
|
}).collect(),
|
||||||
|
reblog: None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -1,11 +1,12 @@
|
||||||
use std::{
|
use std::{
|
||||||
sync::Arc,
|
sync::Arc,
|
||||||
time::SystemTime,
|
time::{Instant, SystemTime},
|
||||||
};
|
};
|
||||||
use http::StatusCode;
|
use http::StatusCode;
|
||||||
|
use metrics::histogram;
|
||||||
use serde::Serialize;
|
use serde::Serialize;
|
||||||
use sigh::{PrivateKey, SigningConfig, alg::RsaSha256};
|
use sigh::{PrivateKey, SigningConfig, alg::RsaSha256};
|
||||||
use crate::{digest, error::Error};
|
use super::{digest, error::Error};
|
||||||
|
|
||||||
pub async fn send<T: Serialize>(
|
pub async fn send<T: Serialize>(
|
||||||
client: &reqwest::Client,
|
client: &reqwest::Client,
|
||||||
|
@ -42,14 +43,20 @@ pub async fn send_raw(
|
||||||
.header("digest", digest_header)
|
.header("digest", digest_header)
|
||||||
.body(body.as_ref().clone())
|
.body(body.as_ref().clone())
|
||||||
.map_err(Error::HttpReq)?;
|
.map_err(Error::HttpReq)?;
|
||||||
|
let t1 = Instant::now();
|
||||||
SigningConfig::new(RsaSha256, private_key, key_id)
|
SigningConfig::new(RsaSha256, private_key, key_id)
|
||||||
.sign(&mut req)?;
|
.sign(&mut req)?;
|
||||||
|
let t2 = Instant::now();
|
||||||
let req: reqwest::Request = req.try_into()?;
|
let req: reqwest::Request = req.try_into()?;
|
||||||
let res = client.execute(req)
|
let res = client.execute(req)
|
||||||
.await?;
|
.await?;
|
||||||
|
let t3 = Instant::now();
|
||||||
|
histogram!("relay_http_request_duration", t2 - t1);
|
||||||
if res.status() >= StatusCode::OK && res.status() < StatusCode::MULTIPLE_CHOICES {
|
if res.status() >= StatusCode::OK && res.status() < StatusCode::MULTIPLE_CHOICES {
|
||||||
|
histogram!("relay_http_response_duration", t3 - t2, "res" => "ok", "host" => host);
|
||||||
Ok(())
|
Ok(())
|
||||||
} else {
|
} else {
|
||||||
|
histogram!("relay_http_response_duration", t3 - t2, "res" => "err", "host" => host);
|
||||||
tracing::error!("send_raw {} response HTTP {}", url, res.status());
|
tracing::error!("send_raw {} response HTTP {}", url, res.status());
|
||||||
let response = res.text().await?;
|
let response = res.text().await?;
|
||||||
Err(Error::Response(response))
|
Err(Error::Response(response))
|
|
@ -70,7 +70,7 @@ pub struct Post {
|
||||||
pub account: Account,
|
pub account: Account,
|
||||||
#[serde(default)]
|
#[serde(default)]
|
||||||
pub tags: Vec<Tag>,
|
pub tags: Vec<Tag>,
|
||||||
pub application: Option<Application>,
|
// pub application: Option<Application>,
|
||||||
pub sensitive: Option<bool>,
|
pub sensitive: Option<bool>,
|
||||||
#[serde(default)]
|
#[serde(default)]
|
||||||
pub mentions: Vec<Mention>,
|
pub mentions: Vec<Mention>,
|
||||||
|
@ -147,6 +147,7 @@ enum EncodedPost {
|
||||||
Stolen,
|
Stolen,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO: eliminate
|
||||||
/// Wraps a `Post` along with a serializable form that is most close
|
/// Wraps a `Post` along with a serializable form that is most close
|
||||||
/// to the original incoming data
|
/// to the original incoming data
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
|
@ -164,6 +165,15 @@ impl Deref for EncodablePost {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl EncodablePost {
|
impl EncodablePost {
|
||||||
|
pub fn from_post(event_type: String, post: Post) -> Result<Self, serde_json::Error> {
|
||||||
|
let bytes = serde_json::to_vec(&post)?;
|
||||||
|
Ok(EncodablePost {
|
||||||
|
event_type,
|
||||||
|
post,
|
||||||
|
encoded: EncodedPost::Bytes(bytes),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
pub fn from_value(event_type: String, value: serde_json::Value) -> Result<Self, serde_json::Error> {
|
pub fn from_value(event_type: String, value: serde_json::Value) -> Result<Self, serde_json::Error> {
|
||||||
let post = serde_json::from_value(value.clone())?;
|
let post = serde_json::from_value(value.clone())?;
|
||||||
Ok(EncodablePost {
|
Ok(EncodablePost {
|
||||||
|
|
|
@ -8,6 +8,8 @@ pub mod firehose;
|
||||||
pub mod live_file;
|
pub mod live_file;
|
||||||
pub mod word_list;
|
pub mod word_list;
|
||||||
pub mod db;
|
pub mod db;
|
||||||
|
pub mod posts_cache;
|
||||||
|
pub mod activitypub;
|
||||||
|
|
||||||
pub const PERIODS: &[u64] = &[4, 24, 7 * 24];
|
pub const PERIODS: &[u64] = &[4, 24, 7 * 24];
|
||||||
|
|
||||||
|
|
11
flake.nix
11
flake.nix
|
@ -11,7 +11,8 @@
|
||||||
overlay = final: prev: {
|
overlay = final: prev: {
|
||||||
inherit (self.packages.${prev.system})
|
inherit (self.packages.${prev.system})
|
||||||
caveman-hunter caveman-butcher
|
caveman-hunter caveman-butcher
|
||||||
caveman-gatherer caveman-smokestack;
|
caveman-gatherer caveman-sieve
|
||||||
|
caveman-smokestack;
|
||||||
};
|
};
|
||||||
|
|
||||||
nixosModule = self.nixosModules.caveman;
|
nixosModule = self.nixosModules.caveman;
|
||||||
|
@ -100,6 +101,14 @@
|
||||||
cp -rv gatherer/{templates,assets} $out/share/caveman/gatherer/
|
cp -rv gatherer/{templates,assets} $out/share/caveman/gatherer/
|
||||||
'';
|
'';
|
||||||
};
|
};
|
||||||
|
packages.caveman-sieve = naersk-lib.buildPackage rec {
|
||||||
|
pname = "caveman-sieve";
|
||||||
|
version = self.lastModifiedDate;
|
||||||
|
inherit src;
|
||||||
|
targets = [ pname ];
|
||||||
|
nativeBuildInputs = with pkgs; [ pkg-config ];
|
||||||
|
buildInputs = with pkgs; [ openssl systemd ];
|
||||||
|
};
|
||||||
packages.caveman-smokestack = naersk-lib.buildPackage rec {
|
packages.caveman-smokestack = naersk-lib.buildPackage rec {
|
||||||
pname = "caveman-smokestack";
|
pname = "caveman-smokestack";
|
||||||
version = self.lastModifiedDate;
|
version = self.lastModifiedDate;
|
||||||
|
|
|
@ -9,7 +9,6 @@ mod config;
|
||||||
mod block_list;
|
mod block_list;
|
||||||
mod scheduler;
|
mod scheduler;
|
||||||
mod worker;
|
mod worker;
|
||||||
mod posts_cache;
|
|
||||||
mod webfinger;
|
mod webfinger;
|
||||||
|
|
||||||
use scheduler::InstanceHost;
|
use scheduler::InstanceHost;
|
||||||
|
@ -42,7 +41,7 @@ async fn run() {
|
||||||
|
|
||||||
let db = cave::db::Database::connect(&config.database).await;
|
let db = cave::db::Database::connect(&config.database).await;
|
||||||
let mut store = cave::store::Store::new(16, config.redis, config.redis_password_file).await;
|
let mut store = cave::store::Store::new(16, config.redis, config.redis_password_file).await;
|
||||||
let posts_cache = posts_cache::PostsCache::new(65536);
|
let posts_cache = cave::posts_cache::PostsCache::new(65536);
|
||||||
|
|
||||||
let block_list = block_list::BlockList::new(&config.blocklist).await;
|
let block_list = block_list::BlockList::new(&config.blocklist).await;
|
||||||
|
|
||||||
|
|
|
@ -5,11 +5,11 @@ use std::time::{Duration, Instant};
|
||||||
use cave::{
|
use cave::{
|
||||||
db::Database,
|
db::Database,
|
||||||
feed::{Feed, EncodablePost, Post, StreamError},
|
feed::{Feed, EncodablePost, Post, StreamError},
|
||||||
|
posts_cache::PostsCache,
|
||||||
store::Store,
|
store::Store,
|
||||||
};
|
};
|
||||||
use futures::{StreamExt, future};
|
use futures::{StreamExt, future};
|
||||||
use reqwest::StatusCode;
|
use reqwest::StatusCode;
|
||||||
use crate::posts_cache::PostsCache;
|
|
||||||
use crate::scheduler::{Host, InstanceHost};
|
use crate::scheduler::{Host, InstanceHost};
|
||||||
use crate::webfinger;
|
use crate::webfinger;
|
||||||
|
|
||||||
|
|
|
@ -50,6 +50,19 @@ let
|
||||||
builtins.toJSON gathererSettings
|
builtins.toJSON gathererSettings
|
||||||
);
|
);
|
||||||
|
|
||||||
|
sieveDefaultSettings = {
|
||||||
|
redis = "redis://127.0.0.1:${toString cfg.redis.port}/";
|
||||||
|
redis_password_file = cfg.redis.passwordFile;
|
||||||
|
in_topic = "relay-in";
|
||||||
|
prometheus_port = 9102;
|
||||||
|
};
|
||||||
|
|
||||||
|
sieveSettings = lib.recursiveUpdate sieveDefaultSettings cfg.sieve.settings;
|
||||||
|
|
||||||
|
sieveConfigFile = builtins.toFile "sieve.yaml" (
|
||||||
|
builtins.toJSON sieveSettings
|
||||||
|
);
|
||||||
|
|
||||||
smokestackDefaultSettings = {
|
smokestackDefaultSettings = {
|
||||||
redis = "redis://127.0.0.1:${toString cfg.redis.port}/";
|
redis = "redis://127.0.0.1:${toString cfg.redis.port}/";
|
||||||
redis_password_file = cfg.redis.passwordFile;
|
redis_password_file = cfg.redis.passwordFile;
|
||||||
|
@ -119,6 +132,18 @@ in
|
||||||
default = "DEBUG";
|
default = "DEBUG";
|
||||||
};
|
};
|
||||||
|
|
||||||
|
sieve.enable = mkEnableOption "caveman sieve";
|
||||||
|
|
||||||
|
sieve.settings = mkOption {
|
||||||
|
type = types.anything;
|
||||||
|
default = sieveDefaultSettings;
|
||||||
|
};
|
||||||
|
|
||||||
|
sieve.logLevel = mkOption {
|
||||||
|
type = types.enum [ "ERROR" "WARN" "INFO" "DEBUG" "TRACE" ];
|
||||||
|
default = "DEBUG";
|
||||||
|
};
|
||||||
|
|
||||||
smokestack.enable = mkEnableOption "caveman smokestack";
|
smokestack.enable = mkEnableOption "caveman smokestack";
|
||||||
|
|
||||||
smokestack.settings = mkOption {
|
smokestack.settings = mkOption {
|
||||||
|
@ -139,6 +164,7 @@ in
|
||||||
|
|
||||||
networking.firewall.allowedTCPPorts = [
|
networking.firewall.allowedTCPPorts = [
|
||||||
hunterSettings.prometheus_port
|
hunterSettings.prometheus_port
|
||||||
|
sieveSettings.prometheus_port
|
||||||
];
|
];
|
||||||
|
|
||||||
systemd.tmpfiles.rules = [
|
systemd.tmpfiles.rules = [
|
||||||
|
@ -305,6 +331,34 @@ in
|
||||||
'';
|
'';
|
||||||
};
|
};
|
||||||
|
|
||||||
|
systemd.services.caveman-sieve = lib.mkIf cfg.sieve.enable {
|
||||||
|
wantedBy = [ "multi-user.target" ];
|
||||||
|
requires = [ "redis-caveman.service" ];
|
||||||
|
after = [ "redis-caveman.service" "network-online.target" ];
|
||||||
|
environment.RUST_LOG = "caveman=${cfg.sieve.logLevel}";
|
||||||
|
serviceConfig = {
|
||||||
|
ExecStart = "${pkgs.caveman-sieve}/bin/caveman-sieve ${sieveConfigFile}";
|
||||||
|
Type = "notify";
|
||||||
|
WatchdogSec = 300;
|
||||||
|
Restart = "always";
|
||||||
|
RestartSec = 1;
|
||||||
|
DynamicUser = true;
|
||||||
|
User = "caveman-sieve";
|
||||||
|
ProtectSystem = "strict";
|
||||||
|
ProtectHome = true;
|
||||||
|
ProtectHostname = true;
|
||||||
|
ProtectKernelLogs = true;
|
||||||
|
ProtectKernelModules = true;
|
||||||
|
ProtectKernelTunables = true;
|
||||||
|
RestrictNamespaces = true;
|
||||||
|
RestrictRealtime = true;
|
||||||
|
LockPersonality = true;
|
||||||
|
MemoryDenyWriteExecute = true;
|
||||||
|
LimitNOFile = limitNOFILE;
|
||||||
|
LimitRSS = "128M:256M";
|
||||||
|
};
|
||||||
|
};
|
||||||
|
|
||||||
systemd.services.caveman-smokestack = lib.mkIf cfg.smokestack.enable {
|
systemd.services.caveman-smokestack = lib.mkIf cfg.smokestack.enable {
|
||||||
wantedBy = [ "multi-user.target" ];
|
wantedBy = [ "multi-user.target" ];
|
||||||
requires = [ "redis-caveman.service" "caveman-hunter.service" ];
|
requires = [ "redis-caveman.service" "caveman-hunter.service" ];
|
||||||
|
|
|
@ -0,0 +1,21 @@
|
||||||
|
[package]
|
||||||
|
name = "caveman-sieve"
|
||||||
|
description = "Relay deduplicator and enricher"
|
||||||
|
version = "0.0.0"
|
||||||
|
edition = "2021"
|
||||||
|
|
||||||
|
[dependencies]
|
||||||
|
futures = "0.3"
|
||||||
|
tokio = { version = "1", features = ["full"] }
|
||||||
|
serde = { version = "1", features = ["derive"] }
|
||||||
|
serde_json = "1"
|
||||||
|
tracing = "0.1"
|
||||||
|
cave = { path = "../cave" }
|
||||||
|
url = "2"
|
||||||
|
redis = { version = "0.23", features = ["tokio-comp", "connection-manager"] }
|
||||||
|
reqwest = { version = "0.11" }
|
||||||
|
http = "*"
|
||||||
|
sigh = "*"
|
||||||
|
metrics = "0.20"
|
||||||
|
metrics-util = "0.14"
|
||||||
|
metrics-exporter-prometheus = "0.11"
|
|
@ -0,0 +1,5 @@
|
||||||
|
redis: "redis://fedi.buzz:6379/"
|
||||||
|
redis_password_file: "redis_password.txt"
|
||||||
|
in_topic: "relay-in"
|
||||||
|
priv_key_file: private-key.pem
|
||||||
|
prometheus_port: 9102
|
|
@ -0,0 +1,30 @@
|
||||||
|
use serde::Deserialize;
|
||||||
|
use sigh::{PrivateKey, Key};
|
||||||
|
use url::Url;
|
||||||
|
|
||||||
|
#[derive(Debug, Deserialize)]
|
||||||
|
pub struct Config {
|
||||||
|
pub redis: String,
|
||||||
|
pub redis_password_file: String,
|
||||||
|
pub in_topic: String,
|
||||||
|
priv_key_file: String,
|
||||||
|
pub prometheus_port: u16,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Config {
|
||||||
|
pub fn redis_url(&self) -> Url {
|
||||||
|
let redis_password = std::fs::read_to_string(&self.redis_password_file)
|
||||||
|
.expect("redis_password_file");
|
||||||
|
let mut redis_url = Url::parse(&self.redis)
|
||||||
|
.expect("redis_url");
|
||||||
|
redis_url.set_password(Some(&redis_password)).unwrap();
|
||||||
|
redis_url
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn priv_key(&self) -> PrivateKey {
|
||||||
|
let data = std::fs::read_to_string(&self.priv_key_file)
|
||||||
|
.expect("read priv_key_file");
|
||||||
|
PrivateKey::from_pem(data.as_bytes())
|
||||||
|
.expect("priv_key")
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,190 @@
|
||||||
|
use std::{
|
||||||
|
collections::HashSet,
|
||||||
|
sync::Arc,
|
||||||
|
time::Duration,
|
||||||
|
};
|
||||||
|
use futures::{Stream, StreamExt};
|
||||||
|
use url::Url;
|
||||||
|
use cave::{
|
||||||
|
activitypub::{
|
||||||
|
self,
|
||||||
|
fetch::authorized_fetch,
|
||||||
|
},
|
||||||
|
config::LoadConfig,
|
||||||
|
feed,
|
||||||
|
};
|
||||||
|
use metrics_util::MetricKindMask;
|
||||||
|
use metrics_exporter_prometheus::PrometheusBuilder;
|
||||||
|
|
||||||
|
mod config;
|
||||||
|
|
||||||
|
pub async fn connect_relay_in(redis_url: Url, in_topic: &str) -> Result<impl Stream<Item = activitypub::Action<serde_json::Value>>, redis::RedisError> {
|
||||||
|
let client = redis::Client::open(redis_url)?;
|
||||||
|
let mut pubsub_conn = client.get_async_connection()
|
||||||
|
.await?
|
||||||
|
.into_pubsub();
|
||||||
|
|
||||||
|
pubsub_conn.subscribe(in_topic)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
Ok(pubsub_conn.into_on_message().filter_map(|msg| async move {
|
||||||
|
let data = msg.get_payload::<String>().ok()?;
|
||||||
|
let json = serde_json::from_str(&data)
|
||||||
|
.ok()?;
|
||||||
|
Some(json)
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
|
||||||
|
const KEY_ID: &str = "https://relay.fedi.buzz/instance/fedi.buzz#key";
|
||||||
|
|
||||||
|
#[tokio::main]
|
||||||
|
async fn main() {
|
||||||
|
cave::init::exit_on_panic();
|
||||||
|
cave::init::init_logger(5557);
|
||||||
|
|
||||||
|
let config = config::Config::load();
|
||||||
|
|
||||||
|
PrometheusBuilder::new()
|
||||||
|
.with_http_listener(([0; 8], config.prometheus_port))
|
||||||
|
.add_global_label("application", env!("CARGO_PKG_NAME"))
|
||||||
|
.idle_timeout(MetricKindMask::ALL, Some(Duration::from_secs(600)))
|
||||||
|
.install()
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
let priv_key = Arc::new(config.priv_key());
|
||||||
|
let relay_in = connect_relay_in(config.redis_url(), &config.in_topic)
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
let client = reqwest::Client::builder()
|
||||||
|
.timeout(Duration::from_secs(5))
|
||||||
|
.user_agent(concat!(
|
||||||
|
env!("CARGO_PKG_NAME"),
|
||||||
|
"/",
|
||||||
|
env!("CARGO_PKG_VERSION"),
|
||||||
|
))
|
||||||
|
.pool_max_idle_per_host(1)
|
||||||
|
.pool_idle_timeout(Some(Duration::from_secs(5)))
|
||||||
|
.build()
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
let store = cave::store::Store::new(1, config.redis, config.redis_password_file).await;
|
||||||
|
|
||||||
|
let posts_cache = cave::posts_cache::PostsCache::new(65536);
|
||||||
|
let allowed_action_types: Arc<HashSet<String>> = Arc::new([
|
||||||
|
"Create", "Announce", "Update",
|
||||||
|
].into_iter().map(|s| s.to_string()).collect());
|
||||||
|
|
||||||
|
tracing::info!("ready");
|
||||||
|
cave::systemd::ready();
|
||||||
|
|
||||||
|
relay_in.for_each(|action| async {
|
||||||
|
// Filter by action type
|
||||||
|
if ! allowed_action_types.contains(&action.action_type) {
|
||||||
|
metrics::counter!("sieve_activity", 1, "type" => "type_ignored");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
let posts_cache = posts_cache.clone();
|
||||||
|
let client = client.clone();
|
||||||
|
let mut store = store.clone();
|
||||||
|
let priv_key = priv_key.clone();
|
||||||
|
|
||||||
|
tokio::spawn(async move {
|
||||||
|
// Avoid duplicate work
|
||||||
|
let id = if let Some(id) = action.object_id() {
|
||||||
|
let id = id.to_string();
|
||||||
|
if posts_cache.insert(id.clone()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
id
|
||||||
|
} else {
|
||||||
|
// no object id
|
||||||
|
tracing::warn!("No object id in action: {:?}", action);
|
||||||
|
return;
|
||||||
|
};
|
||||||
|
|
||||||
|
let object = if action.object.is_object() {
|
||||||
|
// all good, can be forwarded
|
||||||
|
action.object
|
||||||
|
} else if let Some(id) = action.object.as_str() {
|
||||||
|
// fetch info for id
|
||||||
|
tracing::debug!("GET {id}");
|
||||||
|
match authorized_fetch(&client, id, KEY_ID, &priv_key).await {
|
||||||
|
Ok(res) => {
|
||||||
|
metrics::counter!("sieve_activity", 1, "type" => "fetch_object_ok");
|
||||||
|
res
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
tracing::error!("{id} HTTP: {e:?}");
|
||||||
|
metrics::counter!("sieve_activity", 1, "type" => "fetch_object_error");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
tracing::warn!("Invalid object {id} in action: {:?}", action);
|
||||||
|
return;
|
||||||
|
};
|
||||||
|
|
||||||
|
let post: activitypub::Post = match serde_json::from_value(object) {
|
||||||
|
Ok(post) =>
|
||||||
|
post,
|
||||||
|
Err(e) => {
|
||||||
|
tracing::error!("JSON of {id}: {e:?}");
|
||||||
|
metrics::counter!("sieve_activity", 1, "type" => "json_error");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let author: activitypub::Actor = if let Some(author_url) = &post.attributed_to {
|
||||||
|
match authorized_fetch(&client, author_url, KEY_ID, &priv_key).await {
|
||||||
|
Ok(author) => {
|
||||||
|
metrics::counter!("sieve_activity", 1, "type" => "fetch_author_ok");
|
||||||
|
author
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
tracing::error!("{author_url} HTTP: {e:?}");
|
||||||
|
metrics::counter!("sieve_activity", 1, "type" => "fetch_author_error");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
tracing::error!("No attributedTo in {id}");
|
||||||
|
metrics::counter!("sieve_activity", 1, "type" => "no_author");
|
||||||
|
return;
|
||||||
|
};
|
||||||
|
// Translate ActivityPub post to Mastodon client API post format
|
||||||
|
let feed_post = post.to_feed_post(author);
|
||||||
|
let event_type = if action.action_type == "Update" {
|
||||||
|
// Translate more weird Mastodon client API naming
|
||||||
|
"status.update"
|
||||||
|
} else {
|
||||||
|
"update"
|
||||||
|
}.to_string();
|
||||||
|
let encodable_post = if let Ok(post) = feed::EncodablePost::from_post(event_type, feed_post) {
|
||||||
|
post
|
||||||
|
} else {
|
||||||
|
tracing::error!("Cannot serialize post {id}");
|
||||||
|
metrics::counter!("sieve_activity", 1, "type" => "serialize_error");
|
||||||
|
return;
|
||||||
|
};
|
||||||
|
|
||||||
|
match store.save_post(encodable_post).await {
|
||||||
|
Ok(true) => {
|
||||||
|
tracing::info!("Post was new: {id}");
|
||||||
|
metrics::counter!("sieve_activity", 1, "type" => "post_new");
|
||||||
|
cave::systemd::watchdog();
|
||||||
|
}
|
||||||
|
Ok(false) => {
|
||||||
|
tracing::info!("Post was already known: {id}");
|
||||||
|
metrics::counter!("sieve_activity", 1, "type" => "post_known");
|
||||||
|
cave::systemd::watchdog();
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
tracing::error!("Error forwarding post {id}: {e:?}");
|
||||||
|
metrics::counter!("sieve_activity", 1, "type" => "post_error");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}).await;
|
||||||
|
}
|
Loading…
Reference in New Issue