// buzz2elastic/src/main.rs
//
// 106 lines
// 3.2 KiB
// Rust

// use metrics_util::MetricKindMask;
// use metrics_exporter_prometheus::PrometheusBuilder;
use reqwest::StatusCode;
use serde_json::map::Entry;
// use std::time::Duration;
use std::{panic, process};
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
mod stream;
/// Collapses the `account` member of `post` down to the account's `acct` value.
///
/// Nothing happens unless `post` is a JSON object that has an `account` key.
/// When the account value has an `acct` member, the whole account value is
/// replaced by it; otherwise the `account` key is removed from `post`.
fn mangle_account(post: &mut serde_json::Value) {
    let fields = match post {
        serde_json::Value::Object(fields) => fields,
        _ => return,
    };

    if let Entry::Occupied(mut slot) = fields.entry("account") {
        // Move the `acct` value out first so the borrow of the account value
        // ends before the entry itself is overwritten or dropped.
        let acct = slot.get_mut().get_mut("acct").map(serde_json::Value::take);
        if let Some(acct) = acct {
            slot.insert(acct);
        } else {
            slot.remove();
        }
    }
}
/// Runs [`mangle_account`] on `post` itself and on its `reblog` member, if any.
fn mangle_post(post: &mut serde_json::Value) {
    // The two steps touch disjoint keys (`reblog.account` vs. `account`), so
    // their relative order does not matter.
    if let Some(inner) = post.get_mut("reblog") {
        mangle_account(inner);
    }
    mangle_account(post);
}
/// Entry point: forwards posts from the `fedi.buzz` stream to the local
/// Elasticsearch `ap` index.
///
/// Every message received from the stream is parsed as JSON, passed through
/// [`mangle_post`], and submitted to
/// `http://localhost:9200/ap/_create/{id}`. Messages that are not valid JSON
/// or that lack a string `id` are logged and skipped; HTTP errors are logged
/// and the loop carries on with the next message.
///
/// # Panics
///
/// Panics if the systemd readiness notification fails or if the value
/// returned by `stream_rx.recv()` cannot be unwrapped. Because
/// [`exit_on_panic`] is installed first, any panic terminates the process
/// with exit code 1.
#[tokio::main]
async fn main() {
    exit_on_panic();

    // `debug_assertions` is the cfg rustc enables for unoptimised (debug)
    // builds. A bare `debug` cfg is never set by Cargo, so selecting on it
    // would always fall through to the `info` default.
    let default_filter = if cfg!(debug_assertions) {
        "buzz2elastic=trace"
    } else {
        "buzz2elastic=info"
    };
    tracing_subscriber::registry()
        .with(
            // The filter from the environment wins; the built-in default is
            // only used when that cannot be obtained.
            tracing_subscriber::EnvFilter::try_from_default_env()
                .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new(default_filter)),
        )
        .with(tracing_subscriber::fmt::layer())
        .init();

    // The Prometheus metrics recorder is currently disabled.
    // let recorder = PrometheusBuilder::new()
    // .add_global_label("application", env!("CARGO_PKG_NAME"))
    // .idle_timeout(MetricKindMask::ALL, Some(Duration::from_secs(600)))
    // .install_recorder()
    // .unwrap();

    let mut stream_rx = stream::spawn("fedi.buzz");

    // Report readiness to systemd once the stream has been spawned.
    systemd::daemon::notify(false, [(systemd::daemon::STATE_READY, "1")].iter())
        .unwrap();

    let client = reqwest::Client::new();
    loop {
        let post_data = stream_rx.recv().await.unwrap();
        let mut post = match serde_json::from_str::<serde_json::Value>(&post_data) {
            Ok(post) => post,
            Err(e) => {
                tracing::error!("Cannot parse JSON: {}", e);
                continue;
            }
        };
        mangle_post(&mut post);

        // The id becomes a URL path segment, so spaces and slashes are escaped.
        let post_id = if let Some(serde_json::Value::String(post_id)) = post.get("id") {
            post_id.replace(' ', "+").replace('/', "%2F")
        } else {
            tracing::warn!("Post without id");
            continue;
        };

        let result = client
            .post(format!("http://localhost:9200/ap/_create/{}", post_id))
            .header("content-type", "application/json")
            .body(serde_json::to_vec(&post).unwrap())
            .send()
            .await;
        match result {
            Ok(res) => {
                let status = res.status();
                // Everything from 300 upwards is not a success; `>=` makes
                // sure that 300 itself is reported as well.
                if status >= StatusCode::MULTIPLE_CHOICES {
                    tracing::warn!("HTTP {} from Elastic", status);
                } else {
                    tracing::trace!("HTTP {}", status);
                }
            }
            Err(e) => {
                tracing::error!("Error from Elastic: {}", e);
            }
        }
    }
}
/// Installs a panic hook that terminates the whole process with exit code 1.
///
/// The hook that was registered before this call is still invoked first, so
/// its output is produced before the process exits.
fn exit_on_panic() {
    let previous_hook = panic::take_hook();
    panic::set_hook(Box::new(move |info| {
        // Delegate to the previously installed hook, then stop the process.
        previous_hook(info);
        process::exit(1)
    }));
}