diff --git a/src/app.rs b/src/app.rs index b39f636..94f0605 100644 --- a/src/app.rs +++ b/src/app.rs @@ -18,7 +18,7 @@ impl App { } pub fn source(&self, headers: Arc, boundary: Arc>) -> Sender { - let (tx, _rx) = broadcast::channel(128); + let (tx, _rx) = broadcast::channel(64); let mut state = self.state.write().unwrap(); *state = Some(State { headers, diff --git a/src/client.rs b/src/client.rs index 1ea9f42..fd55d8e 100644 --- a/src/client.rs +++ b/src/client.rs @@ -19,6 +19,7 @@ impl std::fmt::Display for Error { } async fn run_once(app: &App) -> Result<(), Box> { + // TODO: from arg // let res = reqwest::get("http://172.20.78.164:81/stream") let res = reqwest::get("http://localhost:8080/?action=stream") .await?; diff --git a/src/server.rs b/src/server.rs index 04212f2..b5bdb73 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,7 +1,7 @@ use std::{convert::Infallible, sync::Arc}; -use bytes::{BufMut, BytesMut}; +use bytes::{BufMut, Bytes, BytesMut}; use futures::stream; -use tokio::sync::broadcast::Receiver; +use tokio::sync::broadcast::{Receiver, error::RecvError}; use http::{ header::HeaderMap, status::StatusCode, @@ -21,6 +21,7 @@ fn bad_gateway() -> Box { } struct PartStream { + wait_for_header: bool, rx: Receiver, headers: Arc, boundary: Arc>, @@ -37,6 +38,7 @@ impl PartStream { }; Ok(Box::new(PartStream { + wait_for_header: true, rx, headers, boundary, @@ -48,10 +50,12 @@ impl Reply for PartStream { fn into_response(self) -> Response { let headers = self.headers.clone(); let body_stream = stream::unfold(self, |mut this| async move { - let result = match this.rx.recv().await { + match this.rx.recv().await { Ok(payload) => { let bytes = match payload.as_ref() { ParseOutput::Headers(headers) => { + this.wait_for_header = false; + let mut buf = BytesMut::new(); buf.put_slice(b"--"); buf.put_slice(&this.boundary); @@ -65,16 +69,23 @@ impl Reply for PartStream { buf.put_slice(b"\r\n"); buf.freeze() } + ParseOutput::Bytes(_) if this.wait_for_header => { + Bytes::from("") + } ParseOutput::Bytes(bytes) => { bytes.clone() } }; Some(Ok(bytes)) } - Err(e) => - Some(Err(e)), - }; - result.map(|result| (result, this)) + Err(RecvError::Lagged(lagged)) => { + println!("PartStream lagged across {} items", lagged); + this.wait_for_header = true; + Some(Ok(Bytes::from(""))) + } + Err(RecvError::Closed) => + None, + }.map(|result: Result| (result, this)) }); let mut res = Response::new(Body::wrap_stream(body_stream)); for (name, value) in headers.as_ref() { @@ -92,7 +103,6 @@ struct PartCapture { impl PartCapture { async fn start(app: App) -> Result, Infallible> { - let (mut rx, http_headers) = match (app.subscribe(), app.headers()) { (Some(rx), Some(headers)) => @@ -129,6 +139,7 @@ impl Reply for PartCapture { Ok(payload) => { match payload.as_ref() { ParseOutput::Headers(_headers) => { + // next frame; done None } ParseOutput::Bytes(bytes) => {