rust-mjpeg-proxy/src/client.rs

65 lines
1.7 KiB
Rust

use std::sync::Arc;
use futures::stream::TryStreamExt;
use mpart_async::server::MultipartParser;
use tokio::time::{Duration, sleep};
use reqwest::StatusCode;
use crate::app::App;
/// Failures the stream client detects itself while connecting to the upstream source.
#[derive(Debug, Clone)]
pub enum Error {
/// The upstream responded with an HTTP status other than `200 OK`; carries that status.
InvalidStatus(StatusCode),
/// The response had no `content-type` header, or the header contained no
/// `boundary=` parameter.
InvalidMultipart,
}
// Empty impl: the default methods suffice. Implementing the trait is what lets
// `?` convert this type into `Box<dyn std::error::Error>`.
impl std::error::Error for Error {}
impl std::fmt::Display for Error {
fn fmt(&self, fmt: &mut std::fmt::Formatter) -> Result<(), std::fmt::Error> {
write!(fmt, "Invalid multipart")
}
}
async fn run_once(app: &App) -> Result<(), Box<dyn std::error::Error>> {
// TODO: from arg
// let res = reqwest::get("http://172.20.78.164:81/stream")
let res = reqwest::get("http://localhost:8080/?action=stream")
.await?;
if res.status() != StatusCode::OK {
Err(Error::InvalidStatus(res.status()))?;
}
let content_type = res.headers().get("content-type")
.ok_or(Error::InvalidMultipart)?;
let mut boundary = None;
for field in content_type.as_ref().split(|b| *b == ';' as u8) {
if field.starts_with(b"boundary=") {
boundary = Some(field[9..].to_owned());
}
}
let boundary = Arc::new(
boundary.ok_or(Error::InvalidMultipart)?
);
let headers = Arc::new(res.headers().clone());
let mut stream = MultipartParser::new(
boundary.as_ref().clone(),
res.bytes_stream()
.map_ok(|buf| buf),
);
let tx = app.source(headers, boundary);
while let Ok(Some(part)) = stream.try_next().await {
let _ = tx.send(Arc::new(part));
}
Ok(())
}
/// Runs the client forever: performs one `run_once` session, prints any error
/// it returned, calls `app.clear()`, waits one second, then starts over.
///
/// This function never returns.
pub async fn run(app: App) {
    let retry_delay = Duration::from_secs(1);
    loop {
        if let Err(err) = run_once(&app).await {
            println!("Client: {:?}", err);
        }
        // Reset the app whether the session ended cleanly or with an error.
        app.clear();
        sleep(retry_delay).await;
    }
}