Astro 2022-12-25 04:02:18 +01:00
commit a1d8227eb0
6 changed files with 1939 additions and 0 deletions

Cargo.lock (generated, new file, 1608 lines)

File diff suppressed because it is too large

Cargo.toml (new file, 20 lines)

@@ -0,0 +1,20 @@
[package]
name = "buzz2elastic"
version = "0.1.0"
edition = "2021"

[dependencies]
tokio = { version = "1", features = ["full", "time"] }
tracing = "*"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
serde = "1"
serde_json = "1"
serde_yaml = "0.9"
reqwest = { version = "0.11", features = ["json", "stream"] }
thiserror = "1"
eventsource-stream = "0.2"
futures = "0.3"
systemd = "0.10"
metrics = "0.20"
metrics-util = "0.14"
metrics-exporter-prometheus = "0.11"

flake.lock (new file, 65 lines)

@@ -0,0 +1,65 @@
{
  "nodes": {
    "naersk": {
      "inputs": {
        "nixpkgs": [
          "nixpkgs"
        ]
      },
      "locked": {
        "lastModified": 1671096816,
        "narHash": "sha256-ezQCsNgmpUHdZANDCILm3RvtO1xH8uujk/+EqNvzIOg=",
        "owner": "nix-community",
        "repo": "naersk",
        "rev": "d998160d6a076cfe8f9741e56aeec7e267e3e114",
        "type": "github"
      },
      "original": {
        "owner": "nix-community",
        "ref": "master",
        "repo": "naersk",
        "type": "github"
      }
    },
    "nixpkgs": {
      "locked": {
        "lastModified": 1671877799,
        "narHash": "sha256-jjC0NtPOT4huSwyichdrKHVCjuGr1al7Wu6PcHo5XZs=",
        "owner": "NixOS",
        "repo": "nixpkgs",
        "rev": "8351f271f85dae1ee28269028acde661e60394dd",
        "type": "github"
      },
      "original": {
        "owner": "NixOS",
        "ref": "nixpkgs-unstable",
        "repo": "nixpkgs",
        "type": "github"
      }
    },
    "root": {
      "inputs": {
        "naersk": "naersk",
        "nixpkgs": "nixpkgs",
        "utils": "utils"
      }
    },
    "utils": {
      "locked": {
        "lastModified": 1667395993,
        "narHash": "sha256-nuEHfE/LcWyuSWnS8t12N1wc105Qtau+/OdUAjtQ0rA=",
        "owner": "numtide",
        "repo": "flake-utils",
        "rev": "5aed5285a952e0b949eb3ba02c12fa4fcfef535f",
        "type": "github"
      },
      "original": {
        "owner": "numtide",
        "repo": "flake-utils",
        "type": "github"
      }
    }
  },
  "root": "root",
  "version": 7
}

flake.nix (new file, 69 lines)

@@ -0,0 +1,69 @@
{
  inputs = {
    nixpkgs.url = "github:NixOS/nixpkgs/nixpkgs-unstable";
    utils.url = "github:numtide/flake-utils";
    naersk = {
      url = "github:nix-community/naersk/master";
      inputs.nixpkgs.follows = "nixpkgs";
    };
  };

  outputs = { self, nixpkgs, utils, naersk }:
    let
      inherit (nixpkgs) lib;

      makeBuzz2elastic = pkgs:
        let
          naersk-lib = pkgs.callPackage naersk { };
        in
        naersk-lib.buildPackage {
          pname = "buzz2elastic";
          root = ./.;
          nativeBuildInputs = with pkgs; [ pkg-config ];
          buildInputs = with pkgs; [ openssl systemd ];
          postInstall = ''
            mkdir -p $out/share/buzz2elastic
            cp -r static $out/share/buzz2elastic/
          '';
          checkInputs = [ pkgs.rustPackages.clippy ];
          doCheck = true;
          cargoTestCommands = x:
            x ++ [
              ''cargo clippy --all --all-features --tests -- \
                -D warnings \
                -A clippy::nonminimal_bool''
            ];
          meta.description = "Forward fedi.buzz streaming posts to Elasticsearch";
        };
    in
    utils.lib.eachDefaultSystem
      (system:
        let
          pkgs = nixpkgs.legacyPackages.${system};
        in
        {
          packages = {
            default = self.packages."${system}".buzz2elastic;
            buzz2elastic = makeBuzz2elastic pkgs;
          };

          apps.default = utils.lib.mkApp {
            drv = self.packages."${system}".default;
          };

          devShells.default = with pkgs; mkShell {
            nativeBuildInputs = [
              pkg-config
              openssl systemd
              cargo rustc rustfmt rustPackages.clippy rust-analyzer
            ];
            RUST_SRC_PATH = rustPlatform.rustLibSrc;
          };
        })
    // {
      overlays.default = (_: prev: {
        buzz2elastic = makeBuzz2elastic prev;
      });

      nixosModules.default = import ./nixos-module.nix { inherit self; };
    };
}

src/main.rs (new file, 108 lines)

@@ -0,0 +1,108 @@
// use metrics_util::MetricKindMask;
// use metrics_exporter_prometheus::PrometheusBuilder;
use reqwest::StatusCode;
use serde_json::map::Entry;
// use std::time::Duration;
use std::{panic, process};
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};

mod stream;

/// Replace the nested `account` object with just its `acct` string,
/// or drop `account` entirely if it has no `acct` field.
fn mangle_account(post: &mut serde_json::Value) {
    match post {
        serde_json::Value::Object(ref mut obj) => {
            if let Entry::Occupied(mut account) = obj.entry("account") {
                let id = account.get_mut().get_mut("acct").map(|id| id.take());
                match id {
                    Some(id) =>
                        account.insert(id),
                    None =>
                        account.remove(),
                };
            }
        }
        _ => {}
    }
}

fn mangle_post(post: &mut serde_json::Value) {
    mangle_account(post);

    if let Some(reblog) = post.get_mut("reblog") {
        mangle_account(reblog);
    }
}

#[tokio::main]
async fn main() {
    exit_on_panic();

    #[cfg(debug_assertions)]
    let default_debug = "buzz2elastic=trace".into();
    #[cfg(not(debug_assertions))]
    let default_debug = "buzz2elastic=info".into();
    tracing_subscriber::registry()
        .with(
            tracing_subscriber::EnvFilter::try_from_default_env().unwrap_or_else(|_| default_debug),
        )
        .with(tracing_subscriber::fmt::layer())
        .init();

    // let recorder = PrometheusBuilder::new()
    //     .add_global_label("application", env!("CARGO_PKG_NAME"))
    //     .idle_timeout(MetricKindMask::ALL, Some(Duration::from_secs(600)))
    //     .install_recorder()
    //     .unwrap();

    let mut stream_rx = stream::spawn("fedi.buzz");

    systemd::daemon::notify(false, [(systemd::daemon::STATE_READY, "1")].iter())
        .unwrap();

    let client = reqwest::Client::new();
    loop {
        let post_data = stream_rx.recv().await.unwrap();
        let mut post = match serde_json::from_str::<serde_json::Value>(&post_data) {
            Ok(post) => post,
            Err(e) => {
                tracing::error!("Cannot parse JSON: {}", e);
                continue;
            },
        };
        mangle_post(&mut post);

        // Derive the Elasticsearch document id from the status id, escaping
        // characters that would break the URL path.
        let post_id = if let Some(serde_json::Value::String(post_id)) = post.get("id") {
            post_id.replace(' ', "+")
                .replace('/', "%2F")
        } else {
            tracing::warn!("Post without id");
            continue;
        };
        let result = client.post(format!("http://localhost:9200/ap/_create/{}", post_id))
            .header("content-type", "application/json")
            .body(serde_json::to_vec(&post).unwrap())
            .send()
            .await;
        match result {
            Ok(res) => {
                if res.status() > StatusCode::MULTIPLE_CHOICES {
                    tracing::warn!("HTTP {} from Elastic", res.status());
                } else {
                    tracing::trace!("HTTP {}", res.status());
                }
            }
            Err(e) => {
                tracing::error!("Error from Elastic: {}", e);
            }
        }
    }
}

fn exit_on_panic() {
    let orig_hook = panic::take_hook();
    panic::set_hook(Box::new(move |panic_info| {
        // invoke the default handler and exit the process
        orig_hook(panic_info);
        process::exit(1);
    }));
}
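For illustration only, a hypothetical unit test sketching what mangle_post does to an incoming status: the nested account object (and the one inside reblog, if any) is collapsed to just its acct string before the document is indexed. The sample data below is made up.

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn account_is_flattened_to_acct() {
        // Hypothetical status payload; only the fields relevant to mangling.
        let mut post = serde_json::json!({
            "id": "109",
            "account": { "acct": "astro@example.org", "display_name": "Astro" }
        });
        mangle_post(&mut post);
        // The whole account object has been replaced by its acct string.
        assert_eq!(post["account"], serde_json::json!("astro@example.org"));
    }
}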

src/stream.rs (new file, 69 lines)

@@ -0,0 +1,69 @@
use std::time::Duration;
use futures::{Stream, StreamExt};
use eventsource_stream::Eventsource;
use tokio::{
    sync::mpsc::{channel, Receiver},
    time::sleep,
};

#[derive(Debug)]
pub enum StreamError {
    Http(reqwest::Error),
    HttpStatus(reqwest::StatusCode),
    InvalidContentType,
}

/// Connect to the host's public streaming API and yield the JSON payload
/// of every `update` server-sent event.
async fn run(host: &str) -> Result<impl Stream<Item = String>, StreamError> {
    let url = format!("https://{}/api/v1/streaming/public", host);
    let client = reqwest::Client::new();
    let res = client.get(url)
        .timeout(Duration::MAX)
        .send()
        .await
        .map_err(StreamError::Http)?;
    if res.status() != 200 {
        return Err(StreamError::HttpStatus(res.status()));
    }
    let ct = res.headers().get("content-type")
        .and_then(|c| c.to_str().ok());
    if ct.map_or(true, |ct| ct != "text/event-stream") {
        return Err(StreamError::InvalidContentType);
    }

    let src = res.bytes_stream()
        .eventsource()
        .filter_map(|result| async {
            let result = result.ok()?;
            if result.event == "update" {
                Some(result)
            } else {
                None
            }
        })
        .map(|event| event.data);
    Ok(src)
}

/// Spawn a background task that keeps the streaming connection alive,
/// reconnecting after a short delay, and forwards each post's JSON
/// through a bounded channel.
pub fn spawn<H: Into<String>>(host: H) -> Receiver<String> {
    let host = host.into();
    let (tx, rx) = channel(1024);

    tokio::spawn(async move {
        loop {
            match run(&host).await {
                Ok(stream) =>
                    stream.for_each(|post| async {
                        match tx.try_send(post) {
                            Ok(()) => {}
                            Err(e) =>
                                tracing::warn!("Buffer full, dropping post: {}", e),
                        }
                    }).await,
                Err(e) =>
                    tracing::error!("stream: {:?}", e),
            }

            sleep(Duration::from_secs(1)).await;
        }
    });

    rx
}
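A minimal usage sketch of stream::spawn outside of main.rs, assuming the same crate layout; the host name and the println! are illustrative stand-ins for the Elasticsearch upload above.

mod stream;

#[tokio::main]
async fn main() {
    // Receive raw status JSON from the background streaming task.
    let mut rx = stream::spawn("fedi.buzz");
    // recv() yields None only if the forwarding task ever goes away.
    while let Some(post) = rx.recv().await {
        println!("received {} bytes of status JSON", post.len());
    }
}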