From a2e37ffe202d886b74663771d6cb0e25fc9dc353 Mon Sep 17 00:00:00 2001 From: Astro Date: Sun, 21 Feb 2021 04:05:59 +0100 Subject: [PATCH] server: start using warp routes for PartCapture --- src/app.rs | 1 + src/server.rs | 61 +++++++++++++++++++++++---------------------------- 2 files changed, 29 insertions(+), 33 deletions(-) diff --git a/src/app.rs b/src/app.rs index d0f4883..8af267e 100644 --- a/src/app.rs +++ b/src/app.rs @@ -29,6 +29,7 @@ impl App { tx } + // TODO: async drop til 1st header pub fn subscribe(&self) -> Option> { let state = self.state.read().unwrap(); state.as_ref().map(|state| { diff --git a/src/server.rs b/src/server.rs index a73a19e..6334a1a 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,7 +1,7 @@ -use std::{convert::Infallible}; +use std::{convert::Infallible, sync::Arc}; use bytes::{BufMut, Bytes, BytesMut}; use futures::stream; -use tokio::sync::broadcast::{Receiver}; +use tokio::sync::broadcast::Receiver; use http::{ header::HeaderMap, status::StatusCode, @@ -82,49 +82,38 @@ impl Reply for PartStream { } struct PartCapture { - rx: Option>, - headers: Option, + rx: Receiver, + headers: HeaderMap, } impl PartCapture { // async fn start(app: App) -> PartCapture { - async fn start(app: App) -> Result { - let mut rx = app.subscribe(); - let headers = match (&mut rx, app.headers()) { - (Some(ref mut rx), Some(ref stream_headers)) => { - match rx.recv().await { - Ok(output) => match output.as_ref() { - // TODO: not necessarily the 1st - ParseOutput::Headers(part_headers) => { - let mut headers = stream_headers.as_ref().clone(); - for (name, value) in part_headers { - headers.insert(name, value.clone()); - } - Some(headers) + async fn start((mut rx, stream_headers): (Receiver, Arc)) -> Result, Infallible> { + loop { + match rx.recv().await { + Ok(output) => match output.as_ref() { + // TODO: not necessarily the 1st + ParseOutput::Headers(part_headers) => { + let mut headers = stream_headers.as_ref().clone(); + for (name, value) in part_headers { + headers.insert(name, value.clone()); } - _ => None, + return Ok(Box::new(PartCapture { rx, headers })); } - _ => None, + // skip content Bytes + _ => {}, } + Err(e) => return Ok(Box::new( + warp::reply::with_status(warp::reply(), warp::http::StatusCode::BAD_GATEWAY) + )), } - _ => None, - }; - Ok(PartCapture { rx, headers }) + } } } impl Reply for PartCapture { fn into_response(self) -> Response { - let (rx, headers) = match (self.rx, self.headers) { - (Some(rx), Some(headers)) => (rx, headers), - _ => { - let mut res = Response::new(Body::empty()); - *res.status_mut() = StatusCode::BAD_GATEWAY; - return res; - } - }; - - let body_stream = stream::unfold(rx, |mut rx| async move { + let body_stream = stream::unfold(self.rx, |mut rx| async move { match rx.recv().await { Ok(payload) => { match payload.as_ref() { @@ -144,7 +133,7 @@ impl Reply for PartCapture { }); let mut res = Response::new(Body::wrap_stream(body_stream)); - *res.headers_mut() = headers; + *res.headers_mut() = self.headers; res } } @@ -155,6 +144,12 @@ pub async fn run(app: App) { .and_then(move || PartStream::new(app_.clone())); let capture = warp::path!("capture.jpg") .map(move || app.clone()) + .and_then(|app: App| async move { + app.subscribe() + .and_then(|rx| app.headers() + .map(|headers| (rx, headers))) + .ok_or_else(|| warp::reject::not_found()) + }) .and_then(PartCapture::start); let routes = warp::get() .and(stream)