heliwatch/heliwatch/src/adsb.rs

218 lines
6.2 KiB
Rust

use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::time::{Duration, Instant};
use serde::Deserialize;
use tokio::sync::mpsc::{channel, Receiver};
use super::location::Locations;
/// Altitude ceiling in feet: an aircraft reported above this value is put on
/// the ignore list and no longer tracked.
const MAX_ALTITUDE: u32 = 5000;
/// Number of seconds without an update after which a tracked aircraft is
/// dropped and reported as disappeared.
const STATE_TIMEOUT: u64 = 180;
/// One aircraft record as deserialized from the JSON array returned by the
/// polled URL.
///
/// No field is optional, so an entry that lacks any of them fails to
/// deserialize and is skipped by `fetch`.
#[derive(Deserialize, Debug, Clone)]
pub struct Info {
/// Aircraft identifier used as the tracking key throughout this module.
/// NOTE(review): presumably the ICAO 24-bit address in hex — confirm.
hex: String,
/// Flight name; may be empty or carry trailing whitespace (see `get_flight`).
flight: String,
/// Latitude; used as the `y` component when looking up a location.
lat: f64,
/// Longitude; used as the `x` component when looking up a location.
lon: f64,
/// Altitude in feet.
altitude: u32,
/// NOTE(review): presumably the heading over ground in degrees — confirm.
track: u32,
/// Speed in knots.
speed: u32,
}
impl Info {
    /// Returns the aircraft's hex identifier with trailing whitespace removed.
    ///
    /// Trimming works on whole Unicode characters, so a trailing multi-byte
    /// whitespace character (e.g. U+00A0) is removed cleanly instead of being
    /// sliced in the middle, which would panic.
    pub fn get_hex(&self) -> &str {
        self.hex.trim_end()
    }

    /// Returns the flight name with trailing whitespace removed.
    ///
    /// Returns `None` when the name is empty or consists of whitespace only.
    /// Leading whitespace is preserved. Non-ASCII names are handled correctly
    /// because no byte offsets are mixed with character indices.
    pub fn get_flight(&self) -> Option<String> {
        let flight = self.flight.trim_end();
        if flight.is_empty() {
            None
        } else {
            Some(flight.to_string())
        }
    }

    /// Returns the altitude converted from feet to metres (1 ft = 0.3048 m).
    pub fn get_altitude_m(&self) -> f64 {
        f64::from(self.altitude) * 0.3048
    }

    // pub fn get_speed_kph(&self) -> f64 {
    //     self.speed as f64 * 1.852
    // }
}
/// A notification about a tracked aircraft, delivered through the channel
/// returned by `run`.
#[derive(Debug, Clone)]
pub struct Event {
/// What happened to the aircraft.
pub action: Action,
/// The aircraft record the event refers to.
pub info: Info,
/// Name of the location associated with the event. For events without a
/// resolved location this holds a placeholder string instead
/// ("irgendwo", "ignoriert" or "weg").
pub location: Arc<String>,
}
/// The kind of change an `Event` reports.
#[derive(Debug, Clone, PartialEq)]
pub enum Action {
/// First update received for an aircraft that was not tracked before.
Appeared,
/// A tracked aircraft received no update for `STATE_TIMEOUT` seconds.
Disappeared,
/// A tracked aircraft changed position and now resolves to a different
/// named location than before.
Moved,
/// A tracked aircraft was reported above `MAX_ALTITUDE` and is no longer
/// followed.
Ignored,
}
/// Tracking state kept for a single aircraft between polls.
struct State {
/// Most recent record received; `None` until the first `update` call.
info: Option<Info>,
/// Location the aircraft was last resolved to, if any.
location: Option<Arc<String>>,
/// Time of creation or of the most recent `update` call; used to expire
/// stale entries.
last: Instant,
}
impl State {
    /// Creates an empty state with no record and no location; the timestamp
    /// is set to the moment of creation.
    pub fn new() -> Self {
        Self {
            info: None,
            location: None,
            last: Instant::now(),
        }
    }

    /// Stores `info` as the latest record, refreshes the timestamp and
    /// reports what changed.
    ///
    /// Returns:
    /// * `Some(Appeared)` on the very first update; the event's location is
    ///   the resolved one or the placeholder "irgendwo" when none matches.
    /// * `Some(Moved)` when the coordinates changed and resolve to a named
    ///   location different from the stored one.
    /// * `None` in every other case (same coordinates, same location, or the
    ///   aircraft left all known locations).
    pub fn update(&mut self, info: Info, locations: &Locations) -> Option<Event> {
        self.last = Instant::now();
        let coord = geo::Coordinate { x: info.lon, y: info.lat };

        let previous = match self.info.replace(info) {
            Some(previous) => previous,
            None => {
                // First sighting: resolve the location once and announce it.
                self.location = locations.find(&coord);
                let location = self
                    .location
                    .clone()
                    .unwrap_or_else(|| Arc::new("irgendwo".to_owned()));
                return Some(Event {
                    action: Action::Appeared,
                    info: self.info.clone().unwrap(),
                    location,
                });
            }
        };

        let current = self.info.as_ref().unwrap();
        if previous.lon == current.lon && previous.lat == current.lat {
            // Position unchanged, nothing to report.
            return None;
        }

        let found = locations.find(&coord);
        if found == self.location {
            // Position changed, but still within the same location.
            return None;
        }

        match found {
            Some(place) => {
                self.location = Some(place.clone());
                Some(Event {
                    action: Action::Moved,
                    info: self.info.clone().unwrap(),
                    location: place,
                })
            }
            None => {
                if self.location.is_some() {
                    println!("{}: move to nowhere", current.flight);
                }
                // Left every known location; remember that without an event.
                self.location = None;
                None
            }
        }
    }
}
/// Module-local result type whose error is any boxed `Error + Send + Sync`.
type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
/// Downloads the JSON array at `url` and converts its elements to `Info`.
///
/// The body is first parsed as a list of generic JSON values so that a single
/// malformed element does not fail the whole request: elements that cannot be
/// turned into an `Info` are silently dropped.
///
/// # Errors
///
/// Fails when the request itself fails or the body is not a JSON array.
async fn fetch(url: &str) -> Result<Vec<Info>> {
    let response = reqwest::get(url).await?;
    let raw: Vec<serde_json::Value> = response.json().await?;

    let mut infos: Vec<Info> = Vec::new();
    for entry in raw {
        // skip invalid entries
        if let Ok(info) = serde_json::from_value::<Info>(entry) {
            infos.push(info);
        }
    }
    Ok(infos)
}
pub fn run(url: &'static str, locations: Locations) -> Receiver<Event> {
let (tx, rx) = channel(1);
tokio::spawn(async move {
let mut states = HashMap::new();
// ignore anything above MAX_ALTITUDE permanently
let mut ignored = HashSet::new();
// cache that lives longer than dump1090's
let mut flights = HashMap::new();
loop {
let infos = match fetch(url).await {
Ok(infos) => infos,
Err(e) => {
eprintln!("{}", e);
continue;
}
};
let mut events = vec![];
for mut info in infos {
if info.altitude > MAX_ALTITUDE {
if !ignored.contains(&info.hex) {
ignored.insert(info.hex.clone());
if let Some(_) = states.remove(&info.hex) {
events.push(Event {
action: Action::Ignored,
info,
location: Arc::new("ignoriert".to_owned()),
});
}
}
continue;
}
if ignored.contains(&info.hex) {
continue;
}
if let Some(flight) = info.get_flight() {
flights.insert(info.hex.clone(), flight);
} else if let Some(flight) = flights.get(&info.hex) {
info.flight = flight.to_string();
} else {
continue;
};
if let Some(event) = states.entry(info.hex.clone())
.or_insert(State::new())
.update(info, &locations) {
tx.send(event).await.unwrap();
}
}
states.retain(|_, state| {
if state.last + Duration::from_secs(STATE_TIMEOUT) < Instant::now() {
events.push(Event {
action: Action::Disappeared,
info: state.info.clone().unwrap(),
location: Arc::new("weg".to_owned()),
});
false
} else {
true
}
});
for event in events {
tx.send(event).await.unwrap();
}
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}
});
rx
}