use std::{convert::Infallible}; use bytes::{BufMut, Bytes, BytesMut}; use futures::stream; use tokio::sync::broadcast::{Receiver}; use http::{ header::HeaderMap, status::StatusCode, }; use hyper::Body; use mpart_async::server::ParseOutput; use warp::{ Filter, reply::{Reply, Response}, }; use crate::app::{App, Payload}; struct PartStream { app: App, } impl PartStream { async fn new(app: App) -> Result { Ok(PartStream { app }) } } impl Reply for PartStream { fn into_response(self) -> Response { let (rx, headers) = match (self.app.subscribe(), self.app.headers()) { (Some(rx), Some(headers)) => (rx, headers), _ => { let mut res = Response::new(Body::empty()); *res.status_mut() = StatusCode::BAD_GATEWAY; return res; } }; let body_stream = stream::unfold((self, rx), |(this, mut rx)| async move { let result = match rx.recv().await { Ok(payload) => { let bytes = match payload.as_ref() { ParseOutput::Headers(headers) => { let mut buf = BytesMut::new(); buf.put_slice(b"--"); buf.put_slice(&this.app.boundary()); buf.put_slice(b"\r\n"); for (name, value) in headers { buf.put_slice(name.as_ref()); buf.put_slice(b": "); buf.put_slice(value.as_ref()); buf.put_slice(b"\r\n"); } buf.put_slice(b"\r\n"); buf.freeze() } ParseOutput::Bytes(bytes) => { bytes.clone() } }; Some(Ok(bytes)) } Err(e) => { match this.app.subscribe() { Some(next_rx) => { rx = next_rx; Some(Ok(Bytes::from(""))) } None => Some(Err(e)), } } }; result.map(|result| (result, (this, rx))) }); let mut res = Response::new(Body::wrap_stream(body_stream)); for (name, value) in headers.as_ref() { res.headers_mut() .insert(name, value.clone()); } res } } struct PartCapture { rx: Option>, headers: Option, } impl PartCapture { // async fn start(app: App) -> PartCapture { async fn start(app: App) -> Result { let mut rx = app.subscribe(); let headers = match (&mut rx, app.headers()) { (Some(ref mut rx), Some(ref stream_headers)) => { match rx.recv().await { Ok(output) => match output.as_ref() { ParseOutput::Headers(part_headers) => { let mut headers = stream_headers.as_ref().clone(); for (name, value) in part_headers { headers.insert(name, value.clone()); } Some(headers) } _ => None, } _ => None, } } _ => None, }; Ok(PartCapture { rx, headers }) } } impl Reply for PartCapture { fn into_response(self) -> Response { let (rx, headers) = match (self.rx, self.headers) { (Some(rx), Some(headers)) => (rx, headers), _ => { let mut res = Response::new(Body::empty()); *res.status_mut() = StatusCode::BAD_GATEWAY; return res; } }; let body_stream = stream::unfold(rx, |mut rx| async move { match rx.recv().await { Ok(payload) => { match payload.as_ref() { ParseOutput::Headers(_headers) => { None } ParseOutput::Bytes(bytes) => { Some((Ok(bytes.clone()), rx)) } } } Err(e) => { println!("e: {:?}", e); Some((Err(e), rx)) }, } }); let mut res = Response::new(Body::wrap_stream(body_stream)); *res.headers_mut() = headers; res } } pub async fn run(app: App) { let app_ = app.clone(); let stream = warp::path!("stream.mjpeg") .and_then(move || PartStream::new(app_.clone())); let capture = warp::path!("capture.jpg") .map(move || app.clone()) .and_then(PartCapture::start); let routes = warp::get() .and(stream) .or(capture); warp::serve(routes).run(([127, 0, 0, 1], 3030)).await; }