diff --git a/config.yaml b/config.yaml index 73fa539..23cc155 100644 --- a/config.yaml +++ b/config.yaml @@ -1,5 +1,14 @@ +# Sources +streams: + # The fedi.buzz firehose stream + - "https://fedi.buzz/api/v1/streaming/public" + # You may list the streaming API of other instances here +# external https hostname hostname: relay.fedi.buzz +# where your reverse proxy will connect to listen_port: 3000 +# ActivityPub signing keypair priv_key_file: private-key.pem pub_key_file: public-key.pem +# PostgreSQL db: "host=localhost user=relay password=xyz dbname=buzzrelay" diff --git a/nixos-module.nix b/nixos-module.nix index 911bf4a..ae6f985 100644 --- a/nixos-module.nix +++ b/nixos-module.nix @@ -2,6 +2,12 @@ { config, lib, pkgs, ... }: { options.services.buzzrelay = with lib; { enable = mkEnableOption "Enable Fedi.buzz relay"; + streams = mkOption { + type = types.listOf str; + default = [ + "https://fedi.buzz/api/v1/streaming/public" + ]; + }; listenPort = mkOption { type = types.int; default = 8000; @@ -34,6 +40,7 @@ cfg = config.services.buzzrelay; configFile = builtins.toFile "buzzrelay.toml" ( lib.generators.toYAML {} { + streams = cfg.streams; hostname = cfg.hostName; listen_port = cfg.listenPort; priv_key_file = cfg.privKeyFile; diff --git a/src/config.rs b/src/config.rs index 65b01f3..2306b24 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1,14 +1,9 @@ use serde::Deserialize; use sigh::{PrivateKey, PublicKey, Key}; -fn default_upstream() -> String { - "fedi.buzz".to_string() -} - #[derive(Deserialize)] pub struct Config { - #[serde(default = "default_upstream")] - pub upstream: String, + pub streams: Vec, pub db: String, pub hostname: String, pub listen_port: u16, diff --git a/src/main.rs b/src/main.rs index 680792e..0ab9d1b 100644 --- a/src/main.rs +++ b/src/main.rs @@ -248,6 +248,8 @@ async fn main() { &std::env::args().nth(1) .expect("Call with config.yaml") ); + let priv_key = config.priv_key(); + let pub_key = config.pub_key(); let recorder = PrometheusBuilder::new() .add_global_label("application", env!("CARGO_PKG_NAME")) @@ -257,7 +259,7 @@ async fn main() { let database = db::Database::connect(&config.db).await; - let stream_rx = stream::spawn(config.upstream.clone()); + let stream_rx = stream::spawn(config.streams.into_iter()); let client = Arc::new( reqwest::Client::builder() .timeout(Duration::from_secs(5)) @@ -272,7 +274,7 @@ async fn main() { .unwrap() ); let hostname = Arc::new(config.hostname.clone()); - relay::spawn(client.clone(), hostname.clone(), database.clone(), config.priv_key(), stream_rx); + relay::spawn(client.clone(), hostname.clone(), database.clone(), priv_key.clone(), stream_rx); let app = Router::new() .route("/tag/:tag", get(get_tag_actor).post(post_tag_relay)) @@ -285,8 +287,8 @@ async fn main() { database, client, hostname, - priv_key: config.priv_key(), - pub_key: config.pub_key(), + priv_key, + pub_key, }) .merge(SpaRouter::new("/", "static")); diff --git a/src/stream.rs b/src/stream.rs index 8a8ae09..5d95f51 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -13,8 +13,7 @@ pub enum StreamError { InvalidContentType, } -async fn run(host: &str) -> Result, StreamError> { - let url = format!("https://{}/api/v1/streaming/public", host); +async fn run(url: &str) -> Result, StreamError> { let client = reqwest::Client::new(); let res = client.get(url) .timeout(Duration::MAX) @@ -44,22 +43,25 @@ async fn run(host: &str) -> Result, StreamError> { Ok(src) } -pub fn spawn>(host: H) -> Receiver { - let host = host.into(); +pub fn spawn(hosts: impl Iterator>) -> Receiver { let (tx, rx) = channel(1024); - tokio::spawn(async move { - loop { - match run(&host).await { - Ok(stream) => - stream.for_each(|post| async { - tx.send(post).await.unwrap(); - }).await, - Err(e) => - tracing::error!("stream: {:?}", e), - } + for host in hosts { + let host = host.into(); + let tx = tx.clone(); + tokio::spawn(async move { + loop { + match run(&host).await { + Ok(stream) => + stream.for_each(|post| async { + tx.send(post).await.unwrap(); + }).await, + Err(e) => + tracing::error!("stream: {:?}", e), + } - sleep(Duration::from_secs(1)).await; - } - }); + sleep(Duration::from_secs(1)).await; + } + }); + } rx }