server: improve PartCapture error handling

This commit is contained in:
Astro 2021-02-21 19:35:04 +01:00
parent a2e37ffe20
commit 677aaa285b
1 changed files with 16 additions and 10 deletions

View File

@ -14,6 +14,12 @@ use warp::{
}; };
use crate::app::{App, Payload}; use crate::app::{App, Payload};
fn bad_gateway() -> Box<dyn warp::Reply> {
let reply = warp::reply();
let reply = warp::reply::with_status(reply, StatusCode::BAD_GATEWAY);
Box::new(reply)
}
struct PartStream { struct PartStream {
app: App, app: App,
} }
@ -88,7 +94,7 @@ struct PartCapture {
impl PartCapture { impl PartCapture {
// async fn start(app: App) -> PartCapture { // async fn start(app: App) -> PartCapture {
async fn start((mut rx, stream_headers): (Receiver<Payload>, Arc<HeaderMap>)) -> Result<Box<dyn warp::Reply>, Infallible> { async fn start(mut rx: Receiver<Payload>, stream_headers: Arc<HeaderMap>) -> Result<Box<dyn warp::Reply>, Infallible> {
loop { loop {
match rx.recv().await { match rx.recv().await {
Ok(output) => match output.as_ref() { Ok(output) => match output.as_ref() {
@ -103,15 +109,14 @@ impl PartCapture {
// skip content Bytes // skip content Bytes
_ => {}, _ => {},
} }
Err(e) => return Ok(Box::new( Err(e) => return Ok(bad_gateway()),
warp::reply::with_status(warp::reply(), warp::http::StatusCode::BAD_GATEWAY)
)),
} }
} }
} }
} }
impl Reply for PartCapture { impl Reply for PartCapture {
// TODO: count up to option<content-length>
fn into_response(self) -> Response { fn into_response(self) -> Response {
let body_stream = stream::unfold(self.rx, |mut rx| async move { let body_stream = stream::unfold(self.rx, |mut rx| async move {
match rx.recv().await { match rx.recv().await {
@ -145,12 +150,13 @@ pub async fn run(app: App) {
let capture = warp::path!("capture.jpg") let capture = warp::path!("capture.jpg")
.map(move || app.clone()) .map(move || app.clone())
.and_then(|app: App| async move { .and_then(|app: App| async move {
app.subscribe() match (app.subscribe(), app.headers()) {
.and_then(|rx| app.headers() (Some(rx), Some(headers)) =>
.map(|headers| (rx, headers))) PartCapture::start(rx, headers).await,
.ok_or_else(|| warp::reject::not_found()) _ =>
}) Ok(bad_gateway()),
.and_then(PartCapture::start); }
});
let routes = warp::get() let routes = warp::get()
.and(stream) .and(stream)
.or(capture); .or(capture);