use std::collections::HashMap; use std::sync::{Arc, RwLock}; use std::time::{Duration, Instant}; use futures::stream::StreamExt; use tokio::sync::mpsc::channel; use adsb::ICAOAddress; use super::beast; const MAX_CPR_INTERVAL: u64 = 2; #[derive(Default)] pub struct Entry { pub emitter_category: Option, pub callsign: Option, pub altitude: Option, cpr1: Option<(Instant, adsb::CPRFrame)>, cpr2: Option<(Instant, adsb::CPRFrame)>, pub position: Option, pub heading: Option, pub ground_speed: Option, pub vertical_rate: Option, pub vertical_rate_source: Option, last_update: Option, } impl Entry { const MAX_AGE: u64 = 300; fn update(&mut self, kind: adsb::ADSBMessageKind) { match kind { adsb::ADSBMessageKind::AirbornePosition { altitude, cpr_frame } => { self.altitude = Some(altitude); if self.cpr2.as_ref().map(|(_, cpr2)| cpr2.parity != cpr_frame.parity).unwrap_or(false) { self.cpr1 = self.cpr2.take(); } self.cpr2 = Some((Instant::now(), cpr_frame)); match (&self.cpr1, &self.cpr2) { (Some((time1, cpr1)), Some((time2, cpr2))) if time2.duration_since(*time1) <= Duration::from_secs(MAX_CPR_INTERVAL) => { if let Some(pos) = adsb::cpr::get_position((cpr1, cpr2)) { if pos.latitude >= -90.0 && pos.latitude <= 90.0 && pos.longitude >= -180.0 && pos.longitude <= 180.0 { self.position = Some(pos); } } } _ => {} } } adsb::ADSBMessageKind::AirborneVelocity { heading, ground_speed, vertical_rate, vertical_rate_source, } => { self.heading = Some(heading); self.ground_speed = Some(ground_speed); self.vertical_rate = Some(vertical_rate); self.vertical_rate_source = Some(vertical_rate_source); } adsb::ADSBMessageKind::AircraftIdentification { emitter_category, callsign } => { self.emitter_category = Some(emitter_category); self.callsign = Some(callsign); } } self.last_update = Some(Instant::now()); } pub fn flight(&self) -> Option<&str> { self.callsign.as_ref().map(|callsign| { callsign.split(char::is_whitespace) .next().unwrap() }) } pub fn altitude_m(&self) -> Option { self.altitude.map(|altitude| altitude as f64 * 0.3048) } } #[derive(Clone)] pub struct Aircrafts { state: Arc>>>, } impl Aircrafts { pub fn new() -> Self { Aircrafts { state: Arc::new(RwLock::new(HashMap::new())), } } pub fn read(&self) -> std::sync::RwLockReadGuard>> { self.state.read().unwrap() } pub fn connect(&self, host: &'static str, port: u16) { // buffering channel because readsb is very sensitive let (tx, mut rx) = channel(16 * 1024); // network input tokio::spawn(async move { loop { let mut stream; if let Ok(stream_) = beast::connect(host, port).await { stream = Box::pin(stream_); } else { tokio::time::sleep(Duration::from_secs(1)).await; // Retry continue; } while let Some(frame) = stream.next().await { let _ = tx.send(frame).await; } } }); // state update let state = self.state.clone(); tokio::spawn(async move { while let Some(frame) = rx.recv().await { match frame.parse_adsb() { Some(adsb::Message { kind: adsb::MessageKind::ADSBMessage { icao_address, kind, crc, .. }, .. }) if crc => { state.write().unwrap() .entry(icao_address) .or_default() .write().unwrap() .update(kind); } _ => {} } } }); // discard old states let state = self.state.clone(); tokio::spawn(async move { loop { state.write().unwrap(). retain(|_, entry| { entry.read().unwrap() .last_update.map(|last_update| { last_update + Duration::from_secs(Entry::MAX_AGE) > Instant::now() }) .unwrap_or(false) }); tokio::time::sleep(Duration::from_secs(1)).await; } }); } }