heliwatch/beast/src/beast.rs
2022-01-28 22:22:22 +01:00

158 lines
4.3 KiB
Rust

use futures::stream;
use tokio::io::AsyncReadExt;
use tokio::net::TcpStream;
use adsb_deku::deku::DekuContainerRead;
const DEFRAMER_DEFAULT_CAPACITY: usize = 22;
struct Deframer {
buffer: Vec<u8>,
escapes: usize,
}
impl Deframer {
pub fn new() -> Self {
Deframer {
buffer: Vec::with_capacity(DEFRAMER_DEFAULT_CAPACITY),
escapes: 0,
}
}
pub fn push(&mut self, c: u8) -> Option<Frame> {
let mut result = None;
if c == 0x1a {
self.escapes += 1;
} else {
if self.escapes > 0 {
for _ in 0..(self.escapes / 2) {
self.buffer.push(0x1a);
}
if self.escapes & 1 == 1 {
let new_buffer = Vec::with_capacity(DEFRAMER_DEFAULT_CAPACITY);
let buffer = std::mem::replace(&mut self.buffer, new_buffer);
if buffer.len() > 0 {
result = Frame::new(buffer);
}
}
self.escapes = 0;
}
self.buffer.push(c);
}
result
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[repr(u8)]
pub enum FrameType {
ModeAC = '1' as u8,
ModeSShort = '2' as u8,
ModeSLong = '3' as u8,
Config = '4' as u8,
}
#[derive(Debug)]
pub struct Frame {
data: Vec<u8>,
}
impl Frame {
pub fn new(data: Vec<u8>) -> Option<Self> {
if data.len() < 1 {
return None;
}
let this = Frame { data };
match (this.frame_type(), this.data.len()) {
(Some(FrameType::ModeAC), 10) =>
Some(this),
(Some(FrameType::ModeSShort), 15) =>
Some(this),
(Some(FrameType::ModeSLong), 22) =>
Some(this),
_ =>
None // don't care
}
}
pub fn frame_type(&self) -> Option<FrameType> {
match self.data[0] as char {
'1' => Some(FrameType::ModeAC),
'2' => Some(FrameType::ModeSShort),
'3' => Some(FrameType::ModeSLong),
'4' => Some(FrameType::Config),
_ => None,
}
}
pub fn timestamp(&self) -> u64 {
match self.frame_type() {
Some(_) => {
let mut result = 0;
for i in 1..=6 {
result |= u64::from(self.data[i]) << (8 * (6 - i));
}
result
}
None => 0
}
}
/// dbfs
pub fn rssi(&self) -> Option<f32> {
if self.data.len() < 8 {
None
} else if self.data[7] == 0xff {
None
} else {
Some(
10.0 * (((self.data[7] as f32) / 256.0).powi(2) + 1.0e-5).log10()
)
}
}
pub fn parse_adsb(&self) -> Option<adsb_deku::Frame> {
match self.frame_type() {
Some(FrameType::ModeSShort) | Some(FrameType::ModeSLong) => {
if let Ok((_rest, msg)) = adsb_deku::Frame::from_bytes((&self.data[8..], 0)) {
Some(msg)
} else {
eprintln!("adsb decode error");
None
}
}
_ => None
}
}
}
pub async fn connect(host: &str, port: u16) -> Result<impl futures::Stream<Item = Frame>, Box<dyn std::error::Error + Send + Sync>> {
println!("Connecting to {}:{}", host, port);
let conn = TcpStream::connect((host, port)).await?;
println!("Connected to {}:{}", host, port);
let stream = stream::unfold((conn, vec![], Deframer::new()), |(mut conn, mut buf, mut deframe)| async {
loop {
for i in 0..buf.len() {
if let Some(frame) = deframe.push(buf[i]) {
buf = buf.split_off(i + 1);
return Some((frame, (conn, buf, deframe)));
}
} // buf consumed
if buf.len() < DEFRAMER_DEFAULT_CAPACITY {
buf = vec![0; 1024];
}
match conn.read(&mut buf).await {
Ok(0) | Err(_) => {
println!("Disconnected");
return None;
}
Ok(len) =>
buf.truncate(len),
}
}
});
Ok(stream)
}