diff --git a/src/app.rs b/src/app.rs index 8af267e..b39f636 100644 --- a/src/app.rs +++ b/src/app.rs @@ -17,11 +17,11 @@ impl App { } } - pub fn source(&self, headers: HeaderMap, boundary: Vec) -> Sender { + pub fn source(&self, headers: Arc, boundary: Arc>) -> Sender { let (tx, _rx) = broadcast::channel(128); let mut state = self.state.write().unwrap(); *state = Some(State { - headers: Arc::new(headers), + headers, boundary, tx: tx.clone(), }); @@ -37,16 +37,14 @@ impl App { }) } - pub fn boundary(&self) -> Vec { + pub fn boundary(&self) -> Option>> { let state = self.state.read().unwrap(); - state.as_ref().unwrap().boundary.clone() + state.as_ref().map(|state| state.boundary.clone()) } pub fn headers(&self) -> Option> { let state = self.state.read().unwrap(); - state.as_ref().map(|state| { - state.headers.clone() - }) + state.as_ref().map(|state| state.headers.clone()) } } @@ -63,6 +61,6 @@ impl Drop for AppSource { struct State { headers: Arc, - boundary: Vec, + boundary: Arc>, tx: Sender, } diff --git a/src/client.rs b/src/client.rs index 876fdfb..1ea9f42 100644 --- a/src/client.rs +++ b/src/client.rs @@ -34,10 +34,12 @@ async fn run_once(app: &App) -> Result<(), Box> { boundary = Some(field[9..].to_owned()); } } - let boundary = boundary.ok_or(Error::InvalidMultipart)?; - let headers = res.headers().clone(); + let boundary = Arc::new( + boundary.ok_or(Error::InvalidMultipart)? + ); + let headers = Arc::new(res.headers().clone()); let mut stream = MultipartParser::new( - boundary.clone(), + boundary.as_ref().clone(), res.bytes_stream() .map_ok(|buf| buf), ); diff --git a/src/server.rs b/src/server.rs index 757a493..04212f2 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,5 +1,5 @@ use std::{convert::Infallible, sync::Arc}; -use bytes::{BufMut, Bytes, BytesMut}; +use bytes::{BufMut, BytesMut}; use futures::stream; use tokio::sync::broadcast::Receiver; use http::{ @@ -21,34 +21,40 @@ fn bad_gateway() -> Box { } struct PartStream { - app: App, + rx: Receiver, + headers: Arc, + boundary: Arc>, } impl PartStream { - async fn new(app: App) -> Result { - Ok(PartStream { app }) + async fn start(app: App) -> Result { + let (rx, headers, boundary) = + match (app.subscribe(), app.headers(), app.boundary()) { + (Some(rx), Some(headers), Some(boundary)) => + (rx, headers, boundary), + _ => + return Ok(bad_gateway()), + }; + + Ok(Box::new(PartStream { + rx, + headers, + boundary, + })) } } impl Reply for PartStream { fn into_response(self) -> Response { - let (rx, headers) = match (self.app.subscribe(), self.app.headers()) { - (Some(rx), Some(headers)) => (rx, headers), - _ => { - let mut res = Response::new(Body::empty()); - *res.status_mut() = StatusCode::BAD_GATEWAY; - return res; - } - }; - - let body_stream = stream::unfold((self, rx), |(this, mut rx)| async move { - let result = match rx.recv().await { + let headers = self.headers.clone(); + let body_stream = stream::unfold(self, |mut this| async move { + let result = match this.rx.recv().await { Ok(payload) => { let bytes = match payload.as_ref() { ParseOutput::Headers(headers) => { let mut buf = BytesMut::new(); buf.put_slice(b"--"); - buf.put_slice(&this.app.boundary()); + buf.put_slice(&this.boundary); buf.put_slice(b"\r\n"); for (name, value) in headers { buf.put_slice(name.as_ref()); @@ -65,18 +71,10 @@ impl Reply for PartStream { }; Some(Ok(bytes)) } - Err(e) => { - match this.app.subscribe() { - Some(next_rx) => { - rx = next_rx; - Some(Ok(Bytes::from(""))) - } - None => - Some(Err(e)), - } - } + Err(e) => + Some(Err(e)), }; - result.map(|result| (result, (this, rx))) + result.map(|result| (result, this)) }); let mut res = Response::new(Body::wrap_stream(body_stream)); for (name, value) in headers.as_ref() { @@ -93,14 +91,22 @@ struct PartCapture { } impl PartCapture { - // async fn start(app: App) -> PartCapture { - async fn start(mut rx: Receiver, stream_headers: Arc) -> Result, Infallible> { + async fn start(app: App) -> Result, Infallible> { + + let (mut rx, http_headers) = + match (app.subscribe(), app.headers()) { + (Some(rx), Some(headers)) => + (rx, headers), + _ => + return Ok(bad_gateway()), + }; + loop { match rx.recv().await { Ok(output) => match output.as_ref() { // TODO: not necessarily the 1st ParseOutput::Headers(part_headers) => { - let mut headers = stream_headers.as_ref().clone(); + let mut headers = http_headers.as_ref().clone(); for (name, value) in part_headers { headers.insert(name, value.clone()); } @@ -109,7 +115,7 @@ impl PartCapture { // skip content Bytes _ => {}, } - Err(e) => return Ok(bad_gateway()), + Err(_) => return Ok(bad_gateway()), } } } @@ -131,7 +137,6 @@ impl Reply for PartCapture { } } Err(e) => { - println!("e: {:?}", e); Some((Err(e), rx)) }, } @@ -146,17 +151,11 @@ impl Reply for PartCapture { pub async fn run(app: App) { let app_ = app.clone(); let stream = warp::path!("stream.mjpeg") - .and_then(move || PartStream::new(app_.clone())); + .map(move || app_.clone()) + .and_then(PartStream::start); let capture = warp::path!("capture.jpg") .map(move || app.clone()) - .and_then(|app: App| async move { - match (app.subscribe(), app.headers()) { - (Some(rx), Some(headers)) => - PartCapture::start(rx, headers).await, - _ => - Ok(bad_gateway()), - } - }); + .and_then(PartCapture::start); let routes = warp::get() .and(stream) .or(capture);