use std::collections::HashMap; use std::sync::{Arc, RwLock}; use std::time::{Duration, Instant}; use futures::stream::StreamExt; use tokio::sync::mpsc::channel; use adsb::ICAOAddress; use super::beast; #[derive(Default)] pub struct Entry { pub emitter_category: Option, pub callsign: Option, pub altitude: Option, cpr1: Option, cpr2: Option, positions: [Option; 3], pub heading: Option, pub ground_speed: Option, pub vertical_rate: Option, pub vertical_rate_source: Option, last_update: Option, } impl Entry { const MAX_AGE: u64 = 300; fn update(&mut self, kind: adsb::ADSBMessageKind) { match kind { adsb::ADSBMessageKind::AirbornePosition { altitude, cpr_frame } => { self.altitude = Some(altitude); if self.cpr2.as_ref().map(|cpr2| cpr2.parity != cpr_frame.parity).unwrap_or(false) { self.cpr1 = self.cpr2.take(); } self.cpr2 = Some(cpr_frame); match (&self.cpr1, &self.cpr2) { (Some(cpr1), Some(cpr2)) => { match adsb::cpr::get_position((cpr1, cpr2)) { Some(pos) if pos.latitude >= -90.0 && pos.latitude <= 90.0 && pos.longitude >= -180.0 && pos.longitude <= 180.0 => { for i in 1..self.positions.len() { self.positions[i - 1] = self.positions[i].take(); } self.positions[self.positions.len() - 1] = Some(pos); } _ => {} } } _ => {} } } adsb::ADSBMessageKind::AirborneVelocity { heading, ground_speed, vertical_rate, vertical_rate_source, } => { self.heading = Some(heading); self.ground_speed = Some(ground_speed); self.vertical_rate = Some(vertical_rate); self.vertical_rate_source = Some(vertical_rate_source); } adsb::ADSBMessageKind::AircraftIdentification { emitter_category, callsign } => { self.emitter_category = Some(emitter_category); self.callsign = Some(callsign); } } self.last_update = Some(Instant::now()); } pub fn position(&self) -> Option<&adsb::Position> { let mut prev_lat = self.positions[0].as_ref()?.latitude; let mut prev_lon = self.positions[0].as_ref()?.longitude; for i in 1..self.positions.len() - 1 { let lat = self.positions[i].as_ref()?.latitude; let lon = self.positions[i].as_ref()?.longitude; if lat < prev_lat - 1.0 || lat > prev_lat + 1.0 || lon < prev_lon - 1.0 || lon > prev_lon + 1.0 { // erroneous jitter detected return None; } prev_lat = lat; prev_lon = lon; } self.positions[self.positions.len() - 1].as_ref() } pub fn flight(&self) -> Option<&str> { self.callsign.as_ref().map(|callsign| { callsign.split(char::is_whitespace) .next().unwrap() }) } pub fn altitude_m(&self) -> Option { self.altitude.map(|altitude| altitude as f64 * 0.3048) } } #[derive(Clone)] pub struct Aircrafts { state: Arc>>>, } impl Aircrafts { pub fn new() -> Self { Aircrafts { state: Arc::new(RwLock::new(HashMap::new())), } } pub fn read(&self) -> std::sync::RwLockReadGuard>> { self.state.read().unwrap() } pub fn connect(&self, host: &'static str, port: u16) { // buffering channel because readsb is very sensitive let (tx, mut rx) = channel(16 * 1024); // network input tokio::spawn(async move { loop { let mut stream; if let Ok(stream_) = beast::connect(host, port).await { stream = Box::pin(stream_); } else { tokio::time::sleep(Duration::from_secs(1)).await; // Retry continue; } while let Some(frame) = stream.next().await { let _ = tx.send(frame).await; } } }); // state update let state = self.state.clone(); tokio::spawn(async move { while let Some(frame) = rx.recv().await { match frame.parse_adsb() { Some(adsb::Message { kind: adsb::MessageKind::ADSBMessage { icao_address, kind, crc, .. }, .. }) if crc => { state.write().unwrap() .entry(icao_address) .or_default() .write().unwrap() .update(kind); } _ => {} } } }); // discard old states let state = self.state.clone(); tokio::spawn(async move { loop { state.write().unwrap(). retain(|_, entry| { entry.read().unwrap() .last_update.map(|last_update| { last_update + Duration::from_secs(Entry::MAX_AGE) > Instant::now() }) .unwrap_or(false) }); tokio::time::sleep(Duration::from_secs(1)).await; } }); } }