server: improve PartStream error handling/part skipping

This commit is contained in:
Astro 2021-02-21 20:45:02 +01:00
parent 17e4ce2ef9
commit f2e0f16350
3 changed files with 21 additions and 9 deletions

View File

@ -18,7 +18,7 @@ impl App {
} }
pub fn source(&self, headers: Arc<HeaderMap>, boundary: Arc<Vec<u8>>) -> Sender<Payload> { pub fn source(&self, headers: Arc<HeaderMap>, boundary: Arc<Vec<u8>>) -> Sender<Payload> {
let (tx, _rx) = broadcast::channel(128); let (tx, _rx) = broadcast::channel(64);
let mut state = self.state.write().unwrap(); let mut state = self.state.write().unwrap();
*state = Some(State { *state = Some(State {
headers, headers,

View File

@ -19,6 +19,7 @@ impl std::fmt::Display for Error {
} }
async fn run_once(app: &App) -> Result<(), Box<dyn std::error::Error>> { async fn run_once(app: &App) -> Result<(), Box<dyn std::error::Error>> {
// TODO: from arg
// let res = reqwest::get("http://172.20.78.164:81/stream") // let res = reqwest::get("http://172.20.78.164:81/stream")
let res = reqwest::get("http://localhost:8080/?action=stream") let res = reqwest::get("http://localhost:8080/?action=stream")
.await?; .await?;

View File

@ -1,7 +1,7 @@
use std::{convert::Infallible, sync::Arc}; use std::{convert::Infallible, sync::Arc};
use bytes::{BufMut, BytesMut}; use bytes::{BufMut, Bytes, BytesMut};
use futures::stream; use futures::stream;
use tokio::sync::broadcast::Receiver; use tokio::sync::broadcast::{Receiver, error::RecvError};
use http::{ use http::{
header::HeaderMap, header::HeaderMap,
status::StatusCode, status::StatusCode,
@ -21,6 +21,7 @@ fn bad_gateway() -> Box<dyn warp::Reply> {
} }
struct PartStream { struct PartStream {
wait_for_header: bool,
rx: Receiver<Payload>, rx: Receiver<Payload>,
headers: Arc<HeaderMap>, headers: Arc<HeaderMap>,
boundary: Arc<Vec<u8>>, boundary: Arc<Vec<u8>>,
@ -37,6 +38,7 @@ impl PartStream {
}; };
Ok(Box::new(PartStream { Ok(Box::new(PartStream {
wait_for_header: true,
rx, rx,
headers, headers,
boundary, boundary,
@ -48,10 +50,12 @@ impl Reply for PartStream {
fn into_response(self) -> Response { fn into_response(self) -> Response {
let headers = self.headers.clone(); let headers = self.headers.clone();
let body_stream = stream::unfold(self, |mut this| async move { let body_stream = stream::unfold(self, |mut this| async move {
let result = match this.rx.recv().await { match this.rx.recv().await {
Ok(payload) => { Ok(payload) => {
let bytes = match payload.as_ref() { let bytes = match payload.as_ref() {
ParseOutput::Headers(headers) => { ParseOutput::Headers(headers) => {
this.wait_for_header = false;
let mut buf = BytesMut::new(); let mut buf = BytesMut::new();
buf.put_slice(b"--"); buf.put_slice(b"--");
buf.put_slice(&this.boundary); buf.put_slice(&this.boundary);
@ -65,16 +69,23 @@ impl Reply for PartStream {
buf.put_slice(b"\r\n"); buf.put_slice(b"\r\n");
buf.freeze() buf.freeze()
} }
ParseOutput::Bytes(_) if this.wait_for_header => {
Bytes::from("")
}
ParseOutput::Bytes(bytes) => { ParseOutput::Bytes(bytes) => {
bytes.clone() bytes.clone()
} }
}; };
Some(Ok(bytes)) Some(Ok(bytes))
} }
Err(e) => Err(RecvError::Lagged(lagged)) => {
Some(Err(e)), println!("PartStream lagged across {} items", lagged);
}; this.wait_for_header = true;
result.map(|result| (result, this)) Some(Ok(Bytes::from("")))
}
Err(RecvError::Closed) =>
None,
}.map(|result: Result<Bytes, Infallible>| (result, this))
}); });
let mut res = Response::new(Body::wrap_stream(body_stream)); let mut res = Response::new(Body::wrap_stream(body_stream));
for (name, value) in headers.as_ref() { for (name, value) in headers.as_ref() {
@ -92,7 +103,6 @@ struct PartCapture {
impl PartCapture { impl PartCapture {
async fn start(app: App) -> Result<Box<dyn warp::Reply>, Infallible> { async fn start(app: App) -> Result<Box<dyn warp::Reply>, Infallible> {
let (mut rx, http_headers) = let (mut rx, http_headers) =
match (app.subscribe(), app.headers()) { match (app.subscribe(), app.headers()) {
(Some(rx), Some(headers)) => (Some(rx), Some(headers)) =>
@ -129,6 +139,7 @@ impl Reply for PartCapture {
Ok(payload) => { Ok(payload) => {
match payload.as_ref() { match payload.as_ref() {
ParseOutput::Headers(_headers) => { ParseOutput::Headers(_headers) => {
// next frame; done
None None
} }
ParseOutput::Bytes(bytes) => { ParseOutput::Bytes(bytes) => {