diff --git a/Cargo.lock b/Cargo.lock index 81ea7e4..2f6fd10 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8,6 +8,17 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" +[[package]] +name = "ahash" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fcb51a0695d8f838b1ee009b3fbf66bda078cd64590202a864a8f3e8c4315c47" +dependencies = [ + "getrandom", + "once_cell", + "version_check", +] + [[package]] name = "aho-corasick" version = "0.7.20" @@ -376,6 +387,9 @@ dependencies = [ "chrono", "futures", "jemallocator", + "metrics", + "metrics-exporter-prometheus", + "metrics-util", "rand", "reqwest", "serde", @@ -519,6 +533,19 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "crossbeam-epoch" +version = "0.9.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "01a9af1f4c2ef74bb8aa1f7e19706bc72d03598c8a570bb5de72243c7a9d9d5a" +dependencies = [ + "autocfg", + "cfg-if", + "crossbeam-utils", + "memoffset", + "scopeguard", +] + [[package]] name = "crossbeam-utils" version = "0.8.14" @@ -603,6 +630,12 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "endian-type" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c34f04666d835ff5d62e058c3995147c06f42fe86ff053337632bca83e42702d" + [[package]] name = "enum-as-inner" version = "0.5.1" @@ -832,6 +865,9 @@ name = "hashbrown" version = "0.12.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" +dependencies = [ + "ahash", +] [[package]] name = "hdrhistogram" @@ -1166,6 +1202,15 @@ dependencies = [ "linked-hash-map", ] +[[package]] +name = "mach" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b823e83b2affd8f40a9ee8c29dbc56404c1e34cd2710921f2801e2cf29527afa" +dependencies = [ + "libc", +] + [[package]] name = "match_cfg" version = "0.1.0" @@ -1205,6 +1250,77 @@ version = "2.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2dffe52ecf27772e601905b7522cb4ef790d2cc203488bbd0e2fe85fcb74566d" +[[package]] +name = "memoffset" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5de893c32cde5f383baa4c04c5d6dbdd735cfd4a794b0debdb2bb1b421da5ff4" +dependencies = [ + "autocfg", +] + +[[package]] +name = "metrics" +version = "0.20.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b9b8653cec6897f73b519a43fba5ee3d50f62fe9af80b428accdcc093b4a849" +dependencies = [ + "ahash", + "metrics-macros", + "portable-atomic", +] + +[[package]] +name = "metrics-exporter-prometheus" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8603921e1f54ef386189335f288441af761e0fc61bcb552168d9cedfe63ebc70" +dependencies = [ + "hyper", + "indexmap", + "ipnet", + "metrics", + "metrics-util", + "parking_lot", + "portable-atomic", + "quanta", + "thiserror", + "tokio", + "tracing", +] + +[[package]] +name = "metrics-macros" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "731f8ecebd9f3a4aa847dfe75455e4757a45da40a7793d2f0b1f9b6ed18b23f3" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "metrics-util" +version = "0.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f7d24dc2dbae22bff6f1f9326ffce828c9f07ef9cc1e8002e5279f845432a30a" +dependencies = [ + "aho-corasick", + "crossbeam-epoch", + "crossbeam-utils", + "hashbrown", + "indexmap", + "metrics", + "num_cpus", + "ordered-float", + "parking_lot", + "portable-atomic", + "quanta", + "radix_trie", + "sketches-ddsketch", +] + [[package]] name = "mime" version = "0.3.16" @@ -1266,6 +1382,15 @@ dependencies = [ "tempfile", ] +[[package]] +name = "nibble_vec" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77a5d83df9f36fe23f0c3648c6bbb8b0298bb5f1939c8f2704431371f4b84d43" +dependencies = [ + "smallvec", +] + [[package]] name = "nom" version = "7.1.1" @@ -1356,6 +1481,15 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "ordered-float" +version = "2.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7940cf2ca942593318d07fcf2596cdca60a85c9e7fab408a5e21a4f9dcd40d87" +dependencies = [ + "num-traits", +] + [[package]] name = "parking_lot" version = "0.12.1" @@ -1423,6 +1557,12 @@ version = "0.3.26" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6ac9a59f73473f1b8d852421e59e64809f025994837ef743615c6d0c5b305160" +[[package]] +name = "portable-atomic" +version = "0.3.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "26f6a7b87c2e435a3241addceeeff740ff8b7e76b74c13bf9acb17fa454ea00b" + [[package]] name = "ppv-lite86" version = "0.2.17" @@ -1471,6 +1611,22 @@ dependencies = [ "prost", ] +[[package]] +name = "quanta" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b7e31331286705f455e56cca62e0e717158474ff02b7936c1fa596d983f4ae27" +dependencies = [ + "crossbeam-utils", + "libc", + "mach", + "once_cell", + "raw-cpuid", + "wasi 0.10.0+wasi-snapshot-preview1", + "web-sys", + "winapi", +] + [[package]] name = "quick-error" version = "1.2.3" @@ -1486,6 +1642,16 @@ dependencies = [ "proc-macro2", ] +[[package]] +name = "radix_trie" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c069c179fcdc6a2fe24d8d18305cf085fdbd4f922c041943e203685d6a1c58fd" +dependencies = [ + "endian-type", + "nibble_vec", +] + [[package]] name = "rand" version = "0.8.5" @@ -1516,6 +1682,15 @@ dependencies = [ "getrandom", ] +[[package]] +name = "raw-cpuid" +version = "10.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a6823ea29436221176fe662da99998ad3b4db2c7f31e7b6f5fe43adccd6320bb" +dependencies = [ + "bitflags", +] + [[package]] name = "redis" version = "0.22.1" @@ -1769,6 +1944,12 @@ dependencies = [ "libc", ] +[[package]] +name = "sketches-ddsketch" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ceb945e54128e09c43d8e4f1277851bd5044c6fc540bbaa2ad888f60b3da9ae7" + [[package]] name = "slab" version = "0.4.7" diff --git a/hunter/Cargo.toml b/hunter/Cargo.toml index 3a50653..efc03ef 100644 --- a/hunter/Cargo.toml +++ b/hunter/Cargo.toml @@ -16,6 +16,9 @@ tracing = "0.1" cave = { path = "../cave" } rand = "0.8" texting_robots = "0.2" +metrics = "0.20" +metrics-util = "0.14" +metrics-exporter-prometheus = "0.11" [target.'cfg(not(target_env = "msvc"))'.dependencies] jemallocator = "0.5" diff --git a/hunter/config.yaml b/hunter/config.yaml index d1e695a..11d3975 100644 --- a/hunter/config.yaml +++ b/hunter/config.yaml @@ -27,3 +27,5 @@ hosts: # - uwu.social max_workers: 8 + +prometheus_port: 9101 diff --git a/hunter/src/config.rs b/hunter/src/config.rs index 47744f0..5258ecb 100644 --- a/hunter/src/config.rs +++ b/hunter/src/config.rs @@ -3,4 +3,5 @@ pub struct Config { pub redis: String, pub hosts: Vec, pub max_workers: usize, + pub prometheus_port: u16, } diff --git a/hunter/src/main.rs b/hunter/src/main.rs index 7400fa7..c9bd6b4 100644 --- a/hunter/src/main.rs +++ b/hunter/src/main.rs @@ -1,5 +1,7 @@ use std::time::Duration; use futures::{StreamExt, pin_mut}; +use metrics_util::MetricKindMask; +use metrics_exporter_prometheus::PrometheusBuilder; use tokio::time::timeout; use cave::config::LoadConfig; @@ -25,6 +27,13 @@ async fn main() { let config = config::Config::load(); + PrometheusBuilder::new() + .with_http_listener(([0; 8], config.prometheus_port)) + .add_global_label("application", env!("CARGO_PKG_NAME")) + .idle_timeout(MetricKindMask::ALL, Some(Duration::from_secs(600))) + .install() + .unwrap(); + let mut store = cave::store::Store::new(16, config.redis).await; let posts_cache = posts_cache::PostsCache::new(65536); @@ -85,6 +94,9 @@ async fn main() { loop { tracing::trace!("{} workers active, queued {} of {}", workers_active, scheduler.queue_len(), scheduler.size()); cave::systemd::status(&format!("{} workers active, queued {} of {}", workers_active, scheduler.queue_len(), scheduler.size())); + metrics::gauge!("hunter_workers", workers_active as f64, "worker" => "active"); + metrics::gauge!("hunter_workers", scheduler.queue_len() as f64, "worker" => "queued"); + metrics::gauge!("hunter_workers", scheduler.size() as f64, "worker" => "total"); let next_task = if workers_active < config.max_workers { scheduler.dequeue() } else { diff --git a/hunter/src/worker.rs b/hunter/src/worker.rs index 9529bb1..3d47f5a 100644 --- a/hunter/src/worker.rs +++ b/hunter/src/worker.rs @@ -2,7 +2,7 @@ use std::collections::HashSet; use std::future::Future; use std::ops::Deref; use std::sync::{Arc, RwLock}; -use std::time::Duration; +use std::time::{Duration, Instant}; use cave::{ feed::{Feed, EncodablePost}, store::Store, @@ -24,11 +24,13 @@ impl RobotsTxt { ) -> Self { let url = format!("https://{}/robots.txt", host); let robot = async { + metrics::increment_gauge!("hunter_requests", 1.0, "type" => "robotstxt"); let body = client.get(url) .send().await .ok()? .text().await .ok()?; + metrics::decrement_gauge!("hunter_requests", 1.0, "type" => "robotstxt"); texting_robots::Robot::new( env!("CARGO_PKG_NAME"), body.as_bytes(), @@ -96,7 +98,9 @@ pub async fn run( // Process stream if let Ok(stream) = stream_result { tracing::info!("Processing stream for {}", host); + metrics::increment_gauge!("hunter_requests", 1.0, "type" => "stream"); stream.await; + metrics::decrement_gauge!("hunter_requests", 1.0, "type" => "stream"); tracing::warn!("Ended stream for {}", host); } @@ -130,7 +134,14 @@ async fn fetch_timeline( // free as early as possible drop(robots_txt); - match Feed::fetch(&client, &url).await { + metrics::increment_gauge!("hunter_requests", 1.0, "type" => "timeline"); + let t1 = Instant::now(); + let result = Feed::fetch(&client, &url).await; + let t2 = Instant::now(); + metrics::decrement_gauge!("hunter_requests", 1.0, "type" => "timeline"); + metrics::histogram!("hunter_fetch_seconds", t2 - t1, "result" => if result.is_ok() { "ok" } else { "error" }); + + match result { Ok(feed) => { let mean_interval = feed.mean_post_interval(); @@ -167,6 +178,8 @@ async fn process_posts( // potentially save a round-trip to redis with an in-process cache if ! posts_cache.insert(post.uri.clone()) { + let t1 = Instant::now(); + // introduce instances from mentions for mention in &post.mentions { if let Some(user_host) = mention.user_host() { @@ -191,6 +204,9 @@ async fn process_posts( } else { tracing::warn!("drop repost ({:?} on {})", post.account.host(), host); } + + let t2 = Instant::now(); + metrics::histogram!("hunter_post_process_seconds", t2 - t1) } } tracing::info!("{}: {}/{} new posts", host, new_posts, posts_len); @@ -225,10 +241,12 @@ async fn open_stream( drop(robots_txt); let posts_cache = posts_cache.clone(); + metrics::increment_gauge!("hunter_requests", 1.0, "type" => "stream_open"); let stream = Feed::stream(client, &url).await .map_err(|e| { tracing::error!("Stream error for {}: {}", host, e); })?; + metrics::decrement_gauge!("hunter_requests", 1.0, "type" => "stream_open"); Ok(stream.for_each(move |post| { let message_tx = message_tx.clone(); diff --git a/nixos-module.nix b/nixos-module.nix index f726b03..b995c1b 100644 --- a/nixos-module.nix +++ b/nixos-module.nix @@ -7,6 +7,7 @@ let redis = "redis://127.0.0.1:${toString cfg.redis.port}/"; hosts = [ "mastodon.social" "fosstodon.org" "chaos.social" "dresden.network" ]; max_workers = 16; + prometheus_port = 9101; }; hunterSettings = lib.recursiveUpdate hunterDefaultSettings cfg.hunter.settings; @@ -97,6 +98,10 @@ in DefaultLimitNOFILE=${toString limitNOFILE} ''; + networking.firewall.allowedTCPPorts = [ + hunterSettings.prometheus_port + ]; + services.redis.servers.caveman = lib.mkIf cfg.hunter.enable { enable = true; port = cfg.redis.port;