server: start using warp routes for PartCapture

This commit is contained in:
Astro 2021-02-21 04:05:59 +01:00
parent 17087b7bae
commit a2e37ffe20
2 changed files with 29 additions and 33 deletions

View File

@ -29,6 +29,7 @@ impl App {
tx tx
} }
// TODO: async drop til 1st header
pub fn subscribe(&self) -> Option<Receiver<Payload>> { pub fn subscribe(&self) -> Option<Receiver<Payload>> {
let state = self.state.read().unwrap(); let state = self.state.read().unwrap();
state.as_ref().map(|state| { state.as_ref().map(|state| {

View File

@ -1,7 +1,7 @@
use std::{convert::Infallible}; use std::{convert::Infallible, sync::Arc};
use bytes::{BufMut, Bytes, BytesMut}; use bytes::{BufMut, Bytes, BytesMut};
use futures::stream; use futures::stream;
use tokio::sync::broadcast::{Receiver}; use tokio::sync::broadcast::Receiver;
use http::{ use http::{
header::HeaderMap, header::HeaderMap,
status::StatusCode, status::StatusCode,
@ -82,49 +82,38 @@ impl Reply for PartStream {
} }
struct PartCapture { struct PartCapture {
rx: Option<Receiver<Payload>>, rx: Receiver<Payload>,
headers: Option<HeaderMap>, headers: HeaderMap,
} }
impl PartCapture { impl PartCapture {
// async fn start(app: App) -> PartCapture { // async fn start(app: App) -> PartCapture {
async fn start(app: App) -> Result<impl warp::Reply, Infallible> { async fn start((mut rx, stream_headers): (Receiver<Payload>, Arc<HeaderMap>)) -> Result<Box<dyn warp::Reply>, Infallible> {
let mut rx = app.subscribe(); loop {
let headers = match (&mut rx, app.headers()) { match rx.recv().await {
(Some(ref mut rx), Some(ref stream_headers)) => { Ok(output) => match output.as_ref() {
match rx.recv().await { // TODO: not necessarily the 1st
Ok(output) => match output.as_ref() { ParseOutput::Headers(part_headers) => {
// TODO: not necessarily the 1st let mut headers = stream_headers.as_ref().clone();
ParseOutput::Headers(part_headers) => { for (name, value) in part_headers {
let mut headers = stream_headers.as_ref().clone(); headers.insert(name, value.clone());
for (name, value) in part_headers {
headers.insert(name, value.clone());
}
Some(headers)
} }
_ => None, return Ok(Box::new(PartCapture { rx, headers }));
} }
_ => None, // skip content Bytes
_ => {},
} }
Err(e) => return Ok(Box::new(
warp::reply::with_status(warp::reply(), warp::http::StatusCode::BAD_GATEWAY)
)),
} }
_ => None, }
};
Ok(PartCapture { rx, headers })
} }
} }
impl Reply for PartCapture { impl Reply for PartCapture {
fn into_response(self) -> Response { fn into_response(self) -> Response {
let (rx, headers) = match (self.rx, self.headers) { let body_stream = stream::unfold(self.rx, |mut rx| async move {
(Some(rx), Some(headers)) => (rx, headers),
_ => {
let mut res = Response::new(Body::empty());
*res.status_mut() = StatusCode::BAD_GATEWAY;
return res;
}
};
let body_stream = stream::unfold(rx, |mut rx| async move {
match rx.recv().await { match rx.recv().await {
Ok(payload) => { Ok(payload) => {
match payload.as_ref() { match payload.as_ref() {
@ -144,7 +133,7 @@ impl Reply for PartCapture {
}); });
let mut res = Response::new(Body::wrap_stream(body_stream)); let mut res = Response::new(Body::wrap_stream(body_stream));
*res.headers_mut() = headers; *res.headers_mut() = self.headers;
res res
} }
} }
@ -155,6 +144,12 @@ pub async fn run(app: App) {
.and_then(move || PartStream::new(app_.clone())); .and_then(move || PartStream::new(app_.clone()));
let capture = warp::path!("capture.jpg") let capture = warp::path!("capture.jpg")
.map(move || app.clone()) .map(move || app.clone())
.and_then(|app: App| async move {
app.subscribe()
.and_then(|rx| app.headers()
.map(|headers| (rx, headers)))
.ok_or_else(|| warp::reject::not_found())
})
.and_then(PartCapture::start); .and_then(PartCapture::start);
let routes = warp::get() let routes = warp::get()
.and(stream) .and(stream)