gatherer/http_server: add metrics
This commit is contained in:
parent
b05d62db73
commit
934aa1461a
|
@ -298,7 +298,11 @@ dependencies = [
|
|||
"cave",
|
||||
"chrono",
|
||||
"futures",
|
||||
"http-body",
|
||||
"hyper",
|
||||
"metrics",
|
||||
"metrics-exporter-prometheus",
|
||||
"metrics-util",
|
||||
"redis",
|
||||
"serde",
|
||||
"serde_yaml",
|
||||
|
|
|
@ -16,4 +16,8 @@ hyper = { version = "0.14", features = ["stream"] }
|
|||
axum = "0.5"
|
||||
axum-macros = "0.2"
|
||||
axum-extra = { version = "0.3", features = ["spa"] }
|
||||
http-body = "0.4"
|
||||
askama = "0.11"
|
||||
metrics = "0.20"
|
||||
metrics-util = "0.14"
|
||||
metrics-exporter-prometheus = "0.11"
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
use std::{
|
||||
collections::{HashMap, HashSet},
|
||||
ops::Deref,
|
||||
net::SocketAddr,
|
||||
net::SocketAddr, time::Instant,
|
||||
};
|
||||
use askama::Template;
|
||||
use axum::{
|
||||
|
@ -11,7 +11,9 @@ use axum::{
|
|||
http::{StatusCode, Request, Response},
|
||||
response::IntoResponse,
|
||||
routing::get,
|
||||
Router, middleware::{Next, self}, body::{Body, self},
|
||||
Router,
|
||||
middleware::{Next, self},
|
||||
body::{Body, Bytes},
|
||||
};
|
||||
use axum_extra::routing::SpaRouter;
|
||||
use futures::{stream, StreamExt};
|
||||
|
@ -21,6 +23,8 @@ use cave::{
|
|||
store::{Store, TREND_POOL_SIZE}, PERIODS,
|
||||
systemd,
|
||||
};
|
||||
use http_body::combinators::UnsyncBoxBody;
|
||||
use metrics_exporter_prometheus::PrometheusHandle;
|
||||
use crate::{
|
||||
html_template::HtmlTemplate,
|
||||
trends::{TrendAnalyzer, TrendsResults},
|
||||
|
@ -131,20 +135,32 @@ impl TrendsPage {
|
|||
}
|
||||
}
|
||||
|
||||
#[axum_macros::debug_handler]
|
||||
async fn home(Extension(state): Extension<ServerState>) -> impl IntoResponse {
|
||||
TrendsPage::generate(None, state)
|
||||
.await
|
||||
.template()
|
||||
async fn trends_page(
|
||||
state: ServerState,
|
||||
language: Option<String>,
|
||||
) -> Response<UnsyncBoxBody<Bytes, axum::Error>> {
|
||||
let t1 = Instant::now();
|
||||
let lang = if language.is_some() { "some" } else { "any" };
|
||||
let page = TrendsPage::generate(language, state)
|
||||
.await;
|
||||
let t2 = Instant::now();
|
||||
metrics::histogram!("trends_page_time", t2 - t1, "step" => "query", "lang" => lang);
|
||||
let res = page.template().into_response();
|
||||
let t3 = Instant::now();
|
||||
metrics::histogram!("trends_page_time", t3 - t2, "step" => "render", "lang" => lang);
|
||||
metrics::increment_counter!("trends_page_requests", "lang" => lang);
|
||||
res
|
||||
}
|
||||
|
||||
async fn home(Extension(state): Extension<ServerState>) -> Response<UnsyncBoxBody<Bytes, axum::Error>> {
|
||||
trends_page(state, None).await
|
||||
}
|
||||
|
||||
async fn in_language(
|
||||
Extension(state): Extension<ServerState>,
|
||||
extract::Path(language): extract::Path<String>,
|
||||
) -> impl IntoResponse {
|
||||
TrendsPage::generate(Some(language), state)
|
||||
.await
|
||||
.template()
|
||||
) -> Response<UnsyncBoxBody<Bytes, axum::Error>> {
|
||||
trends_page(state, Some(language)).await
|
||||
}
|
||||
|
||||
async fn streaming_api(
|
||||
|
@ -164,7 +180,7 @@ async fn streaming_api(
|
|||
.chain(
|
||||
firehose.flat_map(move |data| data.map_or_else(stream_err, stream_ok))
|
||||
);
|
||||
let body = body::boxed(hyper::body::Body::wrap_stream(stream));
|
||||
let body = axum::body::boxed(hyper::body::Body::wrap_stream(stream));
|
||||
|
||||
Response::builder()
|
||||
.status(200)
|
||||
|
@ -192,7 +208,12 @@ async fn print_request(
|
|||
Ok(res)
|
||||
}
|
||||
|
||||
pub async fn start(listen_port: u16, store: Store, firehose_factory: FirehoseFactory) {
|
||||
pub async fn start(
|
||||
listen_port: u16,
|
||||
store: Store,
|
||||
firehose_factory: FirehoseFactory,
|
||||
recorder: PrometheusHandle,
|
||||
) {
|
||||
cave::systemd::status("Starting HTTP server");
|
||||
|
||||
// build our application with some routes
|
||||
|
@ -202,6 +223,9 @@ pub async fn start(listen_port: u16, store: Store, firehose_factory: FirehoseFac
|
|||
.route("/api/v1/streaming/public", get(streaming_api))
|
||||
.layer(Extension(ServerState { store }))
|
||||
.layer(Extension(firehose_factory))
|
||||
.route("/metrics", get(|| async move {
|
||||
recorder.render().into_response()
|
||||
}))
|
||||
.layer(middleware::from_fn(print_request))
|
||||
.merge(SpaRouter::new("/assets", "assets"));
|
||||
|
||||
|
|
|
@ -1,3 +1,7 @@
|
|||
use std::time::Duration;
|
||||
use metrics_util::MetricKindMask;
|
||||
use metrics_exporter_prometheus::PrometheusBuilder;
|
||||
|
||||
use cave::{
|
||||
config::LoadConfig,
|
||||
firehose::FirehoseFactory,
|
||||
|
@ -15,6 +19,12 @@ async fn main() {
|
|||
|
||||
let config = config::Config::load();
|
||||
|
||||
let recorder = PrometheusBuilder::new()
|
||||
.add_global_label("application", env!("CARGO_PKG_NAME"))
|
||||
.idle_timeout(MetricKindMask::ALL, Some(Duration::from_secs(600)))
|
||||
.install_recorder()
|
||||
.unwrap();
|
||||
|
||||
cave::systemd::status("Starting redis client");
|
||||
let store = cave::store::Store::new(8, config.redis.clone()).await;
|
||||
|
||||
|
@ -24,6 +34,7 @@ async fn main() {
|
|||
config.listen_port,
|
||||
store,
|
||||
firehose_factory,
|
||||
recorder,
|
||||
);
|
||||
cave::systemd::ready();
|
||||
http.await;
|
||||
|
|
Loading…
Reference in New Issue