add metrics

This commit is contained in:
Astro 2022-12-20 03:13:44 +01:00
parent 402d7fc9a7
commit a3c87c9311
5 changed files with 272 additions and 17 deletions

199
Cargo.lock generated
View File

@ -2,6 +2,26 @@
# It is not intended for manual editing. # It is not intended for manual editing.
version = 3 version = 3
[[package]]
name = "ahash"
version = "0.7.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fcb51a0695d8f838b1ee009b3fbf66bda078cd64590202a864a8f3e8c4315c47"
dependencies = [
"getrandom",
"once_cell",
"version_check",
]
[[package]]
name = "aho-corasick"
version = "0.7.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cc936419f96fa211c1b9166887b38e5e40b19958e5b895be7c1f93adec7071ac"
dependencies = [
"memchr",
]
[[package]] [[package]]
name = "android_system_properties" name = "android_system_properties"
version = "0.1.5" version = "0.1.5"
@ -215,6 +235,9 @@ dependencies = [
"futures", "futures",
"http", "http",
"http_digest_headers", "http_digest_headers",
"metrics",
"metrics-exporter-prometheus",
"metrics-util",
"reqwest", "reqwest",
"serde", "serde",
"serde_json", "serde_json",
@ -302,6 +325,28 @@ dependencies = [
"libc", "libc",
] ]
[[package]]
name = "crossbeam-epoch"
version = "0.9.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "01a9af1f4c2ef74bb8aa1f7e19706bc72d03598c8a570bb5de72243c7a9d9d5a"
dependencies = [
"autocfg",
"cfg-if",
"crossbeam-utils",
"memoffset",
"scopeguard",
]
[[package]]
name = "crossbeam-utils"
version = "0.8.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4fb766fa798726286dbbb842f174001dab8abc7b627a1dd86e0b7222a95d929f"
dependencies = [
"cfg-if",
]
[[package]] [[package]]
name = "crypto-common" name = "crypto-common"
version = "0.1.6" version = "0.1.6"
@ -386,6 +431,12 @@ dependencies = [
"cfg-if", "cfg-if",
] ]
[[package]]
name = "endian-type"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c34f04666d835ff5d62e058c3995147c06f42fe86ff053337632bca83e42702d"
[[package]] [[package]]
name = "eventsource-stream" name = "eventsource-stream"
version = "0.2.3" version = "0.2.3"
@ -603,6 +654,9 @@ name = "hashbrown"
version = "0.12.3" version = "0.12.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888"
dependencies = [
"ahash",
]
[[package]] [[package]]
name = "heck" name = "heck"
@ -848,6 +902,15 @@ dependencies = [
"cfg-if", "cfg-if",
] ]
[[package]]
name = "mach"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b823e83b2affd8f40a9ee8c29dbc56404c1e34cd2710921f2801e2cf29527afa"
dependencies = [
"libc",
]
[[package]] [[package]]
name = "matchers" name = "matchers"
version = "0.1.0" version = "0.1.0"
@ -878,6 +941,77 @@ version = "2.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2dffe52ecf27772e601905b7522cb4ef790d2cc203488bbd0e2fe85fcb74566d" checksum = "2dffe52ecf27772e601905b7522cb4ef790d2cc203488bbd0e2fe85fcb74566d"
[[package]]
name = "memoffset"
version = "0.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5de893c32cde5f383baa4c04c5d6dbdd735cfd4a794b0debdb2bb1b421da5ff4"
dependencies = [
"autocfg",
]
[[package]]
name = "metrics"
version = "0.20.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7b9b8653cec6897f73b519a43fba5ee3d50f62fe9af80b428accdcc093b4a849"
dependencies = [
"ahash",
"metrics-macros",
"portable-atomic",
]
[[package]]
name = "metrics-exporter-prometheus"
version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8603921e1f54ef386189335f288441af761e0fc61bcb552168d9cedfe63ebc70"
dependencies = [
"hyper",
"indexmap",
"ipnet",
"metrics",
"metrics-util",
"parking_lot",
"portable-atomic",
"quanta",
"thiserror",
"tokio",
"tracing",
]
[[package]]
name = "metrics-macros"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "731f8ecebd9f3a4aa847dfe75455e4757a45da40a7793d2f0b1f9b6ed18b23f3"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "metrics-util"
version = "0.14.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f7d24dc2dbae22bff6f1f9326ffce828c9f07ef9cc1e8002e5279f845432a30a"
dependencies = [
"aho-corasick",
"crossbeam-epoch",
"crossbeam-utils",
"hashbrown",
"indexmap",
"metrics",
"num_cpus",
"ordered-float",
"parking_lot",
"portable-atomic",
"quanta",
"radix_trie",
"sketches-ddsketch",
]
[[package]] [[package]]
name = "mime" name = "mime"
version = "0.3.16" version = "0.3.16"
@ -930,6 +1064,15 @@ dependencies = [
"tempfile", "tempfile",
] ]
[[package]]
name = "nibble_vec"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "77a5d83df9f36fe23f0c3648c6bbb8b0298bb5f1939c8f2704431371f4b84d43"
dependencies = [
"smallvec",
]
[[package]] [[package]]
name = "nom" name = "nom"
version = "7.1.1" version = "7.1.1"
@ -1030,6 +1173,15 @@ dependencies = [
"vcpkg", "vcpkg",
] ]
[[package]]
name = "ordered-float"
version = "2.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7940cf2ca942593318d07fcf2596cdca60a85c9e7fab408a5e21a4f9dcd40d87"
dependencies = [
"num-traits",
]
[[package]] [[package]]
name = "overload" name = "overload"
version = "0.1.1" version = "0.1.1"
@ -1121,6 +1273,12 @@ version = "0.3.26"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6ac9a59f73473f1b8d852421e59e64809f025994837ef743615c6d0c5b305160" checksum = "6ac9a59f73473f1b8d852421e59e64809f025994837ef743615c6d0c5b305160"
[[package]]
name = "portable-atomic"
version = "0.3.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "81bdd679d533107e090c2704a35982fc06302e30898e63ffa26a81155c012e92"
[[package]] [[package]]
name = "postgres-protocol" name = "postgres-protocol"
version = "0.6.4" version = "0.6.4"
@ -1165,6 +1323,22 @@ dependencies = [
"unicode-ident", "unicode-ident",
] ]
[[package]]
name = "quanta"
version = "0.10.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b7e31331286705f455e56cca62e0e717158474ff02b7936c1fa596d983f4ae27"
dependencies = [
"crossbeam-utils",
"libc",
"mach",
"once_cell",
"raw-cpuid",
"wasi 0.10.0+wasi-snapshot-preview1",
"web-sys",
"winapi",
]
[[package]] [[package]]
name = "quote" name = "quote"
version = "1.0.21" version = "1.0.21"
@ -1174,6 +1348,16 @@ dependencies = [
"proc-macro2", "proc-macro2",
] ]
[[package]]
name = "radix_trie"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c069c179fcdc6a2fe24d8d18305cf085fdbd4f922c041943e203685d6a1c58fd"
dependencies = [
"endian-type",
"nibble_vec",
]
[[package]] [[package]]
name = "rand" name = "rand"
version = "0.8.5" version = "0.8.5"
@ -1204,6 +1388,15 @@ dependencies = [
"getrandom", "getrandom",
] ]
[[package]]
name = "raw-cpuid"
version = "10.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a6823ea29436221176fe662da99998ad3b4db2c7f31e7b6f5fe43adccd6320bb"
dependencies = [
"bitflags",
]
[[package]] [[package]]
name = "redox_syscall" name = "redox_syscall"
version = "0.2.16" version = "0.2.16"
@ -1454,6 +1647,12 @@ version = "0.3.10"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7bd3e3206899af3f8b12af284fafc038cc1dc2b41d1b89dd17297221c5d225de" checksum = "7bd3e3206899af3f8b12af284fafc038cc1dc2b41d1b89dd17297221c5d225de"
[[package]]
name = "sketches-ddsketch"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ceb945e54128e09c43d8e4f1277851bd5044c6fc540bbaa2ad888f60b3da9ae7"
[[package]] [[package]]
name = "slab" name = "slab"
version = "0.4.7" version = "0.4.7"

View File

@ -24,3 +24,6 @@ eventsource-stream = "0.2"
futures = "0.3" futures = "0.3"
tokio-postgres = "0.7" tokio-postgres = "0.7"
systemd = "0.10" systemd = "0.10"
metrics = "0.20"
metrics-util = "0.14"
metrics-exporter-prometheus = "0.11"

View File

@ -4,7 +4,9 @@ use axum::{
response::{IntoResponse, Response}, response::{IntoResponse, Response},
routing::{get}, Json, Router, routing::{get}, Json, Router,
}; };
use metrics::increment_counter;
use metrics_util::MetricKindMask;
use metrics_exporter_prometheus::{PrometheusBuilder};
use serde_json::json; use serde_json::json;
use sigh::{PrivateKey, PublicKey}; use sigh::{PrivateKey, PublicKey};
use std::{net::SocketAddr, sync::Arc, time::Duration, collections::HashMap}; use std::{net::SocketAddr, sync::Arc, time::Duration, collections::HashMap};
@ -39,13 +41,20 @@ impl FromRef<State> for Arc<reqwest::Client> {
} }
} }
fn track_request(method: &'static str, controller: &'static str, result: &'static str) {
increment_counter!("request", "controller" => controller, "method" => method, "result" => result);
}
async fn webfinger( async fn webfinger(
axum::extract::State(state): axum::extract::State<State>, axum::extract::State(state): axum::extract::State<State>,
Query(params): Query<HashMap<String, String>>, Query(params): Query<HashMap<String, String>>,
) -> Response { ) -> Response {
let resource = match params.get("resource") { let resource = match params.get("resource") {
Some(resource) => resource, Some(resource) => resource,
None => return StatusCode::NOT_FOUND.into_response(), None => {
track_request("GET", "webfinger", "invalid");
return StatusCode::NOT_FOUND.into_response();
},
}; };
let (target_kind, target_host) = let (target_kind, target_host) =
if resource.starts_with("acct:tag-") { if resource.starts_with("acct:tag-") {
@ -59,8 +68,10 @@ async fn webfinger(
(actor::ActorKind::InstanceRelay(resource[off..at.unwrap_or(resource.len())].to_string()), (actor::ActorKind::InstanceRelay(resource[off..at.unwrap_or(resource.len())].to_string()),
at.map_or_else(|| state.hostname.clone(), |at| Arc::new(resource[at + 1..].to_string()))) at.map_or_else(|| state.hostname.clone(), |at| Arc::new(resource[at + 1..].to_string())))
} else { } else {
track_request("GET", "webfinger", "not_found");
return StatusCode::NOT_FOUND.into_response(); return StatusCode::NOT_FOUND.into_response();
}; };
track_request("GET", "webfinger", "found");
let target = actor::Actor { let target = actor::Actor {
host: target_host, host: target_host,
kind: target_kind, kind: target_kind,
@ -82,6 +93,7 @@ async fn get_tag_actor(
axum::extract::State(state): axum::extract::State<State>, axum::extract::State(state): axum::extract::State<State>,
Path(tag): Path<String> Path(tag): Path<String>
) -> Response { ) -> Response {
track_request("GET", "actor", "tag");
let target = actor::Actor { let target = actor::Actor {
host: state.hostname.clone(), host: state.hostname.clone(),
kind: actor::ActorKind::TagRelay(tag.to_lowercase()), kind: actor::ActorKind::TagRelay(tag.to_lowercase()),
@ -94,6 +106,7 @@ async fn get_instance_actor(
axum::extract::State(state): axum::extract::State<State>, axum::extract::State(state): axum::extract::State<State>,
Path(instance): Path<String> Path(instance): Path<String>
) -> Response { ) -> Response {
track_request("GET", "actor", "instance");
let target = actor::Actor { let target = actor::Actor {
host: state.hostname.clone(), host: state.hostname.clone(),
kind: actor::ActorKind::InstanceRelay(instance.to_lowercase()), kind: actor::ActorKind::InstanceRelay(instance.to_lowercase()),
@ -134,10 +147,13 @@ async fn post_relay(
dbg!(&endpoint); dbg!(&endpoint);
let action = match serde_json::from_value::<activitypub::Action<serde_json::Value>>(endpoint.payload.clone()) { let action = match serde_json::from_value::<activitypub::Action<serde_json::Value>>(endpoint.payload.clone()) {
Ok(action) => action, Ok(action) => action,
Err(e) => return ( Err(e) => {
StatusCode::BAD_REQUEST, track_request("POST", "relay", "bad_action");
format!("Bad action: {:?}", e) return (
).into_response(), StatusCode::BAD_REQUEST,
format!("Bad action: {:?}", e)
).into_response();
}
}; };
let object_type = action.object let object_type = action.object
.and_then(|object| object.get("type").cloned()) .and_then(|object| object.get("type").cloned())
@ -168,14 +184,19 @@ async fn post_relay(
&endpoint.actor.inbox, &endpoint.actor.inbox,
&target.uri(), &target.uri(),
).await { ).await {
Ok(()) => {} Ok(()) => {
Err(e) => track_request("POST", "relay", "follow");
}
Err(e) => {
// duplicate key constraint // duplicate key constraint
tracing::error!("add_follow: {}", e), tracing::error!("add_follow: {}", e);
track_request("POST", "relay", "follow_error");
}
} }
} }
Err(e) => { Err(e) => {
tracing::error!("post accept: {}", e); tracing::error!("post accept: {}", e);
track_request("POST", "relay", "follow_accept_error");
} }
} }
}); });
@ -189,19 +210,23 @@ async fn post_relay(
&endpoint.actor.id, &endpoint.actor.id,
&target.uri(), &target.uri(),
).await { ).await {
Ok(()) => Ok(()) => {
track_request("POST", "relay", "unfollow");
(StatusCode::ACCEPTED, (StatusCode::ACCEPTED,
[("content-type", "application/activity+json")], [("content-type", "application/activity+json")],
"{}" "{}"
).into_response(), ).into_response()
}
Err(e) => { Err(e) => {
tracing::error!("del_follow: {}", e); tracing::error!("del_follow: {}", e);
track_request("POST", "relay", "unfollow_error");
(StatusCode::INTERNAL_SERVER_ERROR, (StatusCode::INTERNAL_SERVER_ERROR,
format!("{}", e) format!("{}", e)
).into_response() ).into_response()
} }
} }
} else { } else {
track_request("POST", "relay", "unrecognized");
(StatusCode::BAD_REQUEST, "Not a recognized request").into_response() (StatusCode::BAD_REQUEST, "Not a recognized request").into_response()
} }
} }
@ -223,6 +248,13 @@ async fn main() {
&std::env::args().nth(1) &std::env::args().nth(1)
.expect("Call with config.yaml") .expect("Call with config.yaml")
); );
let recorder = PrometheusBuilder::new()
.add_global_label("application", env!("CARGO_PKG_NAME"))
.idle_timeout(MetricKindMask::ALL, Some(Duration::from_secs(600)))
.install_recorder()
.unwrap();
let database = db::Database::connect(&config.db).await; let database = db::Database::connect(&config.db).await;
let stream_rx = stream::spawn(config.upstream.clone()); let stream_rx = stream::spawn(config.upstream.clone());
@ -246,6 +278,9 @@ async fn main() {
.route("/tag/:tag", get(get_tag_actor).post(post_tag_relay)) .route("/tag/:tag", get(get_tag_actor).post(post_tag_relay))
.route("/instance/:instance", get(get_instance_actor).post(post_instance_relay)) .route("/instance/:instance", get(get_instance_actor).post(post_instance_relay))
.route("/.well-known/webfinger", get(webfinger)) .route("/.well-known/webfinger", get(webfinger))
.route("/metrics", get(|| async move {
recorder.render().into_response()
}))
.with_state(State { .with_state(State {
database, database,
client, client,

View File

@ -1,5 +1,5 @@
use std::{sync::Arc, collections::HashSet}; use std::{sync::Arc, collections::HashSet};
use metrics::increment_counter;
use serde::Deserialize; use serde::Deserialize;
use serde_json::json; use serde_json::json;
use sigh::PrivateKey; use sigh::PrivateKey;
@ -84,7 +84,10 @@ pub fn spawn(
let post_url = match post.url { let post_url = match post.url {
Some(url) => url, Some(url) => url,
// skip reposts // skip reposts
None => continue, None => {
increment_counter!("post", "action" => "skip");
continue;
}
}; };
let mut seen_actors = HashSet::new(); let mut seen_actors = HashSet::new();
let mut seen_inboxes = HashSet::new(); let mut seen_inboxes = HashSet::new();
@ -117,6 +120,7 @@ pub fn spawn(
let private_key_ = private_key.clone(); let private_key_ = private_key.clone();
tracing::debug!("relay {} to {}", actor_id, inbox); tracing::debug!("relay {} to {}", actor_id, inbox);
tokio::spawn(async move { tokio::spawn(async move {
increment_counter!("relay", "target" => inbox.clone());
if let Err(e) = send::send_raw( if let Err(e) = send::send_raw(
&client_, &inbox, &client_, &inbox,
&key_id, &private_key_, body_ &key_id, &private_key_, body_
@ -135,6 +139,11 @@ pub fn spawn(
seen_actors.insert(actor); seen_actors.insert(actor);
} }
if seen_inboxes.is_empty() {
increment_counter!("post", "action" => "no_relay");
} else {
increment_counter!("post", "action" => "relay");
}
} }
}); });
} }

View File

@ -1,8 +1,10 @@
use std::{sync::Arc}; use std::{
sync::Arc,
time::Instant,
};
use http::StatusCode; use http::StatusCode;
use http_digest_headers::{DigestHeader, DigestMethod}; use http_digest_headers::{DigestHeader, DigestMethod};
use metrics::histogram;
use serde::Serialize; use serde::Serialize;
use sigh::{PrivateKey, SigningConfig, alg::RsaSha256}; use sigh::{PrivateKey, SigningConfig, alg::RsaSha256};
@ -60,24 +62,31 @@ pub async fn send_raw(
let url = reqwest::Url::parse(uri) let url = reqwest::Url::parse(uri)
.map_err(|_| SendError::InvalidUri)?; .map_err(|_| SendError::InvalidUri)?;
let host = format!("{}", url.host().ok_or(SendError::InvalidUri)?);
let mut req = http::Request::builder() let mut req = http::Request::builder()
.method("POST") .method("POST")
.uri(uri) .uri(uri)
.header("host", format!("{}", url.host().ok_or(SendError::InvalidUri)?)) .header("host", &host)
.header("content-type", "application/activity+json") .header("content-type", "application/activity+json")
.header("date", chrono::Utc::now().to_rfc2822() .header("date", chrono::Utc::now().to_rfc2822()
.replace("+0000", "GMT")) .replace("+0000", "GMT"))
.header("digest", digest_header) .header("digest", digest_header)
.body(body.as_ref().clone()) .body(body.as_ref().clone())
.map_err(SendError::HttpReq)?; .map_err(SendError::HttpReq)?;
let t1 = Instant::now();
SigningConfig::new(RsaSha256, private_key, key_id) SigningConfig::new(RsaSha256, private_key, key_id)
.sign(&mut req)?; .sign(&mut req)?;
let t2 = Instant::now();
let req: reqwest::Request = req.try_into()?; let req: reqwest::Request = req.try_into()?;
let res = client.execute(req) let res = client.execute(req)
.await?; .await?;
let t3 = Instant::now();
histogram!("sign_req", t2 - t1);
if res.status() >= StatusCode::OK && res.status() < StatusCode::MULTIPLE_CHOICES { if res.status() >= StatusCode::OK && res.status() < StatusCode::MULTIPLE_CHOICES {
histogram!("req", t3 - t2, "res" => "ok", "host" => host);
Ok(()) Ok(())
} else { } else {
histogram!("req", t3 - t2, "res" => "err", "host" => host);
let response = res.text().await?; let response = res.text().await?;
Err(SendError::Response(response)) Err(SendError::Response(response))
} }