hunter: add metrics for prometheus

This commit is contained in:
Astro 2022-12-26 03:44:42 +01:00
parent f5fb098bd2
commit ce3f184d04
7 changed files with 224 additions and 2 deletions

181
Cargo.lock generated
View File

@ -8,6 +8,17 @@ version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe"
[[package]]
name = "ahash"
version = "0.7.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fcb51a0695d8f838b1ee009b3fbf66bda078cd64590202a864a8f3e8c4315c47"
dependencies = [
"getrandom",
"once_cell",
"version_check",
]
[[package]]
name = "aho-corasick"
version = "0.7.20"
@ -376,6 +387,9 @@ dependencies = [
"chrono",
"futures",
"jemallocator",
"metrics",
"metrics-exporter-prometheus",
"metrics-util",
"rand",
"reqwest",
"serde",
@ -519,6 +533,19 @@ dependencies = [
"crossbeam-utils",
]
[[package]]
name = "crossbeam-epoch"
version = "0.9.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "01a9af1f4c2ef74bb8aa1f7e19706bc72d03598c8a570bb5de72243c7a9d9d5a"
dependencies = [
"autocfg",
"cfg-if",
"crossbeam-utils",
"memoffset",
"scopeguard",
]
[[package]]
name = "crossbeam-utils"
version = "0.8.14"
@ -603,6 +630,12 @@ dependencies = [
"cfg-if",
]
[[package]]
name = "endian-type"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c34f04666d835ff5d62e058c3995147c06f42fe86ff053337632bca83e42702d"
[[package]]
name = "enum-as-inner"
version = "0.5.1"
@ -832,6 +865,9 @@ name = "hashbrown"
version = "0.12.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888"
dependencies = [
"ahash",
]
[[package]]
name = "hdrhistogram"
@ -1166,6 +1202,15 @@ dependencies = [
"linked-hash-map",
]
[[package]]
name = "mach"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b823e83b2affd8f40a9ee8c29dbc56404c1e34cd2710921f2801e2cf29527afa"
dependencies = [
"libc",
]
[[package]]
name = "match_cfg"
version = "0.1.0"
@ -1205,6 +1250,77 @@ version = "2.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2dffe52ecf27772e601905b7522cb4ef790d2cc203488bbd0e2fe85fcb74566d"
[[package]]
name = "memoffset"
version = "0.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5de893c32cde5f383baa4c04c5d6dbdd735cfd4a794b0debdb2bb1b421da5ff4"
dependencies = [
"autocfg",
]
[[package]]
name = "metrics"
version = "0.20.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7b9b8653cec6897f73b519a43fba5ee3d50f62fe9af80b428accdcc093b4a849"
dependencies = [
"ahash",
"metrics-macros",
"portable-atomic",
]
[[package]]
name = "metrics-exporter-prometheus"
version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8603921e1f54ef386189335f288441af761e0fc61bcb552168d9cedfe63ebc70"
dependencies = [
"hyper",
"indexmap",
"ipnet",
"metrics",
"metrics-util",
"parking_lot",
"portable-atomic",
"quanta",
"thiserror",
"tokio",
"tracing",
]
[[package]]
name = "metrics-macros"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "731f8ecebd9f3a4aa847dfe75455e4757a45da40a7793d2f0b1f9b6ed18b23f3"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "metrics-util"
version = "0.14.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f7d24dc2dbae22bff6f1f9326ffce828c9f07ef9cc1e8002e5279f845432a30a"
dependencies = [
"aho-corasick",
"crossbeam-epoch",
"crossbeam-utils",
"hashbrown",
"indexmap",
"metrics",
"num_cpus",
"ordered-float",
"parking_lot",
"portable-atomic",
"quanta",
"radix_trie",
"sketches-ddsketch",
]
[[package]]
name = "mime"
version = "0.3.16"
@ -1266,6 +1382,15 @@ dependencies = [
"tempfile",
]
[[package]]
name = "nibble_vec"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "77a5d83df9f36fe23f0c3648c6bbb8b0298bb5f1939c8f2704431371f4b84d43"
dependencies = [
"smallvec",
]
[[package]]
name = "nom"
version = "7.1.1"
@ -1356,6 +1481,15 @@ dependencies = [
"vcpkg",
]
[[package]]
name = "ordered-float"
version = "2.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7940cf2ca942593318d07fcf2596cdca60a85c9e7fab408a5e21a4f9dcd40d87"
dependencies = [
"num-traits",
]
[[package]]
name = "parking_lot"
version = "0.12.1"
@ -1423,6 +1557,12 @@ version = "0.3.26"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6ac9a59f73473f1b8d852421e59e64809f025994837ef743615c6d0c5b305160"
[[package]]
name = "portable-atomic"
version = "0.3.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "26f6a7b87c2e435a3241addceeeff740ff8b7e76b74c13bf9acb17fa454ea00b"
[[package]]
name = "ppv-lite86"
version = "0.2.17"
@ -1471,6 +1611,22 @@ dependencies = [
"prost",
]
[[package]]
name = "quanta"
version = "0.10.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b7e31331286705f455e56cca62e0e717158474ff02b7936c1fa596d983f4ae27"
dependencies = [
"crossbeam-utils",
"libc",
"mach",
"once_cell",
"raw-cpuid",
"wasi 0.10.0+wasi-snapshot-preview1",
"web-sys",
"winapi",
]
[[package]]
name = "quick-error"
version = "1.2.3"
@ -1486,6 +1642,16 @@ dependencies = [
"proc-macro2",
]
[[package]]
name = "radix_trie"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c069c179fcdc6a2fe24d8d18305cf085fdbd4f922c041943e203685d6a1c58fd"
dependencies = [
"endian-type",
"nibble_vec",
]
[[package]]
name = "rand"
version = "0.8.5"
@ -1516,6 +1682,15 @@ dependencies = [
"getrandom",
]
[[package]]
name = "raw-cpuid"
version = "10.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a6823ea29436221176fe662da99998ad3b4db2c7f31e7b6f5fe43adccd6320bb"
dependencies = [
"bitflags",
]
[[package]]
name = "redis"
version = "0.22.1"
@ -1769,6 +1944,12 @@ dependencies = [
"libc",
]
[[package]]
name = "sketches-ddsketch"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ceb945e54128e09c43d8e4f1277851bd5044c6fc540bbaa2ad888f60b3da9ae7"
[[package]]
name = "slab"
version = "0.4.7"

View File

@ -16,6 +16,9 @@ tracing = "0.1"
cave = { path = "../cave" }
rand = "0.8"
texting_robots = "0.2"
metrics = "0.20"
metrics-util = "0.14"
metrics-exporter-prometheus = "0.11"
[target.'cfg(not(target_env = "msvc"))'.dependencies]
jemallocator = "0.5"

View File

@ -27,3 +27,5 @@ hosts:
# - uwu.social
max_workers: 8
prometheus_port: 9101

View File

@ -3,4 +3,5 @@ pub struct Config {
pub redis: String,
pub hosts: Vec<String>,
pub max_workers: usize,
pub prometheus_port: u16,
}

View File

@ -1,5 +1,7 @@
use std::time::Duration;
use futures::{StreamExt, pin_mut};
use metrics_util::MetricKindMask;
use metrics_exporter_prometheus::PrometheusBuilder;
use tokio::time::timeout;
use cave::config::LoadConfig;
@ -25,6 +27,13 @@ async fn main() {
let config = config::Config::load();
PrometheusBuilder::new()
.with_http_listener(([0; 8], config.prometheus_port))
.add_global_label("application", env!("CARGO_PKG_NAME"))
.idle_timeout(MetricKindMask::ALL, Some(Duration::from_secs(600)))
.install()
.unwrap();
let mut store = cave::store::Store::new(16, config.redis).await;
let posts_cache = posts_cache::PostsCache::new(65536);
@ -85,6 +94,9 @@ async fn main() {
loop {
tracing::trace!("{} workers active, queued {} of {}", workers_active, scheduler.queue_len(), scheduler.size());
cave::systemd::status(&format!("{} workers active, queued {} of {}", workers_active, scheduler.queue_len(), scheduler.size()));
metrics::gauge!("hunter_workers", workers_active as f64, "worker" => "active");
metrics::gauge!("hunter_workers", scheduler.queue_len() as f64, "worker" => "queued");
metrics::gauge!("hunter_workers", scheduler.size() as f64, "worker" => "total");
let next_task = if workers_active < config.max_workers {
scheduler.dequeue()
} else {

View File

@ -2,7 +2,7 @@ use std::collections::HashSet;
use std::future::Future;
use std::ops::Deref;
use std::sync::{Arc, RwLock};
use std::time::Duration;
use std::time::{Duration, Instant};
use cave::{
feed::{Feed, EncodablePost},
store::Store,
@ -24,11 +24,13 @@ impl RobotsTxt {
) -> Self {
let url = format!("https://{}/robots.txt", host);
let robot = async {
metrics::increment_gauge!("hunter_requests", 1.0, "type" => "robotstxt");
let body = client.get(url)
.send().await
.ok()?
.text().await
.ok()?;
metrics::decrement_gauge!("hunter_requests", 1.0, "type" => "robotstxt");
texting_robots::Robot::new(
env!("CARGO_PKG_NAME"),
body.as_bytes(),
@ -96,7 +98,9 @@ pub async fn run(
// Process stream
if let Ok(stream) = stream_result {
tracing::info!("Processing stream for {}", host);
metrics::increment_gauge!("hunter_requests", 1.0, "type" => "stream");
stream.await;
metrics::decrement_gauge!("hunter_requests", 1.0, "type" => "stream");
tracing::warn!("Ended stream for {}", host);
}
@ -130,7 +134,14 @@ async fn fetch_timeline(
// free as early as possible
drop(robots_txt);
match Feed::fetch(&client, &url).await {
metrics::increment_gauge!("hunter_requests", 1.0, "type" => "timeline");
let t1 = Instant::now();
let result = Feed::fetch(&client, &url).await;
let t2 = Instant::now();
metrics::decrement_gauge!("hunter_requests", 1.0, "type" => "timeline");
metrics::histogram!("hunter_fetch_seconds", t2 - t1, "result" => if result.is_ok() { "ok" } else { "error" });
match result {
Ok(feed) => {
let mean_interval = feed.mean_post_interval();
@ -167,6 +178,8 @@ async fn process_posts(
// potentially save a round-trip to redis with an in-process cache
if ! posts_cache.insert(post.uri.clone()) {
let t1 = Instant::now();
// introduce instances from mentions
for mention in &post.mentions {
if let Some(user_host) = mention.user_host() {
@ -191,6 +204,9 @@ async fn process_posts(
} else {
tracing::warn!("drop repost ({:?} on {})", post.account.host(), host);
}
let t2 = Instant::now();
metrics::histogram!("hunter_post_process_seconds", t2 - t1)
}
}
tracing::info!("{}: {}/{} new posts", host, new_posts, posts_len);
@ -225,10 +241,12 @@ async fn open_stream(
drop(robots_txt);
let posts_cache = posts_cache.clone();
metrics::increment_gauge!("hunter_requests", 1.0, "type" => "stream_open");
let stream = Feed::stream(client, &url).await
.map_err(|e| {
tracing::error!("Stream error for {}: {}", host, e);
})?;
metrics::decrement_gauge!("hunter_requests", 1.0, "type" => "stream_open");
Ok(stream.for_each(move |post| {
let message_tx = message_tx.clone();

View File

@ -7,6 +7,7 @@ let
redis = "redis://127.0.0.1:${toString cfg.redis.port}/";
hosts = [ "mastodon.social" "fosstodon.org" "chaos.social" "dresden.network" ];
max_workers = 16;
prometheus_port = 9101;
};
hunterSettings = lib.recursiveUpdate hunterDefaultSettings cfg.hunter.settings;
@ -97,6 +98,10 @@ in
DefaultLimitNOFILE=${toString limitNOFILE}
'';
networking.firewall.allowedTCPPorts = [
hunterSettings.prometheus_port
];
services.redis.servers.caveman = lib.mkIf cfg.hunter.enable {
enable = true;
port = cfg.redis.port;