smokestack/main: use try_send() instead of hanging send()

This commit is contained in:
Astro 2022-11-17 00:09:02 +01:00
parent 52c5597459
commit ee0382b15e
2 changed files with 6 additions and 5 deletions

View File

@ -183,8 +183,10 @@ impl Feed {
}) })
.filter_map(|event| async move { .filter_map(|event| async move {
match serde_json::from_str(&event.data) { match serde_json::from_str(&event.data) {
Ok(post) => Ok(post) => {
Some(post), drop(event);
Some(post)
},
Err(e) => { Err(e) => {
log::error!("Error decoding stream data: {}", e); log::error!("Error decoding stream data: {}", e);
None None

View File

@ -114,7 +114,7 @@ impl State {
.collect::<Vec<_>>() .collect::<Vec<_>>()
}; };
for tx in txs { for tx in txs {
let _ = tx.send(msg.clone()).await; let _ = tx.try_send(msg.clone());
} }
} }
} }
@ -127,7 +127,7 @@ struct Pipe {
impl Drop for Pipe { impl Drop for Pipe {
fn drop(&mut self) { fn drop(&mut self) {
log::trace!("drop pipe"); log::info!("Consumer disconnected");
let mut consumers = self.consumers.write().unwrap(); let mut consumers = self.consumers.write().unwrap();
consumers.remove(&self.id); consumers.remove(&self.id);
} }
@ -176,7 +176,6 @@ async fn main() {
let mut pipe = state.pipe(); let mut pipe = state.pipe();
tokio::spawn(async move { tokio::spawn(async move {
log::trace!("while...");
while let Some(msg) = pipe.rx.recv().await { while let Some(msg) = pipe.rx.recv().await {
match socket.write_all(&msg[..]).await { match socket.write_all(&msg[..]).await {
Ok(_) => {} Ok(_) => {}