use std::{convert::Infallible, sync::Arc}; use bytes::{BufMut, Bytes, BytesMut}; use futures::stream; use tokio::sync::broadcast::{Receiver, error::RecvError}; use http::{ header::HeaderMap, status::StatusCode, }; use hyper::Body; use mpart_async::server::ParseOutput; use warp::{ Filter, reply::{Reply, Response}, }; use crate::app::{App, Payload}; fn bad_gateway() -> Box { let reply = warp::reply(); let reply = warp::reply::with_status(reply, StatusCode::BAD_GATEWAY); Box::new(reply) } struct PartStream { wait_for_header: bool, rx: Receiver, headers: Arc, boundary: Arc>, } impl PartStream { async fn start(app: App) -> Result { let (rx, headers, boundary) = match (app.subscribe(), app.headers(), app.boundary()) { (Some(rx), Some(headers), Some(boundary)) => (rx, headers, boundary), _ => return Ok(bad_gateway()), }; Ok(Box::new(PartStream { wait_for_header: true, rx, headers, boundary, })) } } impl Reply for PartStream { fn into_response(self) -> Response { let headers = self.headers.clone(); let body_stream = stream::unfold(self, |mut this| async move { match this.rx.recv().await { Ok(payload) => { let bytes = match payload.as_ref() { ParseOutput::Headers(headers) => { this.wait_for_header = false; let mut buf = BytesMut::new(); buf.put_slice(b"--"); buf.put_slice(&this.boundary); buf.put_slice(b"\r\n"); for (name, value) in headers { buf.put_slice(name.as_ref()); buf.put_slice(b": "); buf.put_slice(value.as_ref()); buf.put_slice(b"\r\n"); } buf.put_slice(b"\r\n"); buf.freeze() } ParseOutput::Bytes(_) if this.wait_for_header => { Bytes::from("") } ParseOutput::Bytes(bytes) => { bytes.clone() } }; Some(Ok(bytes)) } Err(RecvError::Lagged(lagged)) => { println!("PartStream lagged across {} items", lagged); this.wait_for_header = true; Some(Ok(Bytes::from(""))) } Err(RecvError::Closed) => None, }.map(|result: Result| (result, this)) }); let mut res = Response::new(Body::wrap_stream(body_stream)); for (name, value) in headers.as_ref() { res.headers_mut() .insert(name, value.clone()); } res } } struct PartCapture { rx: Receiver, headers: HeaderMap, } impl PartCapture { async fn start(app: App) -> Result, Infallible> { let (mut rx, http_headers) = match (app.subscribe(), app.headers()) { (Some(rx), Some(headers)) => (rx, headers), _ => return Ok(bad_gateway()), }; loop { match rx.recv().await { Ok(output) => match output.as_ref() { ParseOutput::Headers(part_headers) => { let mut headers = http_headers.as_ref().clone(); for (name, value) in part_headers { headers.insert(name, value.clone()); } return Ok(Box::new(PartCapture { rx, headers })); } // skip content Bytes _ => {}, } Err(_) => return Ok(bad_gateway()), } } } } impl Reply for PartCapture { // TODO: count up to option fn into_response(self) -> Response { let body_stream = stream::unfold(self.rx, |mut rx| async move { match rx.recv().await { Ok(payload) => { match payload.as_ref() { ParseOutput::Headers(_headers) => { // next frame; done None } ParseOutput::Bytes(bytes) => { Some((Ok(bytes.clone()), rx)) } } } Err(e) => { Some((Err(e), rx)) }, } }); let mut res = Response::new(Body::wrap_stream(body_stream)); *res.headers_mut() = self.headers; res } } pub async fn run(app: App) { let app_ = app.clone(); let stream = warp::path!("stream.mjpeg") .map(move || app_.clone()) .and_then(PartStream::start); let capture = warp::path!("capture.jpg") .map(move || app.clone()) .and_then(PartCapture::start); let routes = warp::get() .and(stream) .or(capture); warp::serve(routes).run(([127, 0, 0, 1], 3030)).await; }