Compare commits

...

7 Commits

23 changed files with 559 additions and 71 deletions

29
Cargo.lock generated
View File

@ -332,9 +332,8 @@ checksum = "7f30e7476521f6f8af1a1c4c0b8cc94f0bee37d91763d0ca2665f299b6cd8aec"
name = "buzzback"
version = "0.1.0"
dependencies = [
"cave",
"http",
"http_digest_headers",
"httpdate",
"reqwest",
"serde",
"serde_json",
@ -368,13 +367,19 @@ dependencies = [
"chrono",
"eventsource-stream",
"futures",
"http",
"http_digest_headers",
"httpdate",
"inotify",
"metrics",
"redis",
"reqwest",
"serde",
"serde_json",
"serde_yaml",
"sigh",
"systemd",
"thiserror",
"tokio",
"tokio-postgres",
"tracing",
@ -449,6 +454,26 @@ dependencies = [
"urlencoding 1.3.3",
]
[[package]]
name = "caveman-sieve"
version = "0.0.0"
dependencies = [
"cave",
"futures",
"http",
"metrics",
"metrics-exporter-prometheus",
"metrics-util",
"redis",
"reqwest",
"serde",
"serde_json",
"sigh",
"tokio",
"tracing",
"url",
]
[[package]]
name = "caveman-smokestack"
version = "0.0.0"

View File

@ -6,6 +6,7 @@ members = [
"gatherer",
"smokestack",
"buzzback",
"sieve",
]
resolver = "2"

View File

@ -12,10 +12,9 @@ serde = { version = "1", features = ["serde_derive"] }
serde_json = "1"
serde_yaml = "0.9"
reqwest = { version = "0.11", features = ["json", "stream"] }
sigh = "1.0"
http_digest_headers = { version="0.1.0", default-features = false, features = ["use_openssl"] }
sigh = "1"
thiserror = "1"
http = "0.2"
tokio-postgres = "0.7"
urlencoding = "2"
httpdate = "1"
cave = { path = "../cave" }

View File

@ -1,48 +0,0 @@
use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Actor {
#[serde(rename = "@context")]
pub jsonld_context: serde_json::Value,
#[serde(rename = "type")]
pub actor_type: String,
pub id: String,
pub name: Option<String>,
pub icon: Option<Media>,
pub inbox: String,
pub outbox: String,
#[serde(rename = "publicKey")]
pub public_key: ActorPublicKey,
#[serde(rename = "preferredUsername")]
pub preferred_username: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ActorPublicKey {
pub id: String,
pub owner: Option<String>,
#[serde(rename = "publicKeyPem")]
pub pem: String,
}
/// `ActivityPub` "activity"
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Action<O> {
#[serde(rename = "@context")]
pub jsonld_context: serde_json::Value,
#[serde(rename = "type")]
pub action_type: String,
pub id: String,
pub actor: String,
pub to: Option<serde_json::Value>,
pub object: Option<O>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Media {
#[serde(rename = "type")]
pub media_type: String,
#[serde(rename = "mediaType")]
pub content_type: String,
pub url: String,
}

View File

@ -1,7 +1,5 @@
#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error("HTTP Digest generation error")]
Digest,
#[error("JSON encoding error")]
Json(#[from] serde_json::Error),
#[error("Signature error")]
@ -10,8 +8,4 @@ pub enum Error {
HttpReq(#[from] http::Error),
#[error("HTTP client error")]
Http(#[from] reqwest::Error),
#[error("Invalid URI")]
InvalidUri,
#[error("Error response from remote")]
Response(String),
}

View File

@ -2,13 +2,11 @@ use sigh::PrivateKey;
use std::{sync::Arc, time::Duration};
use std::{panic, process};
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
use cave::activitypub;
mod error;
mod config;
mod db;
mod activitypub;
mod digest;
mod send;
async fn follow_back(
client: &reqwest::Client, hostname: &str,
@ -22,7 +20,7 @@ async fn follow_back(
urlencoding::encode(&follow.actor),
);
let action = activitypub::Action {
jsonld_context: serde_json::Value::String("https://www.w3.org/ns/activitystreams".to_string()),
jsonld_context: Some(serde_json::Value::String("https://www.w3.org/ns/activitystreams".to_string())),
action_type: "Follow".to_string(),
id: follow_id,
to: None,
@ -30,7 +28,7 @@ async fn follow_back(
object: Some(&follow.id),
};
let key_id = format!("{}#key", follow.actor);
let result = send::send(
let result = activitypub::send::send(
client, &follow.inbox,
&key_id, &priv_key,
&action,

View File

@ -21,3 +21,9 @@ tracing-subscriber = { version = "0.3", features = ["env-filter"] }
inotify = "0.10"
tokio-postgres = "0.7"
url = "2"
metrics = "0.20"
sigh = "1"
http_digest_headers = { version="0.1.0", default-features = false, features = ["use_openssl"] }
http = "0.2"
thiserror = "1"
httpdate = "1"

View File

@ -0,0 +1,19 @@
/// Errors from the ActivityPub HTTP helpers (signing, digests,
/// fetching, sending).
#[derive(Debug, thiserror::Error)]
pub enum Error {
    /// The body digest header could not be generated.
    #[error("HTTP Digest generation error")]
    Digest,
    #[error("JSON encoding error")]
    Json(#[from] serde_json::Error),
    #[error("Signature error")]
    Signature(#[from] sigh::Error),
    /// A signature was present but did not verify.
    #[error("Signature verification failure")]
    SignatureFail,
    /// Building the outgoing `http::Request` failed.
    #[error("HTTP request error")]
    HttpReq(#[from] http::Error),
    #[error("HTTP client error")]
    Http(#[from] reqwest::Error),
    /// The target URI could not be parsed, or it lacked a host.
    #[error("Invalid URI")]
    InvalidUri,
    /// Remote returned a non-2xx response; payload is the response body.
    #[error("Error response from remote")]
    Response(String),
}

View File

@ -0,0 +1,39 @@
use std::time::SystemTime;
use http::StatusCode;
use serde::de::DeserializeOwned;
use sigh::{PrivateKey, SigningConfig, alg::RsaSha256};
use super::{digest, error::Error};
pub async fn authorized_fetch<T>(
client: &reqwest::Client,
uri: &str,
key_id: &str,
private_key: &PrivateKey,
) -> Result<T, Error>
where
T: DeserializeOwned,
{
let url = reqwest::Url::parse(uri)
.map_err(|_| Error::InvalidUri)?;
let host = format!("{}", url.host().ok_or(Error::InvalidUri)?);
let digest_header = digest::generate_header(&[])
.expect("digest::generate_header");
let mut req = http::Request::builder()
.uri(uri)
.header("host", &host)
.header("content-type", "application/activity+json")
.header("date", httpdate::fmt_http_date(SystemTime::now()))
.header("accept", "application/activity+json, application/ld+json")
.header("digest", digest_header)
.body(vec![])?;
SigningConfig::new(RsaSha256, private_key, key_id)
.sign(&mut req)?;
let req: reqwest::Request = req.try_into()?;
let res = client.execute(req)
.await?;
if res.status() >= StatusCode::OK && res.status() < StatusCode::MULTIPLE_CHOICES {
Ok(res.json().await?)
} else {
Err(Error::Response(res.text().await?))
}
}

128
cave/src/activitypub/mod.rs Normal file
View File

@ -0,0 +1,128 @@
pub mod digest;
pub mod send;
pub mod fetch;
mod error;
pub use error::Error;
use std::collections::HashMap;
use serde::{Deserialize, Serialize};
/// An ActivityPub actor — the object an actor URL resolves to.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Actor {
    /// JSON-LD `@context`; kept as an opaque value because remotes send
    /// strings, arrays, or objects here.
    #[serde(rename = "@context")]
    pub jsonld_context: Option<serde_json::Value>,
    /// e.g. "Person"; anything else is flagged as a bot downstream
    /// (see `Post::to_feed_post`).
    #[serde(rename = "type")]
    pub actor_type: String,
    pub id: String,
    /// Profile page URL; callers fall back to `id` when absent.
    pub url: Option<String>,
    pub name: String,
    pub icon: Option<Media>,
    pub inbox: String,
    pub outbox: String,
    /// Key used to verify this actor's HTTP signatures.
    #[serde(rename = "publicKey")]
    pub public_key: ActorPublicKey,
    #[serde(rename = "preferredUsername")]
    pub preferred_username: String,
}
/// The `publicKey` attached to an [`Actor`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ActorPublicKey {
    pub id: String,
    /// Id of the actor owning this key, when provided.
    pub owner: Option<String>,
    /// PEM-encoded public key material.
    #[serde(rename = "publicKeyPem")]
    pub pem: String,
}
/// `ActivityPub` "activity"
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Action<O> {
    #[serde(rename = "@context")]
    pub jsonld_context: Option<serde_json::Value>,
    /// Activity type, e.g. "Create", "Announce", "Update", "Follow".
    #[serde(rename = "type")]
    pub action_type: String,
    pub id: String,
    /// Id of the actor performing the activity.
    pub actor: String,
    pub to: Option<serde_json::Value>,
    /// Either an embedded object or just its id string — hence the type
    /// parameter (often `serde_json::Value`).
    pub object: O,
}
impl Action<serde_json::Value> {
    /// Extract the object's id: the object is either a plain id string,
    /// or an embedded object carrying an `id` field.
    pub fn object_id(&self) -> Option<&str> {
        match &self.object {
            serde_json::Value::String(id) => Some(id.as_str()),
            serde_json::Value::Object(map) =>
                map.get("id").and_then(serde_json::Value::as_str),
            _ => None,
        }
    }
}
// hack for misskey
/// Default `mediaType` used when the remote omits it.
fn media_default_content_type() -> String {
    String::from("image/jpeg")
}
/// A media attachment (or actor icon).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Media {
    /// ActivityPub object type of the attachment.
    #[serde(rename = "type")]
    pub media_type: String,
    /// MIME type; defaults to "image/jpeg" when missing (Misskey quirk,
    /// see `media_default_content_type`).
    #[serde(rename = "mediaType", default = "media_default_content_type")]
    pub content_type: String,
    pub url: String,
}
/// This is unfortunately strictly separate from the Client/Server
/// data schema in `cave::feed::Post`
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct Post {
    pub published: String,
    pub id: String,
    pub url: Option<String>,
    /// Id of the authoring actor.
    #[serde(rename = "attributedTo")]
    pub attributed_to: Option<String>,
    /// Post body; empty string when absent.
    #[serde(default = "String::new")]
    pub content: String,
    /// Language code -> content; only the keys are used, to guess the
    /// post's language (see `Post::language`).
    #[serde(rename = "contentMap", default = "HashMap::new")]
    pub content_map: HashMap<String, String>,
    #[serde(default)]
    pub tag: Vec<super::feed::Tag>,
    pub sensitive: Option<bool>,
    #[serde(rename = "attachment", default = "Vec::new")]
    pub attachments: Vec<Media>,
}
impl Post {
    /// Language hint: the first key of `contentMap`, if any.
    /// (HashMap iteration order is arbitrary; this mirrors the upstream
    /// behavior of picking whichever key comes first.)
    pub fn language(&self) -> Option<&str> {
        self.content_map.keys().map(String::as_str).next()
    }

    /// Translate ActivityPub post to Mastodon client API post format
    pub fn to_feed_post(self, actor: Actor) -> super::feed::Post {
        let language = self.language().map(str::to_string);
        let media_attachments = self.attachments
            .into_iter()
            .map(|media| super::feed::MediaAttachment {
                media_type: media.media_type,
                remote_url: Some(media.url),
            })
            .collect();
        let account = super::feed::Account {
            username: actor.preferred_username,
            display_name: actor.name,
            // Prefer the profile URL, falling back to the actor id.
            url: actor.url.unwrap_or(actor.id),
            // Everything that is not a "Person" counts as a bot.
            bot: actor.actor_type != "Person",
        };
        super::feed::Post {
            created_at: self.published,
            uri: self.url.unwrap_or(self.id),
            content: self.content,
            account,
            tags: self.tag,
            sensitive: self.sensitive,
            mentions: vec![],
            language,
            media_attachments,
            reblog: None,
        }
    }
}

View File

@ -1,11 +1,12 @@
use std::{
sync::Arc,
time::SystemTime,
time::{Instant, SystemTime},
};
use http::StatusCode;
use metrics::histogram;
use serde::Serialize;
use sigh::{PrivateKey, SigningConfig, alg::RsaSha256};
use crate::{digest, error::Error};
use super::{digest, error::Error};
pub async fn send<T: Serialize>(
client: &reqwest::Client,
@ -42,14 +43,20 @@ pub async fn send_raw(
.header("digest", digest_header)
.body(body.as_ref().clone())
.map_err(Error::HttpReq)?;
let t1 = Instant::now();
SigningConfig::new(RsaSha256, private_key, key_id)
.sign(&mut req)?;
let t2 = Instant::now();
let req: reqwest::Request = req.try_into()?;
let res = client.execute(req)
.await?;
let t3 = Instant::now();
histogram!("relay_http_request_duration", t2 - t1);
if res.status() >= StatusCode::OK && res.status() < StatusCode::MULTIPLE_CHOICES {
histogram!("relay_http_response_duration", t3 - t2, "res" => "ok", "host" => host);
Ok(())
} else {
histogram!("relay_http_response_duration", t3 - t2, "res" => "err", "host" => host);
tracing::error!("send_raw {} response HTTP {}", url, res.status());
let response = res.text().await?;
Err(Error::Response(response))

View File

@ -70,7 +70,7 @@ pub struct Post {
pub account: Account,
#[serde(default)]
pub tags: Vec<Tag>,
pub application: Option<Application>,
// pub application: Option<Application>,
pub sensitive: Option<bool>,
#[serde(default)]
pub mentions: Vec<Mention>,
@ -147,6 +147,7 @@ enum EncodedPost {
Stolen,
}
// TODO: eliminate
/// Wraps a `Post` along with a serializable form that is most close
/// to the original incoming data
#[derive(Debug)]
@ -164,6 +165,15 @@ impl Deref for EncodablePost {
}
impl EncodablePost {
/// Wrap an already-parsed `Post`, serializing it once up front so the
/// encoded bytes can be forwarded without re-encoding.
pub fn from_post(event_type: String, post: Post) -> Result<Self, serde_json::Error> {
    let bytes = serde_json::to_vec(&post)?;
    Ok(EncodablePost {
        event_type,
        post,
        encoded: EncodedPost::Bytes(bytes),
    })
}
pub fn from_value(event_type: String, value: serde_json::Value) -> Result<Self, serde_json::Error> {
let post = serde_json::from_value(value.clone())?;
Ok(EncodablePost {

View File

@ -8,6 +8,8 @@ pub mod firehose;
pub mod live_file;
pub mod word_list;
pub mod db;
pub mod posts_cache;
pub mod activitypub;
pub const PERIODS: &[u64] = &[4, 24, 7 * 24];

View File

@ -11,7 +11,8 @@
overlay = final: prev: {
inherit (self.packages.${prev.system})
caveman-hunter caveman-butcher
caveman-gatherer caveman-smokestack;
caveman-gatherer caveman-sieve
caveman-smokestack;
};
nixosModule = self.nixosModules.caveman;
@ -100,6 +101,14 @@
cp -rv gatherer/{templates,assets} $out/share/caveman/gatherer/
'';
};
packages.caveman-sieve = naersk-lib.buildPackage rec {
pname = "caveman-sieve";
version = self.lastModifiedDate;
inherit src;
targets = [ pname ];
nativeBuildInputs = with pkgs; [ pkg-config ];
buildInputs = with pkgs; [ openssl systemd ];
};
packages.caveman-smokestack = naersk-lib.buildPackage rec {
pname = "caveman-smokestack";
version = self.lastModifiedDate;

View File

@ -9,7 +9,6 @@ mod config;
mod block_list;
mod scheduler;
mod worker;
mod posts_cache;
mod webfinger;
use scheduler::InstanceHost;
@ -42,7 +41,7 @@ async fn run() {
let db = cave::db::Database::connect(&config.database).await;
let mut store = cave::store::Store::new(16, config.redis, config.redis_password_file).await;
let posts_cache = posts_cache::PostsCache::new(65536);
let posts_cache = cave::posts_cache::PostsCache::new(65536);
let block_list = block_list::BlockList::new(&config.blocklist).await;

View File

@ -5,11 +5,11 @@ use std::time::{Duration, Instant};
use cave::{
db::Database,
feed::{Feed, EncodablePost, Post, StreamError},
posts_cache::PostsCache,
store::Store,
};
use futures::{StreamExt, future};
use reqwest::StatusCode;
use crate::posts_cache::PostsCache;
use crate::scheduler::{Host, InstanceHost};
use crate::webfinger;

View File

@ -50,6 +50,19 @@ let
builtins.toJSON gathererSettings
);
sieveDefaultSettings = {
redis = "redis://127.0.0.1:${toString cfg.redis.port}/";
redis_password_file = cfg.redis.passwordFile;
in_topic = "relay-in";
prometheus_port = 9102;
};
sieveSettings = lib.recursiveUpdate sieveDefaultSettings cfg.sieve.settings;
sieveConfigFile = builtins.toFile "sieve.yaml" (
builtins.toJSON sieveSettings
);
smokestackDefaultSettings = {
redis = "redis://127.0.0.1:${toString cfg.redis.port}/";
redis_password_file = cfg.redis.passwordFile;
@ -119,6 +132,18 @@ in
default = "DEBUG";
};
sieve.enable = mkEnableOption "caveman sieve";
sieve.settings = mkOption {
type = types.anything;
default = sieveDefaultSettings;
};
sieve.logLevel = mkOption {
type = types.enum [ "ERROR" "WARN" "INFO" "DEBUG" "TRACE" ];
default = "DEBUG";
};
smokestack.enable = mkEnableOption "caveman smokestack";
smokestack.settings = mkOption {
@ -139,6 +164,7 @@ in
networking.firewall.allowedTCPPorts = [
hunterSettings.prometheus_port
sieveSettings.prometheus_port
];
systemd.tmpfiles.rules = [
@ -305,6 +331,34 @@ in
'';
};
# systemd unit for the sieve service (relay deduplicator/enricher),
# hardened with the same sandboxing options as the sibling services.
systemd.services.caveman-sieve = lib.mkIf cfg.sieve.enable {
  wantedBy = [ "multi-user.target" ];
  requires = [ "redis-caveman.service" ];
  after = [ "redis-caveman.service" "network-online.target" ];
  environment.RUST_LOG = "caveman=${cfg.sieve.logLevel}";
  serviceConfig = {
    ExecStart = "${pkgs.caveman-sieve}/bin/caveman-sieve ${sieveConfigFile}";
    # Type=notify: the binary signals readiness and pings the watchdog
    # (cave::systemd::ready / ::watchdog in the Rust sources).
    Type = "notify";
    WatchdogSec = 300;
    Restart = "always";
    RestartSec = 1;
    DynamicUser = true;
    User = "caveman-sieve";
    ProtectSystem = "strict";
    ProtectHome = true;
    ProtectHostname = true;
    ProtectKernelLogs = true;
    ProtectKernelModules = true;
    ProtectKernelTunables = true;
    RestrictNamespaces = true;
    RestrictRealtime = true;
    LockPersonality = true;
    MemoryDenyWriteExecute = true;
    # Was `LimitNOFile`, which systemd does not recognize (directive
    # names are case-sensitive: LimitNOFILE) — the fd limit would have
    # been silently ignored.
    LimitNOFILE = limitNOFILE;
    LimitRSS = "128M:256M";
  };
};
systemd.services.caveman-smokestack = lib.mkIf cfg.smokestack.enable {
wantedBy = [ "multi-user.target" ];
requires = [ "redis-caveman.service" "caveman-hunter.service" ];

21
sieve/Cargo.toml Normal file
View File

@ -0,0 +1,21 @@
[package]
name = "caveman-sieve"
description = "Relay deduplicator and enricher"
version = "0.0.0"
edition = "2021"

[dependencies]
futures = "0.3"
tokio = { version = "1", features = ["full"] }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
tracing = "0.1"
cave = { path = "../cave" }
url = "2"
redis = { version = "0.23", features = ["tokio-comp", "connection-manager"] }
reqwest = { version = "0.11" }
# Pinned to the versions the sibling workspace crates use instead of
# wildcard "*" requirements (wildcards accept any future breaking
# release and are rejected by crates.io for publishing).
http = "0.2"
sigh = "1"
metrics = "0.20"
metrics-util = "0.14"
metrics-exporter-prometheus = "0.11"

5
sieve/config.yaml Normal file
View File

@ -0,0 +1,5 @@
# Example configuration for the sieve service (see sieve/src/config.rs).
redis: "redis://fedi.buzz:6379/"
# File holding the redis password, kept out of this config file.
redis_password_file: "redis_password.txt"
# Redis pub/sub topic carrying incoming relay activities.
in_topic: "relay-in"
# PEM private key used to sign authorized fetches.
priv_key_file: private-key.pem
# Port for the Prometheus metrics HTTP listener.
prometheus_port: 9102

30
sieve/src/config.rs Normal file
View File

@ -0,0 +1,30 @@
use serde::Deserialize;
use sigh::{PrivateKey, Key};
use url::Url;
/// Sieve service configuration, deserialized from the config file
/// (see `config.yaml`).
#[derive(Debug, Deserialize)]
pub struct Config {
    /// Redis URL without password; use [`Config::redis_url`] to obtain
    /// the URL with the password spliced in.
    pub redis: String,
    /// Path to a file holding the redis password.
    pub redis_password_file: String,
    /// Pub/sub topic carrying incoming relay activities.
    pub in_topic: String,
    /// Path to the PEM private key; private, read via [`Config::priv_key`].
    priv_key_file: String,
    /// Port for the Prometheus metrics HTTP listener.
    pub prometheus_port: u16,
}
impl Config {
    /// The redis connection URL with the password (read from
    /// `redis_password_file`) spliced in.
    ///
    /// # Panics
    /// Panics when the password file is unreadable or `redis` is not a
    /// valid URL.
    pub fn redis_url(&self) -> Url {
        let password = std::fs::read_to_string(&self.redis_password_file)
            .expect("redis_password_file");
        let mut url = Url::parse(&self.redis)
            .expect("redis_url");
        url.set_password(Some(&password)).unwrap();
        url
    }

    /// Load and parse the PEM-encoded private signing key.
    ///
    /// # Panics
    /// Panics when the key file is unreadable or not valid PEM.
    pub fn priv_key(&self) -> PrivateKey {
        let pem = std::fs::read_to_string(&self.priv_key_file)
            .expect("read priv_key_file");
        PrivateKey::from_pem(pem.as_bytes())
            .expect("priv_key")
    }
}

190
sieve/src/main.rs Normal file
View File

@ -0,0 +1,190 @@
use std::{
collections::HashSet,
sync::Arc,
time::Duration,
};
use futures::{Stream, StreamExt};
use url::Url;
use cave::{
activitypub::{
self,
fetch::authorized_fetch,
},
config::LoadConfig,
feed,
};
use metrics_util::MetricKindMask;
use metrics_exporter_prometheus::PrometheusBuilder;
mod config;
/// Subscribe to the relay input topic on redis pub/sub.
///
/// Returns a stream of parsed ActivityPub activities; messages whose
/// payload is not a UTF-8 string or not valid JSON are silently dropped.
pub async fn connect_relay_in(redis_url: Url, in_topic: &str) -> Result<impl Stream<Item = activitypub::Action<serde_json::Value>>, redis::RedisError> {
    let client = redis::Client::open(redis_url)?;
    let mut pubsub_conn = client.get_async_connection()
        .await?
        .into_pubsub();
    pubsub_conn.subscribe(in_topic)
        .await?;
    let actions = pubsub_conn.into_on_message().filter_map(|msg| async move {
        let payload: String = msg.get_payload().ok()?;
        serde_json::from_str(&payload).ok()
    });
    Ok(actions)
}
/// Key id sent with signed ("authorized") fetches; remote servers that
/// verify signatures resolve it to the relay's public key.
const KEY_ID: &str = "https://relay.fedi.buzz/instance/fedi.buzz#key";
#[tokio::main]
async fn main() {
    // Abort the whole process on any panic so systemd restarts the service.
    cave::init::exit_on_panic();
    cave::init::init_logger(5557);
    let config = config::Config::load();
    // Prometheus endpoint on [::]:prometheus_port — `[0; 8]` is the IPv6
    // unspecified address. Idle metric series expire after 10 minutes.
    PrometheusBuilder::new()
        .with_http_listener(([0; 8], config.prometheus_port))
        .add_global_label("application", env!("CARGO_PKG_NAME"))
        .idle_timeout(MetricKindMask::ALL, Some(Duration::from_secs(600)))
        .install()
        .unwrap();
    // Signing key for authorized fetches, shared across spawned tasks.
    let priv_key = Arc::new(config.priv_key());
    let relay_in = connect_relay_in(config.redis_url(), &config.in_topic)
        .await
        .unwrap();
    let client = reqwest::Client::builder()
        .timeout(Duration::from_secs(5))
        .user_agent(concat!(
            env!("CARGO_PKG_NAME"),
            "/",
            env!("CARGO_PKG_VERSION"),
        ))
        .pool_max_idle_per_host(1)
        .pool_idle_timeout(Some(Duration::from_secs(5)))
        .build()
        .unwrap();
    let store = cave::store::Store::new(1, config.redis, config.redis_password_file).await;
    // Recently-seen post ids, to skip duplicate work across activities.
    let posts_cache = cave::posts_cache::PostsCache::new(65536);
    // Only these activity types carry posts worth forwarding.
    let allowed_action_types: Arc<HashSet<String>> = Arc::new([
        "Create", "Announce", "Update",
    ].into_iter().map(|s| s.to_string()).collect());
    tracing::info!("ready");
    cave::systemd::ready();
    relay_in.for_each(|action| async {
        // Filter by action type
        if ! allowed_action_types.contains(&action.action_type) {
            metrics::counter!("sieve_activity", 1, "type" => "type_ignored");
            return;
        }
        // Clone the shared handles into the task below.
        let posts_cache = posts_cache.clone();
        let client = client.clone();
        let mut store = store.clone();
        let priv_key = priv_key.clone();
        // One task per activity so a slow remote fetch does not stall
        // the pub/sub stream.
        tokio::spawn(async move {
            // Avoid duplicate work
            let id = if let Some(id) = action.object_id() {
                let id = id.to_string();
                // NOTE(review): a `true` return is treated here as
                // "already seen" — confirm against PostsCache::insert.
                if posts_cache.insert(id.clone()) {
                    return;
                }
                id
            } else {
                // no object id
                tracing::warn!("No object id in action: {:?}", action);
                return;
            };
            let object = if action.object.is_object() {
                // all good, can be forwarded
                action.object
            } else if let Some(id) = action.object.as_str() {
                // fetch info for id
                tracing::debug!("GET {id}");
                match authorized_fetch(&client, id, KEY_ID, &priv_key).await {
                    Ok(res) => {
                        metrics::counter!("sieve_activity", 1, "type" => "fetch_object_ok");
                        res
                    }
                    Err(e) => {
                        tracing::error!("{id} HTTP: {e:?}");
                        metrics::counter!("sieve_activity", 1, "type" => "fetch_object_error");
                        return;
                    }
                }
            } else {
                tracing::warn!("Invalid object {id} in action: {:?}", action);
                return;
            };
            // Parse the (possibly just fetched) object into a post.
            let post: activitypub::Post = match serde_json::from_value(object) {
                Ok(post) =>
                    post,
                Err(e) => {
                    tracing::error!("JSON of {id}: {e:?}");
                    metrics::counter!("sieve_activity", 1, "type" => "json_error");
                    return;
                }
            };
            // Fetch the author actor for account metadata; without an
            // author the post cannot be translated, so drop it.
            let author: activitypub::Actor = if let Some(author_url) = &post.attributed_to {
                match authorized_fetch(&client, author_url, KEY_ID, &priv_key).await {
                    Ok(author) => {
                        metrics::counter!("sieve_activity", 1, "type" => "fetch_author_ok");
                        author
                    }
                    Err(e) => {
                        tracing::error!("{author_url} HTTP: {e:?}");
                        metrics::counter!("sieve_activity", 1, "type" => "fetch_author_error");
                        return;
                    }
                }
            } else {
                tracing::error!("No attributedTo in {id}");
                metrics::counter!("sieve_activity", 1, "type" => "no_author");
                return;
            };
            // Translate ActivityPub post to Mastodon client API post format
            let feed_post = post.to_feed_post(author);
            let event_type = if action.action_type == "Update" {
                // Translate more weird Mastodon client API naming
                "status.update"
            } else {
                "update"
            }.to_string();
            let encodable_post = if let Ok(post) = feed::EncodablePost::from_post(event_type, feed_post) {
                post
            } else {
                tracing::error!("Cannot serialize post {id}");
                metrics::counter!("sieve_activity", 1, "type" => "serialize_error");
                return;
            };
            // Forward into the store; Ok(true)/Ok(false) reports whether
            // the post was new. Only successful saves pet the watchdog.
            match store.save_post(encodable_post).await {
                Ok(true) => {
                    tracing::info!("Post was new: {id}");
                    metrics::counter!("sieve_activity", 1, "type" => "post_new");
                    cave::systemd::watchdog();
                }
                Ok(false) => {
                    tracing::info!("Post was already known: {id}");
                    metrics::counter!("sieve_activity", 1, "type" => "post_known");
                    cave::systemd::watchdog();
                }
                Err(e) => {
                    tracing::error!("Error forwarding post {id}: {e:?}");
                    metrics::counter!("sieve_activity", 1, "type" => "post_error");
                }
            }
        });
    }).await;
}