168 lines
4.2 KiB
Rust
168 lines
4.2 KiB
Rust
use std::{
|
|
collections::HashMap,
|
|
ops::Deref,
|
|
net::SocketAddr,
|
|
};
|
|
use askama::Template;
|
|
use axum::{
|
|
async_trait,
|
|
Extension,
|
|
extract::{self, RequestParts, FromRequest},
|
|
http::{StatusCode, Request, Response},
|
|
response::IntoResponse,
|
|
routing::get,
|
|
Router, middleware::{Next, self}, body::{Body, self},
|
|
};
|
|
use axum_extra::routing::SpaRouter;
|
|
use futures::{stream, StreamExt};
|
|
use cave::{
|
|
firehose::FirehoseFactory,
|
|
systemd,
|
|
};
|
|
use crate::{
|
|
html_template::HtmlTemplate,
|
|
RequestFactory,
|
|
request_mux::RequestMux,
|
|
trends::TrendsResults,
|
|
};
|
|
|
|
/// Shorthand for the crate's `RequestMux` instantiated with `RequestFactory`.
///
/// One value of this type is handed to `start`, installed as an `Extension`
/// layer, and extracted again inside the page handlers.
type Mux = RequestMux<RequestFactory>;
|
|
|
|
#[async_trait]
|
|
impl<B> FromRequest<B> for Mux
|
|
where
|
|
B: Send,
|
|
{
|
|
type Rejection = (StatusCode, String);
|
|
|
|
async fn from_request(req: &mut RequestParts<B>) -> Result<Self, Self::Rejection> {
|
|
let Extension(mux) = Extension::<Mux>::from_request(req)
|
|
.await
|
|
.map_err(internal_error)?;
|
|
|
|
Ok(mux)
|
|
}
|
|
}
|
|
|
|
/// Utility function for mapping any error into a `500 Internal Server Error`
|
|
/// response.
|
|
fn internal_error<E>(err: E) -> (StatusCode, String)
|
|
where
|
|
E: std::error::Error,
|
|
{
|
|
(StatusCode::INTERNAL_SERVER_ERROR, err.to_string())
|
|
}
|
|
|
|
|
|
#[derive(Template)]
|
|
#[template(path = "trends.html")]
|
|
struct TrendsPage {
|
|
language: Option<String>,
|
|
languages: Vec<String>,
|
|
results: TrendsResults,
|
|
tag_images: HashMap<String, String>,
|
|
}
|
|
|
|
impl TrendsPage {
|
|
async fn generate(language: Option<String>, mux: Mux) -> Self {
|
|
let (results, languages, tag_images) = mux.request(language.clone()).await;
|
|
|
|
// redis queries done, data is ready for rendering, means the
|
|
// service is very much alive:
|
|
systemd::watchdog();
|
|
|
|
TrendsPage {
|
|
results,
|
|
language,
|
|
languages,
|
|
tag_images,
|
|
}
|
|
}
|
|
|
|
fn template(self) -> HtmlTemplate<Self> {
|
|
HtmlTemplate(self)
|
|
}
|
|
}
|
|
|
|
#[axum_macros::debug_handler]
|
|
async fn home(Extension(mux): Extension<Mux>) -> impl IntoResponse {
|
|
TrendsPage::generate(None, mux)
|
|
.await
|
|
.template()
|
|
}
|
|
|
|
async fn in_language(
|
|
Extension(mux): Extension<Mux>,
|
|
extract::Path(language): extract::Path<String>,
|
|
) -> impl IntoResponse {
|
|
TrendsPage::generate(Some(language), mux)
|
|
.await
|
|
.template()
|
|
}
|
|
|
|
async fn streaming_api(
|
|
Extension(firehose_factory): Extension<FirehoseFactory>,
|
|
) -> impl IntoResponse {
|
|
let firehose = firehose_factory.produce()
|
|
.await
|
|
.expect("firehose");
|
|
let stream_err = |e| stream::once(async move { Err(e) }).boxed();
|
|
let stream_ok = |data|
|
|
stream::iter([
|
|
Ok(b"event: update\ndata: ".to_vec()),
|
|
Ok(data),
|
|
Ok(b"\n\n".to_vec()),
|
|
].into_iter()).boxed();
|
|
let stream = stream::once(async { Ok(b":)\n".to_vec()) })
|
|
.chain(
|
|
firehose.flat_map(move |data| data.map_or_else(stream_err, stream_ok))
|
|
);
|
|
let body = body::boxed(hyper::body::Body::wrap_stream(stream));
|
|
|
|
Response::builder()
|
|
.status(200)
|
|
.header("content-type", "text/event-stream")
|
|
.header("cache-control", "no-store")
|
|
.body(body)
|
|
.expect("Response")
|
|
}
|
|
|
|
|
|
async fn print_request(
|
|
req: Request<Body>,
|
|
next: Next<Body>,
|
|
) -> Result<impl IntoResponse, (StatusCode, String)> {
|
|
log::info!(
|
|
"{} {} {:?}",
|
|
req.method(),
|
|
req.uri(),
|
|
req.headers().get("user-agent")
|
|
.and_then(|ua| ua.to_str().ok())
|
|
.unwrap_or("-")
|
|
);
|
|
let res = next.run(req).await;
|
|
|
|
Ok(res)
|
|
}
|
|
|
|
pub async fn start(listen_port: u16, mux: Mux, firehose_factory: FirehoseFactory) {
|
|
cave::systemd::status("Starting HTTP server");
|
|
|
|
// build our application with some routes
|
|
let app = Router::new()
|
|
.route("/", get(home))
|
|
.route("/in/:language", get(in_language))
|
|
.route("/api/v1/streaming/public", get(streaming_api))
|
|
.layer(Extension(mux))
|
|
.layer(Extension(firehose_factory))
|
|
.layer(middleware::from_fn(print_request))
|
|
.merge(SpaRouter::new("/assets", "assets"));
|
|
|
|
// run it
|
|
let addr = SocketAddr::from(([127, 0, 0, 1], listen_port));
|
|
axum::Server::bind(&addr)
|
|
.serve(app.into_make_service())
|
|
.await
|
|
.unwrap();
|
|
}
|