From 7399f9c631bead5f7fda357039656c35af390aa6 Mon Sep 17 00:00:00 2001 From: Astro Date: Tue, 20 Dec 2022 00:04:56 +0100 Subject: [PATCH] relay: concurrentize --- src/relay.rs | 25 +++++++++++++++---------- src/send.rs | 2 +- 2 files changed, 16 insertions(+), 11 deletions(-) diff --git a/src/relay.rs b/src/relay.rs index f4864b5..c9e4e38 100644 --- a/src/relay.rs +++ b/src/relay.rs @@ -67,6 +67,8 @@ pub fn spawn( private_key: PrivateKey, mut stream_rx: Receiver ) { + let private_key = Arc::new(private_key); + tokio::spawn(async move { while let Some(data) = stream_rx.recv().await { // dbg!(&data); @@ -84,7 +86,6 @@ pub fn spawn( // skip reposts None => continue, }; - // TODO: queue by target? let mut seen_actors = HashSet::new(); let mut seen_inboxes = HashSet::new(); for actor in post.relay_targets(hostname.clone()) { @@ -109,16 +110,20 @@ pub fn spawn( if seen_inboxes.contains(&inbox) { continue; } - + seen_inboxes.insert(inbox.clone()); + let client_ = client.clone(); + let body_ = body.clone(); + let key_id = actor.key_id(); + let private_key_ = private_key.clone(); tracing::debug!("relay {} to {}", actor_id, inbox); - if let Err(e) = send::send_raw( - &client, &inbox, - &actor.key_id(), &private_key, body.clone() - ).await { - tracing::error!("relay::send {:?}", e); - } - - seen_inboxes.insert(inbox); + tokio::spawn(async move { + if let Err(e) = send::send_raw( + &client_, &inbox, + &key_id, &private_key_, body_ + ).await { + tracing::error!("relay::send {:?}", e); + } + }); } seen_actors.insert(actor); diff --git a/src/send.rs b/src/send.rs index b695f9f..adafa5c 100644 --- a/src/send.rs +++ b/src/send.rs @@ -71,7 +71,7 @@ pub async fn send_raw( .header("digest", digest_header) .body(body.as_ref().clone()) .map_err(SendError::HttpReq)?; - SigningConfig::new(RsaSha256, private_key, key_id) + SigningConfig::new(RsaSha256, &private_key, key_id) .sign(&mut req)?; let req: reqwest::Request = req.try_into()?; let res = client.execute(req)