diff --git a/Cargo.lock b/Cargo.lock index 7a7dfec..be3c8eb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -332,9 +332,8 @@ checksum = "7f30e7476521f6f8af1a1c4c0b8cc94f0bee37d91763d0ca2665f299b6cd8aec" name = "buzzback" version = "0.1.0" dependencies = [ + "cave", "http", - "http_digest_headers", - "httpdate", "reqwest", "serde", "serde_json", @@ -368,6 +367,9 @@ dependencies = [ "chrono", "eventsource-stream", "futures", + "http", + "http_digest_headers", + "httpdate", "inotify", "metrics", "redis", @@ -375,7 +377,9 @@ dependencies = [ "serde", "serde_json", "serde_yaml", + "sigh", "systemd", + "thiserror", "tokio", "tokio-postgres", "tracing", @@ -450,6 +454,23 @@ dependencies = [ "urlencoding 1.3.3", ] +[[package]] +name = "caveman-sieve" +version = "0.0.0" +dependencies = [ + "cave", + "futures", + "http", + "redis", + "reqwest", + "serde", + "serde_json", + "sigh", + "tokio", + "tracing", + "url", +] + [[package]] name = "caveman-smokestack" version = "0.0.0" diff --git a/buzzback/Cargo.toml b/buzzback/Cargo.toml index a9daee6..3e0fa00 100644 --- a/buzzback/Cargo.toml +++ b/buzzback/Cargo.toml @@ -12,10 +12,9 @@ serde = { version = "1", features = ["serde_derive"] } serde_json = "1" serde_yaml = "0.9" reqwest = { version = "0.11", features = ["json", "stream"] } -sigh = "1.0" -http_digest_headers = { version="0.1.0", default-features = false, features = ["use_openssl"] } +sigh = "1" thiserror = "1" http = "0.2" tokio-postgres = "0.7" urlencoding = "2" -httpdate = "1" +cave = { path = "../cave" } diff --git a/buzzback/src/main.rs b/buzzback/src/main.rs index d06575e..ca0e709 100644 --- a/buzzback/src/main.rs +++ b/buzzback/src/main.rs @@ -2,12 +2,12 @@ use sigh::PrivateKey; use std::{sync::Arc, time::Duration}; use std::{panic, process}; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; +use cave::activitypub::send; mod error; mod config; mod db; mod activitypub; -mod send; async fn follow_back( client: &reqwest::Client, hostname: &str, diff --git a/cave/Cargo.toml b/cave/Cargo.toml index 21086c5..964b550 100644 --- a/cave/Cargo.toml +++ b/cave/Cargo.toml @@ -22,3 +22,8 @@ inotify = "0.10" tokio-postgres = "0.7" url = "2" metrics = "0.20" +sigh = "1" +http_digest_headers = { version="0.1.0", default-features = false, features = ["use_openssl"] } +http = "0.2" +thiserror = "1" +httpdate = "1" diff --git a/cave/src/activitypub/digest.rs b/cave/src/activitypub/digest.rs new file mode 100644 index 0000000..32d6eab --- /dev/null +++ b/cave/src/activitypub/digest.rs @@ -0,0 +1,20 @@ +use http_digest_headers::{DigestHeader, DigestMethod}; + +pub fn generate_header(body: &[u8]) -> Result { + let mut digest_header = DigestHeader::new() + .with_method(DigestMethod::SHA256, body) + .map(|h| format!("{}", h)) + .map_err(|_| ())?; + + // mastodon expects uppercase algo name + if digest_header.starts_with("sha-") { + digest_header.replace_range(..4, "SHA-"); + } + // mastodon uses base64::alphabet::STANDARD, not base64::alphabet::URL_SAFE + digest_header.replace_range( + 7.., + &digest_header[7..].replace('-', "+").replace('_', "/") + ); + + Ok(digest_header) +} diff --git a/cave/src/activitypub/error.rs b/cave/src/activitypub/error.rs new file mode 100644 index 0000000..6980939 --- /dev/null +++ b/cave/src/activitypub/error.rs @@ -0,0 +1,19 @@ +#[derive(Debug, thiserror::Error)] +pub enum Error { + #[error("HTTP Digest generation error")] + Digest, + #[error("JSON encoding error")] + Json(#[from] serde_json::Error), + #[error("Signature error")] + Signature(#[from] sigh::Error), + #[error("Signature verification failure")] + SignatureFail, + #[error("HTTP request error")] + HttpReq(#[from] http::Error), + #[error("HTTP client error")] + Http(#[from] reqwest::Error), + #[error("Invalid URI")] + InvalidUri, + #[error("Error response from remote")] + Response(String), +} diff --git a/cave/src/activitypub/fetch.rs b/cave/src/activitypub/fetch.rs new file mode 100644 index 0000000..1211e81 --- /dev/null +++ b/cave/src/activitypub/fetch.rs @@ -0,0 +1,39 @@ +use std::time::SystemTime; +use http::StatusCode; +use serde::de::DeserializeOwned; +use sigh::{PrivateKey, SigningConfig, alg::RsaSha256}; +use super::{digest, error::Error}; + +pub async fn authorized_fetch( + client: &reqwest::Client, + uri: &str, + key_id: &str, + private_key: &PrivateKey, +) -> Result +where + T: DeserializeOwned, +{ + let url = reqwest::Url::parse(uri) + .map_err(|_| Error::InvalidUri)?; + let host = format!("{}", url.host().ok_or(Error::InvalidUri)?); + let digest_header = digest::generate_header(&[]) + .expect("digest::generate_header"); + let mut req = http::Request::builder() + .uri(uri) + .header("host", &host) + .header("content-type", "application/activity+json") + .header("date", httpdate::fmt_http_date(SystemTime::now())) + .header("accept", "application/activity+json") + .header("digest", digest_header) + .body(vec![])?; + SigningConfig::new(RsaSha256, private_key, key_id) + .sign(&mut req)?; + let req: reqwest::Request = req.try_into()?; + let res = client.execute(req) + .await?; + if res.status() >= StatusCode::OK && res.status() < StatusCode::MULTIPLE_CHOICES { + Ok(res.json().await?) + } else { + Err(Error::Response(res.text().await?)) + } +} diff --git a/cave/src/activitypub/mod.rs b/cave/src/activitypub/mod.rs new file mode 100644 index 0000000..774fa1e --- /dev/null +++ b/cave/src/activitypub/mod.rs @@ -0,0 +1,5 @@ +pub mod digest; +pub mod send; +pub mod fetch; +mod error; +pub use error::Error; diff --git a/buzzback/src/send.rs b/cave/src/activitypub/send.rs similarity index 68% rename from buzzback/src/send.rs rename to cave/src/activitypub/send.rs index 5649bc8..c383558 100644 --- a/buzzback/src/send.rs +++ b/cave/src/activitypub/send.rs @@ -1,12 +1,12 @@ use std::{ sync::Arc, - time::SystemTime, + time::{Instant, SystemTime}, }; use http::StatusCode; +use metrics::histogram; use serde::Serialize; use sigh::{PrivateKey, SigningConfig, alg::RsaSha256}; -use http_digest_headers::{DigestHeader, DigestMethod}; -use crate::error::Error; +use super::{digest, error::Error}; pub async fn send( client: &reqwest::Client, @@ -22,26 +22,6 @@ pub async fn send( send_raw(client, uri, key_id, private_key, body).await } - -pub fn generate_digest_header(body: &[u8]) -> Result { - let mut digest_header = DigestHeader::new() - .with_method(DigestMethod::SHA256, body) - .map(|h| format!("{}", h)) - .map_err(|_| ())?; - - // mastodon expects uppercase algo name - if digest_header.starts_with("sha-") { - digest_header.replace_range(..4, "SHA-"); - } - // mastodon uses base64::alphabet::STANDARD, not base64::alphabet::URL_SAFE - digest_header.replace_range( - 7.., - &digest_header[7..].replace('-', "+").replace('_', "/") - ); - - Ok(digest_header) -} - pub async fn send_raw( client: &reqwest::Client, uri: &str, @@ -52,7 +32,7 @@ pub async fn send_raw( let url = reqwest::Url::parse(uri) .map_err(|_| Error::InvalidUri)?; let host = format!("{}", url.host().ok_or(Error::InvalidUri)?); - let digest_header = generate_digest_header(&body) + let digest_header = digest::generate_header(&body) .map_err(|()| Error::Digest)?; let mut req = http::Request::builder() .method("POST") @@ -63,14 +43,20 @@ pub async fn send_raw( .header("digest", digest_header) .body(body.as_ref().clone()) .map_err(Error::HttpReq)?; + let t1 = Instant::now(); SigningConfig::new(RsaSha256, private_key, key_id) .sign(&mut req)?; + let t2 = Instant::now(); let req: reqwest::Request = req.try_into()?; let res = client.execute(req) .await?; + let t3 = Instant::now(); + histogram!("relay_http_request_duration", t2 - t1); if res.status() >= StatusCode::OK && res.status() < StatusCode::MULTIPLE_CHOICES { + histogram!("relay_http_response_duration", t3 - t2, "res" => "ok", "host" => host); Ok(()) } else { + histogram!("relay_http_response_duration", t3 - t2, "res" => "err", "host" => host); tracing::error!("send_raw {} response HTTP {}", url, res.status()); let response = res.text().await?; Err(Error::Response(response)) diff --git a/cave/src/lib.rs b/cave/src/lib.rs index b730b7d..70f65f8 100644 --- a/cave/src/lib.rs +++ b/cave/src/lib.rs @@ -9,6 +9,7 @@ pub mod live_file; pub mod word_list; pub mod db; pub mod posts_cache; +pub mod activitypub; pub const PERIODS: &[u64] = &[4, 24, 7 * 24];