use std::collections::HashMap; use std::sync::{Arc, RwLock}; use std::time::{Duration, Instant}; use futures::stream::StreamExt; use tokio::sync::mpsc::channel; use adsb_deku::ICAO; use super::beast; const JITTER_WINDOW: usize = 5; #[derive(Default)] pub struct Entry { pub category: Option<(adsb_deku::adsb::TypeCoding, u8)>, pub callsign: Option, pub altitude: Option, cpr1: Option, cpr2: Option, // jitter buffer positions: [Option; JITTER_WINDOW], pub heading: Option, pub speed: Option, pub vertical_rate: Option, last_update: Option, } impl Entry { const MAX_AGE: u64 = 300; fn update(&mut self, kind: adsb_deku::adsb::ME) { match kind { adsb_deku::adsb::ME::AirbornePositionBaroAltitude(altitude) | adsb_deku::adsb::ME::AirbornePositionGNSSAltitude(altitude) => { self.altitude = Some(altitude.alt); if self.cpr2.map_or(false, |cpr2| cpr2.odd_flag != altitude.odd_flag) { // if last altitude had a different odd flag, // shift the entries self.cpr1 = self.cpr2.take(); } // add the new entry self.cpr2 = Some(altitude); match (&self.cpr1, &self.cpr2) { (Some(cpr1), Some(cpr2)) => { if let Some(pos) = adsb_deku::cpr::get_position((cpr1, cpr2)) { if pos.latitude < -90. || pos.latitude > 90. || pos.longitude < -180. || pos.longitude > 180. { eprintln!("invalid position: {:?}", pos); } else { // shift previous positions in jitter buffer for i in 1..self.positions.len() { self.positions[i - 1] = self.positions[i].take(); } // add position to jitter buffer self.positions[self.positions.len() - 1] = Some(pos); } } } _ => {} } } adsb_deku::adsb::ME::AirborneVelocity(velocity) => { if let Some((heading, speed, vrate)) = velocity.calculate() { self.heading = Some(heading); self.speed = Some(speed); self.vertical_rate = Some(vrate); } } adsb_deku::adsb::ME::AircraftIdentification(ident) => { self.category = Some((ident.tc, ident.ca)); self.callsign = Some(ident.cn); } adsb_deku::adsb::ME::TargetStateAndStatusInformation(_) => {} adsb_deku::adsb::ME::AircraftStatus(_) => {} adsb_deku::adsb::ME::AircraftOperationStatus(_) => {} msg => { eprintln!("unhandled adsb msg: {:?}", msg); } } self.last_update = Some(Instant::now()); } pub fn position(&self) -> Option<&adsb_deku::cpr::Position> { let mut prev_lat = self.positions[0].as_ref()?.latitude; let mut prev_lon = self.positions[0].as_ref()?.longitude; for i in 1..self.positions.len() { let lat = self.positions[i].as_ref()?.latitude; let lon = self.positions[i].as_ref()?.longitude; if lat < prev_lat - 1.0 || lat > prev_lat + 1.0 || lon < prev_lon - 1.0 || lon > prev_lon + 1.0 { // erroneous jitter detected eprintln!("{:?} error in: {:?}", self.callsign, self.positions.iter().filter_map(|pos| { pos.as_ref().map(|pos| (pos.latitude as i32, pos.longitude as i32)) }).collect::>()); return None; } prev_lat = lat; prev_lon = lon; } self.positions[self.positions.len() - 1].as_ref() } pub fn flight(&self) -> Option<&str> { self.callsign.as_ref().map(|callsign| { callsign.split(char::is_whitespace) .next().unwrap() }) } pub fn altitude_m(&self) -> Option { self.altitude.map(|altitude| altitude as f64 * 0.3048) } } #[derive(Clone)] pub struct Aircrafts { state: Arc>>>, } impl Aircrafts { pub fn new() -> Self { Aircrafts { state: Arc::new(RwLock::new(HashMap::new())), } } pub fn read(&self) -> std::sync::RwLockReadGuard>> { self.state.read().unwrap() } pub fn connect(&self, host: &'static str, port: u16) { // buffering channel because readsb is very sensitive let (tx, mut rx) = channel(16 * 1024); // network input tokio::spawn(async move { loop { let mut stream; if let Ok(stream_) = beast::connect(host, port).await { stream = Box::pin(stream_); } else { tokio::time::sleep(Duration::from_secs(1)).await; // Retry continue; } while let Some(frame) = stream.next().await { let _ = tx.send(frame).await; } } }); // state update let state = self.state.clone(); tokio::spawn(async move { while let Some(frame) = rx.recv().await { match frame.parse_adsb() { Some(adsb_deku::Frame { df: adsb_deku::DF::ADSB(adsb), crc }) => { if crc != 0 { eprintln!("crc: {:02X}", crc); } state.write().unwrap() .entry(adsb.icao) .or_default() .write().unwrap() .update(adsb.me); } _ => {} } } }); // discard old states let state = self.state.clone(); tokio::spawn(async move { loop { state.write().unwrap(). retain(|_, entry| { entry.read().unwrap() .last_update.map(|last_update| { last_update + Duration::from_secs(Entry::MAX_AGE) > Instant::now() }) .unwrap_or(false) }); tokio::time::sleep(Duration::from_secs(1)).await; } }); } }