caveman/gatherer/src/http_server.rs

240 lines
6.7 KiB
Rust

use std::{
collections::{HashMap, HashSet},
ops::Deref,
net::SocketAddr,
};
use askama::Template;
use axum::{
async_trait,
Extension,
extract::{self, RequestParts, FromRequest},
http::{StatusCode, Request, Response},
response::IntoResponse,
routing::get,
Router,
middleware::{Next, self},
body::{Body, Bytes},
};
use axum_extra::routing::SpaRouter;
use futures::{stream, StreamExt};
use futures::future::{join, join_all};
use cave::{
firehose::FirehoseFactory,
store::{Store, TREND_POOL_SIZE}, PERIODS,
systemd, db::Database,
};
use http_body::combinators::UnsyncBoxBody;
use metrics_exporter_prometheus::PrometheusHandle;
use crate::{
html_template::HtmlTemplate,
trends::{TrendAnalyzer, TrendsResults},
};
mod token_donate;
mod token_collect;
type Languages = Vec<String>;
#[derive(Clone)]
pub struct ServerState {
store: Store,
db: Database,
}
impl ServerState {
async fn query_trends(&self, language: Option<String>) -> (TrendsResults, Languages, HashMap<String, String>) {
let mut store = self.store.clone();
let mut store_ = self.store.clone();
let (results, mut languages) = join(async move {
TrendAnalyzer::run(&mut store_, TREND_POOL_SIZE, PERIODS, language)
.await
.unwrap()
}, async {
store.get_languages()
.await
.unwrap()
}).await;
languages.sort();
let tags = results.iter()
.flat_map(|(_until, _period, result)| result.iter().map(|(_score, tag)| tag.name.to_string()))
.collect::<HashSet<String>>();
let tag_images = join_all(tags.into_iter().map(|name| {
let mut store = store.clone();
async move {
let images = store.get_tag_images(&name)
.await
.unwrap()
.into_iter()
.enumerate()
.flat_map(|(i, url)| if i == 0 {
["".to_owned(), url]
} else {
[" ".to_owned(), url]
})
.collect::<String>();
(name, images)
}
})).await.into_iter().collect();
(results, languages, tag_images)
}
}
#[async_trait]
impl<B> FromRequest<B> for ServerState
where
B: Send,
{
type Rejection = (StatusCode, String);
async fn from_request(req: &mut RequestParts<B>) -> Result<Self, Self::Rejection> {
let Extension(state) = Extension::<ServerState>::from_request(req)
.await
.map_err(internal_error)?;
Ok(state)
}
}
/// Utility function for mapping any error into a `500 Internal Server Error`
/// response.
fn internal_error<E>(err: E) -> (StatusCode, String)
where
E: std::error::Error,
{
(StatusCode::INTERNAL_SERVER_ERROR, err.to_string())
}
#[derive(Template)]
#[template(path = "trends.html")]
struct TrendsPage {
language: Option<String>,
languages: Vec<String>,
results: TrendsResults,
tag_images: HashMap<String, String>,
}
impl TrendsPage {
async fn generate(language: Option<String>, state: ServerState) -> Self {
let (results, languages, tag_images) = state.query_trends(language.clone()).await;
// redis queries done, data is ready for rendering, means the
// service is very much alive:
systemd::watchdog();
TrendsPage {
results,
language,
languages,
tag_images,
}
}
fn template(self) -> HtmlTemplate<Self> {
HtmlTemplate(self)
}
}
async fn trends_page(
state: ServerState,
language: Option<String>,
) -> Response<UnsyncBoxBody<Bytes, axum::Error>> {
let lang = if language.is_some() { "some" } else { "any" };
let page = TrendsPage::generate(language, state)
.await;
let res = page.template().into_response();
metrics::increment_counter!("trends_page_requests", "lang" => lang);
res
}
async fn home(Extension(state): Extension<ServerState>) -> Response<UnsyncBoxBody<Bytes, axum::Error>> {
trends_page(state, None).await
}
async fn in_language(
Extension(state): Extension<ServerState>,
extract::Path(language): extract::Path<String>,
) -> Response<UnsyncBoxBody<Bytes, axum::Error>> {
trends_page(state, Some(language)).await
}
async fn streaming_api(
Extension(firehose_factory): Extension<FirehoseFactory>,
) -> impl IntoResponse {
let firehose = firehose_factory.produce()
.await
.expect("firehose");
let stream_err = |e| stream::once(async move { Err(e) }).boxed();
let stream_ok = |data|
stream::iter([
Ok(b"event: update\ndata: ".to_vec()),
Ok(data),
Ok(b"\n\n".to_vec()),
].into_iter()).boxed();
let stream = stream::once(async { Ok(b":)\n".to_vec()) })
.chain(
firehose.flat_map(move |data| data.map_or_else(stream_err, stream_ok))
);
let body = axum::body::boxed(hyper::body::Body::wrap_stream(stream));
Response::builder()
.status(200)
.header("content-type", "text/event-stream")
.header("cache-control", "no-store")
.body(body)
.expect("Response")
}
async fn print_request(
req: Request<Body>,
next: Next<Body>,
) -> Result<impl IntoResponse, (StatusCode, String)> {
tracing::info!(
"{} {} {:?}",
req.method(),
req.uri(),
req.headers().get("user-agent")
.and_then(|ua| ua.to_str().ok())
.unwrap_or("-")
);
let res = next.run(req).await;
Ok(res)
}
pub async fn start(
listen_port: u16,
store: Store,
db: Database,
firehose_factory: FirehoseFactory,
recorder: PrometheusHandle,
) {
cave::systemd::status("Starting HTTP server");
// build our application with some routes
let app = Router::new()
.route("/", get(home))
.route("/in/:language", get(in_language))
.route("/api/v1/streaming/public", get(streaming_api))
.route("/token/donate", get(token_donate::get_token_donate).post(token_donate::post_token_donate))
.route("/token/collect/:host", get(token_collect::get_token_collect))
.route("/token/thanks", get(token_collect::get_token_thanks))
.layer(Extension(ServerState { store, db }))
.layer(Extension(firehose_factory))
.route("/metrics", get(|| async move {
recorder.render().into_response()
}))
.layer(middleware::from_fn(print_request))
.merge(SpaRouter::new("/assets", "assets"));
// run it
let addr = SocketAddr::from(([127, 0, 0, 1], listen_port));
axum::Server::bind(&addr)
.serve(app.into_make_service())
.await
.unwrap();
}