server: refactor

This commit is contained in:
Astro 2021-02-21 20:35:08 +01:00
parent 677aaa285b
commit 17e4ce2ef9
3 changed files with 51 additions and 52 deletions

View File

@ -17,11 +17,11 @@ impl App {
} }
} }
pub fn source(&self, headers: HeaderMap, boundary: Vec<u8>) -> Sender<Payload> { pub fn source(&self, headers: Arc<HeaderMap>, boundary: Arc<Vec<u8>>) -> Sender<Payload> {
let (tx, _rx) = broadcast::channel(128); let (tx, _rx) = broadcast::channel(128);
let mut state = self.state.write().unwrap(); let mut state = self.state.write().unwrap();
*state = Some(State { *state = Some(State {
headers: Arc::new(headers), headers,
boundary, boundary,
tx: tx.clone(), tx: tx.clone(),
}); });
@ -37,16 +37,14 @@ impl App {
}) })
} }
pub fn boundary(&self) -> Vec<u8> { pub fn boundary(&self) -> Option<Arc<Vec<u8>>> {
let state = self.state.read().unwrap(); let state = self.state.read().unwrap();
state.as_ref().unwrap().boundary.clone() state.as_ref().map(|state| state.boundary.clone())
} }
pub fn headers(&self) -> Option<Arc<HeaderMap>> { pub fn headers(&self) -> Option<Arc<HeaderMap>> {
let state = self.state.read().unwrap(); let state = self.state.read().unwrap();
state.as_ref().map(|state| { state.as_ref().map(|state| state.headers.clone())
state.headers.clone()
})
} }
} }
@ -63,6 +61,6 @@ impl Drop for AppSource {
struct State { struct State {
headers: Arc<HeaderMap>, headers: Arc<HeaderMap>,
boundary: Vec<u8>, boundary: Arc<Vec<u8>>,
tx: Sender<Payload>, tx: Sender<Payload>,
} }

View File

@ -34,10 +34,12 @@ async fn run_once(app: &App) -> Result<(), Box<dyn std::error::Error>> {
boundary = Some(field[9..].to_owned()); boundary = Some(field[9..].to_owned());
} }
} }
let boundary = boundary.ok_or(Error::InvalidMultipart)?; let boundary = Arc::new(
let headers = res.headers().clone(); boundary.ok_or(Error::InvalidMultipart)?
);
let headers = Arc::new(res.headers().clone());
let mut stream = MultipartParser::new( let mut stream = MultipartParser::new(
boundary.clone(), boundary.as_ref().clone(),
res.bytes_stream() res.bytes_stream()
.map_ok(|buf| buf), .map_ok(|buf| buf),
); );

View File

@ -1,5 +1,5 @@
use std::{convert::Infallible, sync::Arc}; use std::{convert::Infallible, sync::Arc};
use bytes::{BufMut, Bytes, BytesMut}; use bytes::{BufMut, BytesMut};
use futures::stream; use futures::stream;
use tokio::sync::broadcast::Receiver; use tokio::sync::broadcast::Receiver;
use http::{ use http::{
@ -21,34 +21,40 @@ fn bad_gateway() -> Box<dyn warp::Reply> {
} }
struct PartStream { struct PartStream {
app: App, rx: Receiver<Payload>,
headers: Arc<HeaderMap>,
boundary: Arc<Vec<u8>>,
} }
impl PartStream { impl PartStream {
async fn new(app: App) -> Result<impl warp::Reply, Infallible> { async fn start(app: App) -> Result<impl warp::Reply, Infallible> {
Ok(PartStream { app }) let (rx, headers, boundary) =
match (app.subscribe(), app.headers(), app.boundary()) {
(Some(rx), Some(headers), Some(boundary)) =>
(rx, headers, boundary),
_ =>
return Ok(bad_gateway()),
};
Ok(Box::new(PartStream {
rx,
headers,
boundary,
}))
} }
} }
impl Reply for PartStream { impl Reply for PartStream {
fn into_response(self) -> Response { fn into_response(self) -> Response {
let (rx, headers) = match (self.app.subscribe(), self.app.headers()) { let headers = self.headers.clone();
(Some(rx), Some(headers)) => (rx, headers), let body_stream = stream::unfold(self, |mut this| async move {
_ => { let result = match this.rx.recv().await {
let mut res = Response::new(Body::empty());
*res.status_mut() = StatusCode::BAD_GATEWAY;
return res;
}
};
let body_stream = stream::unfold((self, rx), |(this, mut rx)| async move {
let result = match rx.recv().await {
Ok(payload) => { Ok(payload) => {
let bytes = match payload.as_ref() { let bytes = match payload.as_ref() {
ParseOutput::Headers(headers) => { ParseOutput::Headers(headers) => {
let mut buf = BytesMut::new(); let mut buf = BytesMut::new();
buf.put_slice(b"--"); buf.put_slice(b"--");
buf.put_slice(&this.app.boundary()); buf.put_slice(&this.boundary);
buf.put_slice(b"\r\n"); buf.put_slice(b"\r\n");
for (name, value) in headers { for (name, value) in headers {
buf.put_slice(name.as_ref()); buf.put_slice(name.as_ref());
@ -65,18 +71,10 @@ impl Reply for PartStream {
}; };
Some(Ok(bytes)) Some(Ok(bytes))
} }
Err(e) => { Err(e) =>
match this.app.subscribe() {
Some(next_rx) => {
rx = next_rx;
Some(Ok(Bytes::from("")))
}
None =>
Some(Err(e)), Some(Err(e)),
}
}
}; };
result.map(|result| (result, (this, rx))) result.map(|result| (result, this))
}); });
let mut res = Response::new(Body::wrap_stream(body_stream)); let mut res = Response::new(Body::wrap_stream(body_stream));
for (name, value) in headers.as_ref() { for (name, value) in headers.as_ref() {
@ -93,14 +91,22 @@ struct PartCapture {
} }
impl PartCapture { impl PartCapture {
// async fn start(app: App) -> PartCapture { async fn start(app: App) -> Result<Box<dyn warp::Reply>, Infallible> {
async fn start(mut rx: Receiver<Payload>, stream_headers: Arc<HeaderMap>) -> Result<Box<dyn warp::Reply>, Infallible> {
let (mut rx, http_headers) =
match (app.subscribe(), app.headers()) {
(Some(rx), Some(headers)) =>
(rx, headers),
_ =>
return Ok(bad_gateway()),
};
loop { loop {
match rx.recv().await { match rx.recv().await {
Ok(output) => match output.as_ref() { Ok(output) => match output.as_ref() {
// TODO: not necessarily the 1st // TODO: not necessarily the 1st
ParseOutput::Headers(part_headers) => { ParseOutput::Headers(part_headers) => {
let mut headers = stream_headers.as_ref().clone(); let mut headers = http_headers.as_ref().clone();
for (name, value) in part_headers { for (name, value) in part_headers {
headers.insert(name, value.clone()); headers.insert(name, value.clone());
} }
@ -109,7 +115,7 @@ impl PartCapture {
// skip content Bytes // skip content Bytes
_ => {}, _ => {},
} }
Err(e) => return Ok(bad_gateway()), Err(_) => return Ok(bad_gateway()),
} }
} }
} }
@ -131,7 +137,6 @@ impl Reply for PartCapture {
} }
} }
Err(e) => { Err(e) => {
println!("e: {:?}", e);
Some((Err(e), rx)) Some((Err(e), rx))
}, },
} }
@ -146,17 +151,11 @@ impl Reply for PartCapture {
pub async fn run(app: App) { pub async fn run(app: App) {
let app_ = app.clone(); let app_ = app.clone();
let stream = warp::path!("stream.mjpeg") let stream = warp::path!("stream.mjpeg")
.and_then(move || PartStream::new(app_.clone())); .map(move || app_.clone())
.and_then(PartStream::start);
let capture = warp::path!("capture.jpg") let capture = warp::path!("capture.jpg")
.map(move || app.clone()) .map(move || app.clone())
.and_then(|app: App| async move { .and_then(PartCapture::start);
match (app.subscribe(), app.headers()) {
(Some(rx), Some(headers)) =>
PartCapture::start(rx, headers).await,
_ =>
Ok(bad_gateway()),
}
});
let routes = warp::get() let routes = warp::get()
.and(stream) .and(stream)
.or(capture); .or(capture);