From 934aa1461aecd6db0e54ae84d87f48c1a12196cd Mon Sep 17 00:00:00 2001 From: Astro Date: Fri, 30 Dec 2022 02:59:09 +0100 Subject: [PATCH] gatherer/http_server: add metrics --- Cargo.lock | 4 +++ gatherer/Cargo.toml | 4 +++ gatherer/src/http_server.rs | 50 +++++++++++++++++++++++++++---------- gatherer/src/main.rs | 11 ++++++++ 4 files changed, 56 insertions(+), 13 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0f50d46..958f3c1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -298,7 +298,11 @@ dependencies = [ "cave", "chrono", "futures", + "http-body", "hyper", + "metrics", + "metrics-exporter-prometheus", + "metrics-util", "redis", "serde", "serde_yaml", diff --git a/gatherer/Cargo.toml b/gatherer/Cargo.toml index 7ac00c7..ee4a8d4 100644 --- a/gatherer/Cargo.toml +++ b/gatherer/Cargo.toml @@ -16,4 +16,8 @@ hyper = { version = "0.14", features = ["stream"] } axum = "0.5" axum-macros = "0.2" axum-extra = { version = "0.3", features = ["spa"] } +http-body = "0.4" askama = "0.11" +metrics = "0.20" +metrics-util = "0.14" +metrics-exporter-prometheus = "0.11" diff --git a/gatherer/src/http_server.rs b/gatherer/src/http_server.rs index 37b5dbf..9bc2b5d 100644 --- a/gatherer/src/http_server.rs +++ b/gatherer/src/http_server.rs @@ -1,7 +1,7 @@ use std::{ collections::{HashMap, HashSet}, ops::Deref, - net::SocketAddr, + net::SocketAddr, time::Instant, }; use askama::Template; use axum::{ @@ -11,7 +11,9 @@ use axum::{ http::{StatusCode, Request, Response}, response::IntoResponse, routing::get, - Router, middleware::{Next, self}, body::{Body, self}, + Router, + middleware::{Next, self}, + body::{Body, Bytes}, }; use axum_extra::routing::SpaRouter; use futures::{stream, StreamExt}; @@ -21,6 +23,8 @@ use cave::{ store::{Store, TREND_POOL_SIZE}, PERIODS, systemd, }; +use http_body::combinators::UnsyncBoxBody; +use metrics_exporter_prometheus::PrometheusHandle; use crate::{ html_template::HtmlTemplate, trends::{TrendAnalyzer, TrendsResults}, @@ -131,20 +135,32 @@ impl TrendsPage { } } -#[axum_macros::debug_handler] -async fn home(Extension(state): Extension) -> impl IntoResponse { - TrendsPage::generate(None, state) - .await - .template() +async fn trends_page( + state: ServerState, + language: Option, +) -> Response> { + let t1 = Instant::now(); + let lang = if language.is_some() { "some" } else { "any" }; + let page = TrendsPage::generate(language, state) + .await; + let t2 = Instant::now(); + metrics::histogram!("trends_page_time", t2 - t1, "step" => "query", "lang" => lang); + let res = page.template().into_response(); + let t3 = Instant::now(); + metrics::histogram!("trends_page_time", t3 - t2, "step" => "render", "lang" => lang); + metrics::increment_counter!("trends_page_requests", "lang" => lang); + res +} + +async fn home(Extension(state): Extension) -> Response> { + trends_page(state, None).await } async fn in_language( Extension(state): Extension, extract::Path(language): extract::Path, -) -> impl IntoResponse { - TrendsPage::generate(Some(language), state) - .await - .template() +) -> Response> { + trends_page(state, Some(language)).await } async fn streaming_api( @@ -164,7 +180,7 @@ async fn streaming_api( .chain( firehose.flat_map(move |data| data.map_or_else(stream_err, stream_ok)) ); - let body = body::boxed(hyper::body::Body::wrap_stream(stream)); + let body = axum::body::boxed(hyper::body::Body::wrap_stream(stream)); Response::builder() .status(200) @@ -192,7 +208,12 @@ async fn print_request( Ok(res) } -pub async fn start(listen_port: u16, store: Store, firehose_factory: FirehoseFactory) { +pub async fn start( + listen_port: u16, + store: Store, + firehose_factory: FirehoseFactory, + recorder: PrometheusHandle, +) { cave::systemd::status("Starting HTTP server"); // build our application with some routes @@ -202,6 +223,9 @@ pub async fn start(listen_port: u16, store: Store, firehose_factory: FirehoseFac .route("/api/v1/streaming/public", get(streaming_api)) .layer(Extension(ServerState { store })) .layer(Extension(firehose_factory)) + .route("/metrics", get(|| async move { + recorder.render().into_response() + })) .layer(middleware::from_fn(print_request)) .merge(SpaRouter::new("/assets", "assets")); diff --git a/gatherer/src/main.rs b/gatherer/src/main.rs index 2fd3f85..288cf19 100644 --- a/gatherer/src/main.rs +++ b/gatherer/src/main.rs @@ -1,3 +1,7 @@ +use std::time::Duration; +use metrics_util::MetricKindMask; +use metrics_exporter_prometheus::PrometheusBuilder; + use cave::{ config::LoadConfig, firehose::FirehoseFactory, @@ -15,6 +19,12 @@ async fn main() { let config = config::Config::load(); + let recorder = PrometheusBuilder::new() + .add_global_label("application", env!("CARGO_PKG_NAME")) + .idle_timeout(MetricKindMask::ALL, Some(Duration::from_secs(600))) + .install_recorder() + .unwrap(); + cave::systemd::status("Starting redis client"); let store = cave::store::Store::new(8, config.redis.clone()).await; @@ -24,6 +34,7 @@ async fn main() { config.listen_port, store, firehose_factory, + recorder, ); cave::systemd::ready(); http.await;