take target url from command line arg

This commit is contained in:
Astro 2021-02-21 21:16:59 +01:00
parent 3192528a3a
commit d64611c0c3
4 changed files with 38 additions and 14 deletions

View File

@ -34,7 +34,6 @@ impl App {
*state = None;
}
// TODO: async drop til 1st header
pub fn subscribe(&self) -> Option<Receiver<Payload>> {
let state = self.state.read().unwrap();
state.as_ref().map(|state| {

View File

@ -19,10 +19,8 @@ impl std::fmt::Display for Error {
}
}
async fn run_once(app: &App) -> Result<(), Box<dyn std::error::Error>> {
// TODO: from arg
// let res = reqwest::get("http://172.20.78.164:81/stream")
let res = reqwest::get("http://localhost:8080/?action=stream")
async fn run_once(app: &App, target: Arc<String>) -> Result<(), Box<dyn std::error::Error>> {
let res = reqwest::get(target.as_ref())
.await?;
if res.status() != StatusCode::OK {
Err(Error::InvalidStatus(res.status()))?;
@ -53,9 +51,10 @@ async fn run_once(app: &App) -> Result<(), Box<dyn std::error::Error>> {
Ok(())
}
pub async fn run(app: App) {
pub async fn run(app: App, target: String) {
let target = Arc::new(target);
loop {
run_once(&app).await
run_once(&app, target.clone()).await
.unwrap_or_else(|e| println!("Client: {:?}", e));
app.clear();

View File

@ -1,3 +1,4 @@
use std::{env::args, process::exit};
use futures::prelude::future::FutureExt;
mod app;
@ -6,8 +7,17 @@ mod server;
#[tokio::main(flavor = "current_thread")]
async fn main() {
let mut args = args();
let progname = args.next().unwrap();
let target = if let Some(target) = args.next() {
target
} else {
println!("Usage: {} <http://stream.mjpeg>", progname);
exit(255);
};
let app = app::App::new();
let c = tokio::spawn(client::run(app.clone()));
let c = tokio::spawn(client::run(app.clone(), target));
let s = tokio::spawn(server::run(app));
futures::select! {
_ = c.fuse() => panic!("client exit"),

View File

@ -3,7 +3,7 @@ use bytes::{BufMut, Bytes, BytesMut};
use futures::stream;
use tokio::sync::broadcast::{Receiver, error::RecvError};
use http::{
header::HeaderMap,
header::{CONTENT_LENGTH, HeaderMap},
status::StatusCode,
};
use hyper::Body;
@ -99,6 +99,7 @@ impl Reply for PartStream {
struct PartCapture {
rx: Receiver<Payload>,
headers: HeaderMap,
content_length: Option<usize>,
}
impl PartCapture {
@ -116,10 +117,21 @@ impl PartCapture {
Ok(output) => match output.as_ref() {
ParseOutput::Headers(part_headers) => {
let mut headers = http_headers.as_ref().clone();
let mut content_length = None;
for (name, value) in part_headers {
headers.insert(name, value.clone());
if name == CONTENT_LENGTH {
content_length = value.to_str()
.ok()
.and_then(|value| value.parse().ok());
}
}
return Ok(Box::new(PartCapture { rx, headers }));
return Ok(Box::new(PartCapture {
rx,
headers,
content_length,
}));
}
// skip content Bytes
_ => {},
@ -131,9 +143,12 @@ impl PartCapture {
}
impl Reply for PartCapture {
// TODO: count up to option<content-length>
fn into_response(self) -> Response {
let body_stream = stream::unfold(self.rx, |mut rx| async move {
let body_stream = stream::unfold((self.rx, self.content_length), |(mut rx, content_length)| async move {
if content_length == Some(0) {
return None;
}
match rx.recv().await {
Ok(payload) => {
match payload.as_ref() {
@ -142,12 +157,13 @@ impl Reply for PartCapture {
None
}
ParseOutput::Bytes(bytes) => {
Some((Ok(bytes.clone()), rx))
let content_length = content_length.map(|content_length| content_length.saturating_sub(bytes.len()));
Some((Ok(bytes.clone()), (rx, content_length)))
}
}
}
Err(e) => {
Some((Err(e), rx))
Some((Err(e), (rx, None)))
},
}
});