gatherer/firehose: add

This commit is contained in:
Astro 2022-11-15 00:45:02 +01:00
parent 461c8852d4
commit f9e7189c76
7 changed files with 81 additions and 10 deletions

2
Cargo.lock generated
View File

@ -288,6 +288,8 @@ dependencies = [
"axum-macros",
"cave",
"chrono",
"futures",
"hyper",
"log",
"redis",
"serde",

View File

@ -3,7 +3,7 @@ use chrono::{DateTime, FixedOffset};
use futures::{Stream, StreamExt};
use eventsource_stream::Eventsource;
#[derive(Debug, serde::Deserialize)]
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct Account {
pub username: String,
pub display_name: String,
@ -23,18 +23,18 @@ impl Account {
}
}
#[derive(Debug, serde::Deserialize)]
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct Tag {
pub name: String,
}
#[derive(Debug, serde::Deserialize)]
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct Application {
pub name: String,
pub website: Option<String>,
}
#[derive(Debug, serde::Deserialize)]
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct Mention {
pub username: Option<String>,
pub url: String,
@ -51,7 +51,7 @@ impl Mention {
}
}
#[derive(Debug, serde::Deserialize)]
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct Post {
pub created_at: String,
pub uri: String,

View File

@ -110,6 +110,8 @@ impl Store {
.getset(&post_key, "1")
.expire(post_key, POST_EXPIRE)
.ignore()
.publish("firehose", serde_json::to_vec(&post).unwrap())
.ignore()
.query_async::<_, Value>(self)
.await?;
if check != Value::Bulk(vec![Value::Nil]) {

View File

@ -4,6 +4,7 @@ version = "0.0.0"
edition = "2021"
[dependencies]
futures = "0.3"
tokio = { version = "1", features = ["full"] }
serde = { version = "1", features = ["derive"] }
serde_yaml = "0.9"
@ -11,6 +12,7 @@ chrono = "0.4"
redis = { version = "0.22", features = ["tokio-comp", "connection-manager"] }
log = "0.4"
cave = { path = "../cave" }
hyper = { version = "0.14", features = ["stream"] }
axum = "0.5"
axum-macros = "0.2"
axum-extra = { version = "0.3", features = ["spa"] }

27
gatherer/src/firehose.rs Normal file
View File

@ -0,0 +1,27 @@
use futures::{Stream, StreamExt};
use redis::RedisError;
#[derive(Clone)]
pub struct FirehoseFactory {
redis_url: String,
}
impl FirehoseFactory {
pub fn new(redis_url: String) -> Self {
FirehoseFactory { redis_url }
}
pub async fn produce(&self) -> Result<impl Stream<Item = Result<Vec<u8>, RedisError>>, RedisError> {
let client = redis::Client::open(&self.redis_url[..])?;
let mut pubsub_conn = client.get_async_connection()
.await?
.into_pubsub();
pubsub_conn.subscribe("firehose")
.await?;
let stream = pubsub_conn.into_on_message()
.map(|msg| msg.get_payload());
Ok(stream)
}
}

View File

@ -4,15 +4,17 @@ use axum::{
async_trait,
Extension,
extract::{self, RequestParts, FromRequest},
http::{StatusCode, Request},
http::{StatusCode, Request, Response},
response::IntoResponse,
routing::get,
Router, middleware::{Next, self}, body::Body,
Router, middleware::{Next, self}, body::{Body, self},
};
use axum_extra::routing::SpaRouter;
use futures::{stream, StreamExt};
use cave::systemd;
use crate::{
html_template::HtmlTemplate,
firehose::FirehoseFactory,
RequestFactory,
request_mux::RequestMux,
trends::TrendsResults,
@ -90,6 +92,33 @@ async fn in_language(
.template()
}
async fn streaming_api(
Extension(firehose_factory): Extension<FirehoseFactory>,
) -> impl IntoResponse {
let firehose = firehose_factory.produce()
.await
.expect("firehose");
let stream_err = |e| stream::once(async move { Err(e) }).boxed();
let stream_ok = |data|
stream::iter([
Ok(b"event: update\ndata: ".to_vec()),
Ok(data),
Ok(b"\n\n".to_vec()),
].into_iter()).boxed();
let stream = stream::once(async { Ok(b":)\n".to_vec()) })
.chain(
firehose.flat_map(move |data| data.map_or_else(stream_err, stream_ok))
);
let body = body::boxed(hyper::body::Body::wrap_stream(stream));
Response::builder()
.status(200)
.header("content-type", "text/event-stream")
.header("cache-control", "no-store")
.body(body)
.expect("Response")
}
async fn print_request(
req: Request<Body>,
@ -108,14 +137,16 @@ async fn print_request(
Ok(res)
}
pub async fn start(listen_port: u16, mux: Mux) {
pub async fn start(listen_port: u16, mux: Mux, firehose_factory: FirehoseFactory) {
cave::systemd::status("Starting HTTP server");
// build our application with some routes
let app = Router::new()
.route("/", get(home))
.route("/in/:language", get(in_language))
.route("/api/v1/streaming/public", get(streaming_api))
.layer(Extension(mux))
.layer(Extension(firehose_factory))
.layer(middleware::from_fn(print_request))
.merge(SpaRouter::new("/assets", "assets"));

View File

@ -2,13 +2,17 @@ use cave::{
config::LoadConfig,
store::Store,
};
use crate::request_mux::{Muxable, RequestMux};
use crate::{
firehose::FirehoseFactory,
request_mux::{Muxable, RequestMux},
};
mod config;
mod trends;
mod html_template;
mod http_server;
mod request_mux;
mod firehose;
type Languages = Vec<String>;
@ -40,11 +44,14 @@ async fn main() {
let config = config::Config::load();
cave::systemd::status("Starting redis client");
let store = cave::store::Store::new(8, config.redis).await;
let store = cave::store::Store::new(8, config.redis.clone()).await;
let firehose_factory = FirehoseFactory::new(config.redis);
let http = http_server::start(
config.listen_port,
RequestMux::new(RequestFactory { store }),
firehose_factory,
);
cave::systemd::ready();
http.await;