This commit is contained in:
Astro 2021-02-20 02:55:25 +01:00
parent 59a265fe24
commit 17087b7bae
3 changed files with 14 additions and 34 deletions

View File

@ -17,26 +17,22 @@ impl App {
}
}
pub fn source(&self, headers: HeaderMap, boundary: Vec<u8>) -> AppSource {
{
let mut state = self.state.write().unwrap();
*state = Some(State {
headers: Arc::new(headers),
boundary,
next_broadcast: broadcast::channel(128),
});
}
pub fn source(&self, headers: HeaderMap, boundary: Vec<u8>) -> Sender<Payload> {
let (tx, _rx) = broadcast::channel(128);
let mut state = self.state.write().unwrap();
*state = Some(State {
headers: Arc::new(headers),
boundary,
tx: tx.clone(),
});
AppSource {
app: self.clone(),
}
tx
}
pub fn subscribe(&self) -> Option<Receiver<Payload>> {
let state = self.state.read().unwrap();
state.as_ref().map(|state| {
let tx = &state.next_broadcast.0;
tx.subscribe()
state.tx.subscribe()
})
}
@ -64,16 +60,8 @@ impl Drop for AppSource {
}
}
impl AppSource {
pub fn begin_part(&mut self) -> Sender<Payload> {
let mut state = self.app.state.write().unwrap();
let (tx, _rx) = std::mem::replace(&mut state.as_mut().unwrap().next_broadcast, broadcast::channel(128));
tx
}
}
struct State {
headers: Arc<HeaderMap>,
boundary: Vec<u8>,
next_broadcast: (Sender<Payload>, Receiver<Payload>),
tx: Sender<Payload>,
}

View File

@ -1,9 +1,6 @@
use std::sync::Arc;
use futures::stream::TryStreamExt;
use mpart_async::server::{
MultipartParser,
ParseOutput::{Headers, Bytes},
};
use mpart_async::server::MultipartParser;
use reqwest::StatusCode;
use crate::app::App;
@ -44,14 +41,8 @@ async fn run_once(app: &App) -> Result<(), Box<dyn std::error::Error>> {
res.bytes_stream()
.map_ok(|buf| buf),
);
let mut source = app.source(headers, boundary);
let mut tx = source.begin_part();
let tx = app.source(headers, boundary);
while let Ok(Some(part)) = stream.try_next().await {
match &part {
Headers(_headers) =>
tx = source.begin_part(),
Bytes(_bytes) => {}
}
let _ = tx.send(Arc::new(part));
}

View File

@ -94,6 +94,7 @@ impl PartCapture {
(Some(ref mut rx), Some(ref stream_headers)) => {
match rx.recv().await {
Ok(output) => match output.as_ref() {
// TODO: not necessarily the 1st
ParseOutput::Headers(part_headers) => {
let mut headers = stream_headers.as_ref().clone();
for (name, value) in part_headers {