this basically works

This commit is contained in:
Astro 2022-12-17 00:17:53 +01:00
commit 33ec144979
5 changed files with 2040 additions and 0 deletions

1
.gitignore vendored Normal file
View File

@ -0,0 +1 @@
/target

1830
Cargo.lock generated Normal file

File diff suppressed because it is too large Load Diff

17
Cargo.toml Normal file
View File

@ -0,0 +1,17 @@
[package]
name = "alert2muc"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
futures = "0.3"
tokio = { version = "1", features = ["full"] }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
tokio-xmpp = "3"
xmpp-parsers = "0.19"
axum = "0.6"

101
src/jabber.rs Normal file
View File

@ -0,0 +1,101 @@
use std::str::FromStr;
use std::convert::TryFrom;
use futures::{SinkExt, StreamExt};
use tokio::sync::mpsc;
use tokio_xmpp::{AsyncClient, Packet, Event};
use xmpp_parsers::{Jid, BareJid, FullJid, Element};
use xmpp_parsers::message::{Body, Message, MessageType};
use xmpp_parsers::presence::{Presence, Show as PresenceShow, Type as PresenceType};
use xmpp_parsers::muc::Muc;
#[derive(Clone)]
pub struct Handle {
room_jid: BareJid,
tx: mpsc::Sender<Packet>,
}
impl Handle {
pub async fn send_message(&self, msg: String) {
let stanza = make_room_message(self.room_jid.clone(), msg);
self.tx.send(Packet::Stanza(stanza)).await.unwrap();
}
}
pub async fn run(jid: String, password: String, muc_jid: String) -> Handle {
let muc_jid: FullJid = match FullJid::from_str(&muc_jid) {
Ok(jid) => jid,
Err(err) => panic!("MUC Jid invalid: {:?}", err),
};
let (tx, mut rx) = mpsc::channel(1);
let handle = Handle {
room_jid: muc_jid.clone().into(),
tx: tx.clone(),
};
let mut client = AsyncClient::new(&jid, &password).unwrap();
loop {
match client.next().await {
Some(Event::Online { .. }) => {
println!("XMPP client now online at {}", jid);
let packet = Packet::Stanza(make_join_presence(muc_jid.clone()));
client.send(packet).await
.unwrap();
break;
}
Some(_) => {}
None => panic!("XMPP cannot connect"),
}
}
let (mut sink, mut stream) = client.split();
tokio::spawn(async move {
while let Some(event) = stream.next().await {
match event {
Event::Stanza(el) => {
if el.is("presence", "jabber:client") {
match Presence::try_from(el) {
Ok(presence) => {
if presence.from == Some(Jid::Full(muc_jid.clone())) {
if presence.type_ == PresenceType::Error {
println!("Failed to enter MUC {:?}", muc_jid);
} else {
println!("Entered MUC {:?}", muc_jid);
}
}
},
Err(err) => println!("Received invalid presence: {:?}", err),
}
}
}
_ => {}
}
}
panic!("XMPP end")
});
tokio::spawn(async move {
while let Some(packet) = rx.recv().await {
sink.send(packet).await.unwrap();
}
panic!("channel end")
});
handle
}
pub fn make_room_message(room_jid: BareJid, text: String) -> Element {
let mut message = Message::new(Some(Jid::Bare(room_jid)));
message.type_ = MessageType::Groupchat;
message.bodies.insert("de".to_string(), Body(text));
message.into()
}
fn make_join_presence(muc_jid: FullJid) -> Element {
let mut presence = Presence::new(PresenceType::None)
.with_to(Jid::Full(muc_jid))
.with_show(PresenceShow::Dnd);
presence.set_status("de".to_string(), "Augen und Ohren nach oben".to_string());
presence.add_payload(Muc::new());
presence.into()
}

91
src/main.rs Normal file
View File

@ -0,0 +1,91 @@
use std::{
net::SocketAddr,
collections::HashMap,
sync::{Arc, Mutex},
};
use axum::{
extract::State,
routing::post,
http::StatusCode,
response::{IntoResponse, Response},
Json, Router,
};
use serde::Deserialize;
mod jabber;
#[derive(Deserialize, Clone, Debug)]
struct Alert {
generatorURL: String,
startsAt: String,
annotations: AlertAnnotations,
labels: HashMap<String, String>,
}
#[derive(Deserialize, Clone, Debug)]
struct AlertAnnotations {
summary: String,
}
#[derive(Clone)]
struct AppState {
jabber: jabber::Handle,
alerts: Arc<Mutex<HashMap<String, Alert>>>,
}
async fn alerts(
State(state): State<AppState>,
Json(payload): Json<Vec<Alert>>,
) -> Response {
let mut message = "".to_string();
let mut alerts = state.alerts.lock().unwrap();
for alert in payload.into_iter() {
let is_old = if let Some(old_alert) = alerts.get(&alert.generatorURL) {
old_alert.startsAt == alert.startsAt
} else {
false
};
if ! is_old {
if message != "" {
message += "\n";
}
message += &format!("{}: {}", alert.annotations.summary, alert.generatorURL);
alerts.insert(alert.generatorURL.clone(), alert);
}
}
drop(alerts);
if message != "" {
let jabber = state.jabber.clone();
tokio::spawn(async move {
jabber.send_message(message).await;
});
}
StatusCode::OK.into_response()
}
#[tokio::main]
async fn main() {
// initialize tracing
tracing_subscriber::fmt::init();
let jabber = jabber::run(/*secret*/).await;
let state = AppState {
jabber,
alerts: Arc::new(Mutex::new(HashMap::new())),
};
// build our application with a route
let app = Router::new()
.route("/", post(alerts))
.with_state(state);
let addr = SocketAddr::from(([127, 0, 0, 1], 9022));
tracing::debug!("listening on {}", addr);
axum::Server::bind(&addr)
.serve(app.into_make_service())
.await
.unwrap();
}