From 195ba345018b6d227c939bdaf92c8b3943af877d Mon Sep 17 00:00:00 2001 From: Astro Date: Sun, 1 Oct 2023 23:19:00 +0200 Subject: [PATCH] firehose: convey event_type --- butcher/src/main.rs | 10 +++++++--- cave/src/feed.rs | 22 +++++++++------------- cave/src/firehose.rs | 10 ++++++++-- cave/src/store.rs | 10 ++++++++-- gatherer/src/http_server.rs | 18 +++++++++--------- smokestack/src/main.rs | 12 ++++++++---- 6 files changed, 49 insertions(+), 33 deletions(-) diff --git a/butcher/src/main.rs b/butcher/src/main.rs index fc9d25e..eb02f07 100644 --- a/butcher/src/main.rs +++ b/butcher/src/main.rs @@ -45,11 +45,15 @@ async fn main() { let firehose_factory = FirehoseFactory::new(config.redis); let firehose = firehose_factory.produce() .await - .expect("firehose") - .filter_map(|item| async move { item.ok() }); + .expect("firehose"); cave::systemd::ready(); - firehose.for_each(move |data| { + firehose.for_each(move |(event_type, data)| { + if event_type != b"update" { + // Only analyze new posts, no updates + return futures::future::ready(()); + } + let trend_setter_tx = trend_setter_tx.clone(); let mut store = store.clone(); let profanity = profanity.clone(); diff --git a/cave/src/feed.rs b/cave/src/feed.rs index b6eae34..04c6077 100644 --- a/cave/src/feed.rs +++ b/cave/src/feed.rs @@ -151,6 +151,7 @@ enum EncodedPost { /// to the original incoming data #[derive(Debug)] pub struct EncodablePost { + pub event_type: String, post: Post, encoded: EncodedPost, } @@ -163,17 +164,19 @@ impl Deref for EncodablePost { } impl EncodablePost { - pub fn from_value(value: serde_json::Value) -> Result { + pub fn from_value(event_type: String, value: serde_json::Value) -> Result { let post = serde_json::from_value(value.clone())?; Ok(EncodablePost { + event_type, post, encoded: EncodedPost::Value(value), }) } - pub fn from_bytes(bytes: Vec) -> Result { + pub fn from_bytes(event_type: String, bytes: Vec) -> Result { let post = serde_json::from_slice(&bytes)?; Ok(EncodablePost { + event_type, post, encoded: EncodedPost::Bytes(bytes), }) @@ -226,7 +229,7 @@ impl Feed { let posts = tokio::task::spawn_blocking(move || { let values: Vec = serde_json::from_slice(&body)?; let posts: Vec = values.into_iter() - .filter_map(|value| EncodablePost::from_value(value).ok()) + .filter_map(|value| EncodablePost::from_value("update".to_string(), value).ok()) .collect(); Ok::<_, serde_json::Error>(posts) }).await.expect("join blocking") @@ -254,16 +257,9 @@ impl Feed { } let src = res.bytes_stream().eventsource() - .filter_map(|result| async { - let result = result.ok()?; - if result.event == "update" { - Some(result) - } else { - None - } - }) - .filter_map(|event| async move { - EncodablePost::from_bytes(event.data.into_bytes()).ok() + .filter_map(|result| async move { + let event = result.ok()?; + EncodablePost::from_bytes(event.event, event.data.into_bytes()).ok() }); Ok(src) } diff --git a/cave/src/firehose.rs b/cave/src/firehose.rs index 88d9c42..a5b6da0 100644 --- a/cave/src/firehose.rs +++ b/cave/src/firehose.rs @@ -11,7 +11,7 @@ impl FirehoseFactory { FirehoseFactory { redis_url } } - pub async fn produce(&self) -> Result, RedisError>>, RedisError> { + pub async fn produce(&self) -> Result, Vec)>, RedisError> { let client = redis::Client::open(&self.redis_url[..])?; let mut pubsub_conn = client.get_async_connection() .await? @@ -21,7 +21,13 @@ impl FirehoseFactory { .await?; let stream = pubsub_conn.into_on_message() - .map(|msg| msg.get_payload()); + .filter_map(|msg| async move { + let data = msg.get_payload::>().ok()?; + let nul_pos = data.iter().position(|&b| b == 0)?; + let event_type = data[..nul_pos].to_owned(); + let post_data: Vec = data[nul_pos + 1..].to_owned(); + Some((event_type, post_data)) + }); Ok(stream) } } diff --git a/cave/src/store.rs b/cave/src/store.rs index 4087827..5e752f0 100644 --- a/cave/src/store.rs +++ b/cave/src/store.rs @@ -130,8 +130,14 @@ impl Store { post.tags.len(), post.uri); match post.encode() { - Ok(encoded) => { - redis::Cmd::publish("firehose", encoded) + Ok(encoded_post) => { + let mut data = Vec::::with_capacity( + post.event_type.len() + 1 + encoded_post.len() + ); + data.extend(post.event_type.as_bytes()); + data.extend(&[0]); + data.extend(&encoded_post); + redis::Cmd::publish("firehose", data) .query_async::<_, Value>(self) .await?; } diff --git a/gatherer/src/http_server.rs b/gatherer/src/http_server.rs index 70d88c5..a2ed983 100644 --- a/gatherer/src/http_server.rs +++ b/gatherer/src/http_server.rs @@ -168,16 +168,16 @@ async fn streaming_api( let firehose = firehose_factory.produce() .await .expect("firehose"); - let stream_err = |e| stream::once(async move { Err(e) }).boxed(); - let stream_ok = |data| - stream::iter([ - Ok(b"event: update\ndata: ".to_vec()), - Ok(data), - Ok(b"\n\n".to_vec()), - ].into_iter()).boxed(); - let stream = stream::once(async { Ok(b":)\n".to_vec()) }) + let stream = stream::once(async { Ok::, axum::Error>(b":)\n".to_vec()) }) .chain( - firehose.flat_map(move |data| data.map_or_else(stream_err, stream_ok)) + firehose.flat_map(|(event_type, data)| + stream::iter([ + Ok(b"event: ".to_vec()), + Ok(event_type), + Ok(b"\ndata: ".to_vec()), + Ok(data), + Ok(b"\n\n".to_vec()), + ].into_iter()).boxed()) ); let body = axum::body::boxed(hyper::body::Body::wrap_stream(stream)); diff --git a/smokestack/src/main.rs b/smokestack/src/main.rs index b69f30c..9bf4137 100644 --- a/smokestack/src/main.rs +++ b/smokestack/src/main.rs @@ -165,10 +165,15 @@ impl Drop for Pipe { } } -async fn publisher(state: State, firehose: impl Stream>) { - firehose.for_each(move |data| { +async fn publisher(state: State, firehose: impl Stream, Vec)>) { + firehose.for_each(move |(event_type, data)| { let state = state.clone(); async move { + if event_type != b"update" { + // Only process new posts, no updates + return; + } + let post = serde_json::from_slice(&data) .ok(); let msg = post.and_then(format_message); @@ -193,8 +198,7 @@ async fn main() { let firehose_factory = FirehoseFactory::new(config.redis); let firehose = firehose_factory.produce() .await - .expect("firehose") - .filter_map(|item| async move { item.ok() }); + .expect("firehose"); tokio::spawn( publisher(state.clone(), firehose) );