diff --git a/src/app.rs b/src/app.rs index 50bb4a9..d0f4883 100644 --- a/src/app.rs +++ b/src/app.rs @@ -17,26 +17,22 @@ impl App { } } - pub fn source(&self, headers: HeaderMap, boundary: Vec) -> AppSource { - { - let mut state = self.state.write().unwrap(); - *state = Some(State { - headers: Arc::new(headers), - boundary, - next_broadcast: broadcast::channel(128), - }); - } + pub fn source(&self, headers: HeaderMap, boundary: Vec) -> Sender { + let (tx, _rx) = broadcast::channel(128); + let mut state = self.state.write().unwrap(); + *state = Some(State { + headers: Arc::new(headers), + boundary, + tx: tx.clone(), + }); - AppSource { - app: self.clone(), - } + tx } pub fn subscribe(&self) -> Option> { let state = self.state.read().unwrap(); state.as_ref().map(|state| { - let tx = &state.next_broadcast.0; - tx.subscribe() + state.tx.subscribe() }) } @@ -64,16 +60,8 @@ impl Drop for AppSource { } } -impl AppSource { - pub fn begin_part(&mut self) -> Sender { - let mut state = self.app.state.write().unwrap(); - let (tx, _rx) = std::mem::replace(&mut state.as_mut().unwrap().next_broadcast, broadcast::channel(128)); - tx - } -} - struct State { headers: Arc, boundary: Vec, - next_broadcast: (Sender, Receiver), + tx: Sender, } diff --git a/src/client.rs b/src/client.rs index 880684a..876fdfb 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,9 +1,6 @@ use std::sync::Arc; use futures::stream::TryStreamExt; -use mpart_async::server::{ - MultipartParser, - ParseOutput::{Headers, Bytes}, -}; +use mpart_async::server::MultipartParser; use reqwest::StatusCode; use crate::app::App; @@ -44,14 +41,8 @@ async fn run_once(app: &App) -> Result<(), Box> { res.bytes_stream() .map_ok(|buf| buf), ); - let mut source = app.source(headers, boundary); - let mut tx = source.begin_part(); + let tx = app.source(headers, boundary); while let Ok(Some(part)) = stream.try_next().await { - match &part { - Headers(_headers) => - tx = source.begin_part(), - Bytes(_bytes) => {} - } let _ = tx.send(Arc::new(part)); } diff --git a/src/server.rs b/src/server.rs index dbad476..a73a19e 100644 --- a/src/server.rs +++ b/src/server.rs @@ -94,6 +94,7 @@ impl PartCapture { (Some(ref mut rx), Some(ref stream_headers)) => { match rx.recv().await { Ok(output) => match output.as_ref() { + // TODO: not necessarily the 1st ParseOutput::Headers(part_headers) => { let mut headers = stream_headers.as_ref().clone(); for (name, value) in part_headers {