cave/activitypub: move send, error from buzzback

This commit is contained in:
Astro 2023-10-15 20:30:15 +02:00
parent 8f56f53200
commit 1cb80c01e7
10 changed files with 125 additions and 30 deletions

25
Cargo.lock generated
View File

@ -332,9 +332,8 @@ checksum = "7f30e7476521f6f8af1a1c4c0b8cc94f0bee37d91763d0ca2665f299b6cd8aec"
name = "buzzback" name = "buzzback"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"cave",
"http", "http",
"http_digest_headers",
"httpdate",
"reqwest", "reqwest",
"serde", "serde",
"serde_json", "serde_json",
@ -368,6 +367,9 @@ dependencies = [
"chrono", "chrono",
"eventsource-stream", "eventsource-stream",
"futures", "futures",
"http",
"http_digest_headers",
"httpdate",
"inotify", "inotify",
"metrics", "metrics",
"redis", "redis",
@ -375,7 +377,9 @@ dependencies = [
"serde", "serde",
"serde_json", "serde_json",
"serde_yaml", "serde_yaml",
"sigh",
"systemd", "systemd",
"thiserror",
"tokio", "tokio",
"tokio-postgres", "tokio-postgres",
"tracing", "tracing",
@ -450,6 +454,23 @@ dependencies = [
"urlencoding 1.3.3", "urlencoding 1.3.3",
] ]
[[package]]
name = "caveman-sieve"
version = "0.0.0"
dependencies = [
"cave",
"futures",
"http",
"redis",
"reqwest",
"serde",
"serde_json",
"sigh",
"tokio",
"tracing",
"url",
]
[[package]] [[package]]
name = "caveman-smokestack" name = "caveman-smokestack"
version = "0.0.0" version = "0.0.0"

View File

@ -12,10 +12,9 @@ serde = { version = "1", features = ["serde_derive"] }
serde_json = "1" serde_json = "1"
serde_yaml = "0.9" serde_yaml = "0.9"
reqwest = { version = "0.11", features = ["json", "stream"] } reqwest = { version = "0.11", features = ["json", "stream"] }
sigh = "1.0" sigh = "1"
http_digest_headers = { version="0.1.0", default-features = false, features = ["use_openssl"] }
thiserror = "1" thiserror = "1"
http = "0.2" http = "0.2"
tokio-postgres = "0.7" tokio-postgres = "0.7"
urlencoding = "2" urlencoding = "2"
httpdate = "1" cave = { path = "../cave" }

View File

@ -2,12 +2,12 @@ use sigh::PrivateKey;
use std::{sync::Arc, time::Duration}; use std::{sync::Arc, time::Duration};
use std::{panic, process}; use std::{panic, process};
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
use cave::activitypub::send;
mod error; mod error;
mod config; mod config;
mod db; mod db;
mod activitypub; mod activitypub;
mod send;
async fn follow_back( async fn follow_back(
client: &reqwest::Client, hostname: &str, client: &reqwest::Client, hostname: &str,

View File

@ -22,3 +22,8 @@ inotify = "0.10"
tokio-postgres = "0.7" tokio-postgres = "0.7"
url = "2" url = "2"
metrics = "0.20" metrics = "0.20"
sigh = "1"
http_digest_headers = { version="0.1.0", default-features = false, features = ["use_openssl"] }
http = "0.2"
thiserror = "1"
httpdate = "1"

View File

@ -0,0 +1,20 @@
use http_digest_headers::{DigestHeader, DigestMethod};
pub fn generate_header(body: &[u8]) -> Result<String, ()> {
let mut digest_header = DigestHeader::new()
.with_method(DigestMethod::SHA256, body)
.map(|h| format!("{}", h))
.map_err(|_| ())?;
// mastodon expects uppercase algo name
if digest_header.starts_with("sha-") {
digest_header.replace_range(..4, "SHA-");
}
// mastodon uses base64::alphabet::STANDARD, not base64::alphabet::URL_SAFE
digest_header.replace_range(
7..,
&digest_header[7..].replace('-', "+").replace('_', "/")
);
Ok(digest_header)
}

View File

@ -0,0 +1,19 @@
#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error("HTTP Digest generation error")]
Digest,
#[error("JSON encoding error")]
Json(#[from] serde_json::Error),
#[error("Signature error")]
Signature(#[from] sigh::Error),
#[error("Signature verification failure")]
SignatureFail,
#[error("HTTP request error")]
HttpReq(#[from] http::Error),
#[error("HTTP client error")]
Http(#[from] reqwest::Error),
#[error("Invalid URI")]
InvalidUri,
#[error("Error response from remote")]
Response(String),
}

View File

@ -0,0 +1,39 @@
use std::time::SystemTime;
use http::StatusCode;
use serde::de::DeserializeOwned;
use sigh::{PrivateKey, SigningConfig, alg::RsaSha256};
use super::{digest, error::Error};
pub async fn authorized_fetch<T>(
client: &reqwest::Client,
uri: &str,
key_id: &str,
private_key: &PrivateKey,
) -> Result<T, Error>
where
T: DeserializeOwned,
{
let url = reqwest::Url::parse(uri)
.map_err(|_| Error::InvalidUri)?;
let host = format!("{}", url.host().ok_or(Error::InvalidUri)?);
let digest_header = digest::generate_header(&[])
.expect("digest::generate_header");
let mut req = http::Request::builder()
.uri(uri)
.header("host", &host)
.header("content-type", "application/activity+json")
.header("date", httpdate::fmt_http_date(SystemTime::now()))
.header("accept", "application/activity+json")
.header("digest", digest_header)
.body(vec![])?;
SigningConfig::new(RsaSha256, private_key, key_id)
.sign(&mut req)?;
let req: reqwest::Request = req.try_into()?;
let res = client.execute(req)
.await?;
if res.status() >= StatusCode::OK && res.status() < StatusCode::MULTIPLE_CHOICES {
Ok(res.json().await?)
} else {
Err(Error::Response(res.text().await?))
}
}

View File

@ -0,0 +1,5 @@
pub mod digest;
pub mod send;
pub mod fetch;
mod error;
pub use error::Error;

View File

@ -1,12 +1,12 @@
use std::{ use std::{
sync::Arc, sync::Arc,
time::SystemTime, time::{Instant, SystemTime},
}; };
use http::StatusCode; use http::StatusCode;
use metrics::histogram;
use serde::Serialize; use serde::Serialize;
use sigh::{PrivateKey, SigningConfig, alg::RsaSha256}; use sigh::{PrivateKey, SigningConfig, alg::RsaSha256};
use http_digest_headers::{DigestHeader, DigestMethod}; use super::{digest, error::Error};
use crate::error::Error;
pub async fn send<T: Serialize>( pub async fn send<T: Serialize>(
client: &reqwest::Client, client: &reqwest::Client,
@ -22,26 +22,6 @@ pub async fn send<T: Serialize>(
send_raw(client, uri, key_id, private_key, body).await send_raw(client, uri, key_id, private_key, body).await
} }
pub fn generate_digest_header(body: &[u8]) -> Result<String, ()> {
let mut digest_header = DigestHeader::new()
.with_method(DigestMethod::SHA256, body)
.map(|h| format!("{}", h))
.map_err(|_| ())?;
// mastodon expects uppercase algo name
if digest_header.starts_with("sha-") {
digest_header.replace_range(..4, "SHA-");
}
// mastodon uses base64::alphabet::STANDARD, not base64::alphabet::URL_SAFE
digest_header.replace_range(
7..,
&digest_header[7..].replace('-', "+").replace('_', "/")
);
Ok(digest_header)
}
pub async fn send_raw( pub async fn send_raw(
client: &reqwest::Client, client: &reqwest::Client,
uri: &str, uri: &str,
@ -52,7 +32,7 @@ pub async fn send_raw(
let url = reqwest::Url::parse(uri) let url = reqwest::Url::parse(uri)
.map_err(|_| Error::InvalidUri)?; .map_err(|_| Error::InvalidUri)?;
let host = format!("{}", url.host().ok_or(Error::InvalidUri)?); let host = format!("{}", url.host().ok_or(Error::InvalidUri)?);
let digest_header = generate_digest_header(&body) let digest_header = digest::generate_header(&body)
.map_err(|()| Error::Digest)?; .map_err(|()| Error::Digest)?;
let mut req = http::Request::builder() let mut req = http::Request::builder()
.method("POST") .method("POST")
@ -63,14 +43,20 @@ pub async fn send_raw(
.header("digest", digest_header) .header("digest", digest_header)
.body(body.as_ref().clone()) .body(body.as_ref().clone())
.map_err(Error::HttpReq)?; .map_err(Error::HttpReq)?;
let t1 = Instant::now();
SigningConfig::new(RsaSha256, private_key, key_id) SigningConfig::new(RsaSha256, private_key, key_id)
.sign(&mut req)?; .sign(&mut req)?;
let t2 = Instant::now();
let req: reqwest::Request = req.try_into()?; let req: reqwest::Request = req.try_into()?;
let res = client.execute(req) let res = client.execute(req)
.await?; .await?;
let t3 = Instant::now();
histogram!("relay_http_request_duration", t2 - t1);
if res.status() >= StatusCode::OK && res.status() < StatusCode::MULTIPLE_CHOICES { if res.status() >= StatusCode::OK && res.status() < StatusCode::MULTIPLE_CHOICES {
histogram!("relay_http_response_duration", t3 - t2, "res" => "ok", "host" => host);
Ok(()) Ok(())
} else { } else {
histogram!("relay_http_response_duration", t3 - t2, "res" => "err", "host" => host);
tracing::error!("send_raw {} response HTTP {}", url, res.status()); tracing::error!("send_raw {} response HTTP {}", url, res.status());
let response = res.text().await?; let response = res.text().await?;
Err(Error::Response(response)) Err(Error::Response(response))

View File

@ -9,6 +9,7 @@ pub mod live_file;
pub mod word_list; pub mod word_list;
pub mod db; pub mod db;
pub mod posts_cache; pub mod posts_cache;
pub mod activitypub;
pub const PERIODS: &[u64] = &[4, 24, 7 * 24]; pub const PERIODS: &[u64] = &[4, 24, 7 * 24];