firehose: convey event_type
This commit is contained in:
parent
e07a6b9716
commit
195ba34501
|
@ -45,11 +45,15 @@ async fn main() {
|
||||||
let firehose_factory = FirehoseFactory::new(config.redis);
|
let firehose_factory = FirehoseFactory::new(config.redis);
|
||||||
let firehose = firehose_factory.produce()
|
let firehose = firehose_factory.produce()
|
||||||
.await
|
.await
|
||||||
.expect("firehose")
|
.expect("firehose");
|
||||||
.filter_map(|item| async move { item.ok() });
|
|
||||||
cave::systemd::ready();
|
cave::systemd::ready();
|
||||||
|
|
||||||
firehose.for_each(move |data| {
|
firehose.for_each(move |(event_type, data)| {
|
||||||
|
if event_type != b"update" {
|
||||||
|
// Only analyze new posts, no updates
|
||||||
|
return futures::future::ready(());
|
||||||
|
}
|
||||||
|
|
||||||
let trend_setter_tx = trend_setter_tx.clone();
|
let trend_setter_tx = trend_setter_tx.clone();
|
||||||
let mut store = store.clone();
|
let mut store = store.clone();
|
||||||
let profanity = profanity.clone();
|
let profanity = profanity.clone();
|
||||||
|
|
|
@ -151,6 +151,7 @@ enum EncodedPost {
|
||||||
/// to the original incoming data
|
/// to the original incoming data
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct EncodablePost {
|
pub struct EncodablePost {
|
||||||
|
pub event_type: String,
|
||||||
post: Post,
|
post: Post,
|
||||||
encoded: EncodedPost,
|
encoded: EncodedPost,
|
||||||
}
|
}
|
||||||
|
@ -163,17 +164,19 @@ impl Deref for EncodablePost {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl EncodablePost {
|
impl EncodablePost {
|
||||||
pub fn from_value(value: serde_json::Value) -> Result<Self, serde_json::Error> {
|
pub fn from_value(event_type: String, value: serde_json::Value) -> Result<Self, serde_json::Error> {
|
||||||
let post = serde_json::from_value(value.clone())?;
|
let post = serde_json::from_value(value.clone())?;
|
||||||
Ok(EncodablePost {
|
Ok(EncodablePost {
|
||||||
|
event_type,
|
||||||
post,
|
post,
|
||||||
encoded: EncodedPost::Value(value),
|
encoded: EncodedPost::Value(value),
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn from_bytes(bytes: Vec<u8>) -> Result<Self, serde_json::Error> {
|
pub fn from_bytes(event_type: String, bytes: Vec<u8>) -> Result<Self, serde_json::Error> {
|
||||||
let post = serde_json::from_slice(&bytes)?;
|
let post = serde_json::from_slice(&bytes)?;
|
||||||
Ok(EncodablePost {
|
Ok(EncodablePost {
|
||||||
|
event_type,
|
||||||
post,
|
post,
|
||||||
encoded: EncodedPost::Bytes(bytes),
|
encoded: EncodedPost::Bytes(bytes),
|
||||||
})
|
})
|
||||||
|
@ -226,7 +229,7 @@ impl Feed {
|
||||||
let posts = tokio::task::spawn_blocking(move || {
|
let posts = tokio::task::spawn_blocking(move || {
|
||||||
let values: Vec<serde_json::Value> = serde_json::from_slice(&body)?;
|
let values: Vec<serde_json::Value> = serde_json::from_slice(&body)?;
|
||||||
let posts: Vec<EncodablePost> = values.into_iter()
|
let posts: Vec<EncodablePost> = values.into_iter()
|
||||||
.filter_map(|value| EncodablePost::from_value(value).ok())
|
.filter_map(|value| EncodablePost::from_value("update".to_string(), value).ok())
|
||||||
.collect();
|
.collect();
|
||||||
Ok::<_, serde_json::Error>(posts)
|
Ok::<_, serde_json::Error>(posts)
|
||||||
}).await.expect("join blocking")
|
}).await.expect("join blocking")
|
||||||
|
@ -254,16 +257,9 @@ impl Feed {
|
||||||
}
|
}
|
||||||
|
|
||||||
let src = res.bytes_stream().eventsource()
|
let src = res.bytes_stream().eventsource()
|
||||||
.filter_map(|result| async {
|
.filter_map(|result| async move {
|
||||||
let result = result.ok()?;
|
let event = result.ok()?;
|
||||||
if result.event == "update" {
|
EncodablePost::from_bytes(event.event, event.data.into_bytes()).ok()
|
||||||
Some(result)
|
|
||||||
} else {
|
|
||||||
None
|
|
||||||
}
|
|
||||||
})
|
|
||||||
.filter_map(|event| async move {
|
|
||||||
EncodablePost::from_bytes(event.data.into_bytes()).ok()
|
|
||||||
});
|
});
|
||||||
Ok(src)
|
Ok(src)
|
||||||
}
|
}
|
||||||
|
|
|
@ -11,7 +11,7 @@ impl FirehoseFactory {
|
||||||
FirehoseFactory { redis_url }
|
FirehoseFactory { redis_url }
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn produce(&self) -> Result<impl Stream<Item = Result<Vec<u8>, RedisError>>, RedisError> {
|
pub async fn produce(&self) -> Result<impl Stream<Item = (Vec<u8>, Vec<u8>)>, RedisError> {
|
||||||
let client = redis::Client::open(&self.redis_url[..])?;
|
let client = redis::Client::open(&self.redis_url[..])?;
|
||||||
let mut pubsub_conn = client.get_async_connection()
|
let mut pubsub_conn = client.get_async_connection()
|
||||||
.await?
|
.await?
|
||||||
|
@ -21,7 +21,13 @@ impl FirehoseFactory {
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
let stream = pubsub_conn.into_on_message()
|
let stream = pubsub_conn.into_on_message()
|
||||||
.map(|msg| msg.get_payload());
|
.filter_map(|msg| async move {
|
||||||
|
let data = msg.get_payload::<Vec<u8>>().ok()?;
|
||||||
|
let nul_pos = data.iter().position(|&b| b == 0)?;
|
||||||
|
let event_type = data[..nul_pos].to_owned();
|
||||||
|
let post_data: Vec<u8> = data[nul_pos + 1..].to_owned();
|
||||||
|
Some((event_type, post_data))
|
||||||
|
});
|
||||||
Ok(stream)
|
Ok(stream)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -130,8 +130,14 @@ impl Store {
|
||||||
post.tags.len(), post.uri);
|
post.tags.len(), post.uri);
|
||||||
|
|
||||||
match post.encode() {
|
match post.encode() {
|
||||||
Ok(encoded) => {
|
Ok(encoded_post) => {
|
||||||
redis::Cmd::publish("firehose", encoded)
|
let mut data = Vec::<u8>::with_capacity(
|
||||||
|
post.event_type.len() + 1 + encoded_post.len()
|
||||||
|
);
|
||||||
|
data.extend(post.event_type.as_bytes());
|
||||||
|
data.extend(&[0]);
|
||||||
|
data.extend(&encoded_post);
|
||||||
|
redis::Cmd::publish("firehose", data)
|
||||||
.query_async::<_, Value>(self)
|
.query_async::<_, Value>(self)
|
||||||
.await?;
|
.await?;
|
||||||
}
|
}
|
||||||
|
|
|
@ -168,16 +168,16 @@ async fn streaming_api(
|
||||||
let firehose = firehose_factory.produce()
|
let firehose = firehose_factory.produce()
|
||||||
.await
|
.await
|
||||||
.expect("firehose");
|
.expect("firehose");
|
||||||
let stream_err = |e| stream::once(async move { Err(e) }).boxed();
|
let stream = stream::once(async { Ok::<Vec<u8>, axum::Error>(b":)\n".to_vec()) })
|
||||||
let stream_ok = |data|
|
|
||||||
stream::iter([
|
|
||||||
Ok(b"event: update\ndata: ".to_vec()),
|
|
||||||
Ok(data),
|
|
||||||
Ok(b"\n\n".to_vec()),
|
|
||||||
].into_iter()).boxed();
|
|
||||||
let stream = stream::once(async { Ok(b":)\n".to_vec()) })
|
|
||||||
.chain(
|
.chain(
|
||||||
firehose.flat_map(move |data| data.map_or_else(stream_err, stream_ok))
|
firehose.flat_map(|(event_type, data)|
|
||||||
|
stream::iter([
|
||||||
|
Ok(b"event: ".to_vec()),
|
||||||
|
Ok(event_type),
|
||||||
|
Ok(b"\ndata: ".to_vec()),
|
||||||
|
Ok(data),
|
||||||
|
Ok(b"\n\n".to_vec()),
|
||||||
|
].into_iter()).boxed())
|
||||||
);
|
);
|
||||||
let body = axum::body::boxed(hyper::body::Body::wrap_stream(stream));
|
let body = axum::body::boxed(hyper::body::Body::wrap_stream(stream));
|
||||||
|
|
||||||
|
|
|
@ -165,10 +165,15 @@ impl Drop for Pipe {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn publisher(state: State, firehose: impl Stream<Item = Vec<u8>>) {
|
async fn publisher(state: State, firehose: impl Stream<Item = (Vec<u8>, Vec<u8>)>) {
|
||||||
firehose.for_each(move |data| {
|
firehose.for_each(move |(event_type, data)| {
|
||||||
let state = state.clone();
|
let state = state.clone();
|
||||||
async move {
|
async move {
|
||||||
|
if event_type != b"update" {
|
||||||
|
// Only process new posts, no updates
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
let post = serde_json::from_slice(&data)
|
let post = serde_json::from_slice(&data)
|
||||||
.ok();
|
.ok();
|
||||||
let msg = post.and_then(format_message);
|
let msg = post.and_then(format_message);
|
||||||
|
@ -193,8 +198,7 @@ async fn main() {
|
||||||
let firehose_factory = FirehoseFactory::new(config.redis);
|
let firehose_factory = FirehoseFactory::new(config.redis);
|
||||||
let firehose = firehose_factory.produce()
|
let firehose = firehose_factory.produce()
|
||||||
.await
|
.await
|
||||||
.expect("firehose")
|
.expect("firehose");
|
||||||
.filter_map(|item| async move { item.ok() });
|
|
||||||
tokio::spawn(
|
tokio::spawn(
|
||||||
publisher(state.clone(), firehose)
|
publisher(state.clone(), firehose)
|
||||||
);
|
);
|
||||||
|
|
Loading…
Reference in New Issue
Block a user