Compare commits

...

4 Commits

Author SHA1 Message Date
Astro bdd3fcb467 hunter/main: use mimalloc 2023-10-01 23:31:17 +02:00
Astro 96a66b2799 cargo update 2023-10-01 23:20:56 +02:00
Astro 195ba34501 firehose: convey event_type 2023-10-01 23:19:00 +02:00
Astro e07a6b9716 flake.lock: Update 2023-10-01 21:03:37 +02:00
Flake lock file updates:

• Updated input 'fenix':
    'github:nix-community/fenix/492f9d57e90e53e09af6e3ada26c866af0cf4bf0' (2023-08-11)
  → 'github:nix-community/fenix/9ccae1754eec0341b640d5705302ac0923d22875' (2023-10-01)
• Updated input 'fenix/rust-analyzer-src':
    'github:rust-lang/rust-analyzer/1b678231d71f48f078e1a80230c28a2fce2daec5' (2023-08-10)
  → 'github:rust-lang/rust-analyzer/0840038f02daec6ba3238f05d8caa037d28701a0' (2023-09-30)
• Updated input 'naersk':
    'github:nmattia/naersk/d9a33d69a9c421d64c8d925428864e93be895dcc' (2023-07-26)
  → 'github:nmattia/naersk/3f976d822b7b37fc6fb8e6f157c2dd05e7e94e89' (2023-09-07)
• Updated input 'nixpkgs':
    'path:/nix/store/9xpvg56sk83p217x1ws6a42ga07mip2d-source?lastModified=1690640159&narHash=sha256-5DZUYnkeMOsVb/eqPYb9zns5YsnQXRJRC8Xx/nPMcno%3D&rev=e6ab46982debeab9831236869539a507f670a129' (2023-07-29)
  → 'path:/nix/store/z5r8imp4q5wjfl58d064rwm20cym02c0-source?lastModified=1693844670&narHash=sha256-t69F2nBB8DNQUWHD809oJZJVE%2B23XBrth4QZuVd6IE0%3D&rev=3c15feef7770eb5500a4b8792623e2d6f598c9c1' (2023-09-04)
• Updated input 'utils':
    'github:numtide/flake-utils/919d646de7be200f3bf08cb76ae1f09402b6f9b4' (2023-07-11)
  → 'github:numtide/flake-utils/ff7b65b44d01cf9ba6a71320833626af21126384' (2023-09-12)
11 changed files with 274 additions and 261 deletions

Cargo.lock (generated), 413 lines changed

File diff suppressed because it is too large

View File

@@ -6,6 +6,7 @@ members = [
     "gatherer",
     "smokestack",
 ]
+resolver = "2"
 
 [profile.release]
 codegen-units = 1

View File

@@ -45,11 +45,15 @@ async fn main() {
     let firehose_factory = FirehoseFactory::new(config.redis);
     let firehose = firehose_factory.produce()
         .await
-        .expect("firehose")
-        .filter_map(|item| async move { item.ok() });
+        .expect("firehose");
 
     cave::systemd::ready();
-    firehose.for_each(move |data| {
+    firehose.for_each(move |(event_type, data)| {
+        if event_type != b"update" {
+            // Only analyze new posts, no updates
+            return futures::future::ready(());
+        }
+
         let trend_setter_tx = trend_setter_tx.clone();
         let mut store = store.clone();
         let profanity = profanity.clone();
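Note: the hunk above turns firehose into a stream of (event_type, data) tuples and skips anything that is not an "update" by returning futures::future::ready(()) early from the for_each closure. For comparison, a minimal sketch (not code from this PR) of the same filtering expressed with StreamExt::filter on the stream itself; consume_updates is an illustrative name and its parameter stands in for the stream returned by FirehoseFactory::produce():

use futures::StreamExt;

// Sketch only: drop non-"update" events before the processing loop
// instead of early-returning inside for_each.
async fn consume_updates(firehose: impl futures::Stream<Item = (Vec<u8>, Vec<u8>)>) {
    firehose
        .filter(|(event_type, _)| futures::future::ready(event_type == b"update"))
        .for_each(|(_, data)| async move {
            // hand `data` (the JSON-encoded post) to the existing processing code
            println!("got {} bytes", data.len());
        })
        .await;
}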

View File

@@ -151,6 +151,7 @@ enum EncodedPost {
 /// to the original incoming data
 #[derive(Debug)]
 pub struct EncodablePost {
+    pub event_type: String,
     post: Post,
     encoded: EncodedPost,
 }
@@ -163,17 +164,19 @@ impl Deref for EncodablePost {
 }
 
 impl EncodablePost {
-    pub fn from_value(value: serde_json::Value) -> Result<Self, serde_json::Error> {
+    pub fn from_value(event_type: String, value: serde_json::Value) -> Result<Self, serde_json::Error> {
         let post = serde_json::from_value(value.clone())?;
         Ok(EncodablePost {
+            event_type,
             post,
             encoded: EncodedPost::Value(value),
         })
     }
 
-    pub fn from_bytes(bytes: Vec<u8>) -> Result<Self, serde_json::Error> {
+    pub fn from_bytes(event_type: String, bytes: Vec<u8>) -> Result<Self, serde_json::Error> {
         let post = serde_json::from_slice(&bytes)?;
         Ok(EncodablePost {
+            event_type,
             post,
             encoded: EncodedPost::Bytes(bytes),
         })
@@ -226,7 +229,7 @@ impl Feed {
         let posts = tokio::task::spawn_blocking(move || {
             let values: Vec<serde_json::Value> = serde_json::from_slice(&body)?;
             let posts: Vec<EncodablePost> = values.into_iter()
-                .filter_map(|value| EncodablePost::from_value(value).ok())
+                .filter_map(|value| EncodablePost::from_value("update".to_string(), value).ok())
                 .collect();
             Ok::<_, serde_json::Error>(posts)
         }).await.expect("join blocking")
@@ -254,16 +257,9 @@ impl Feed {
         }
 
         let src = res.bytes_stream().eventsource()
-            .filter_map(|result| async {
-                let result = result.ok()?;
-                if result.event == "update" {
-                    Some(result)
-                } else {
-                    None
-                }
-            })
-            .filter_map(|event| async move {
-                EncodablePost::from_bytes(event.data.into_bytes()).ok()
+            .filter_map(|result| async move {
+                let event = result.ok()?;
+                EncodablePost::from_bytes(event.event, event.data.into_bytes()).ok()
             });
         Ok(src)
     }

View File

@@ -11,7 +11,7 @@ impl FirehoseFactory {
         FirehoseFactory { redis_url }
     }
 
-    pub async fn produce(&self) -> Result<impl Stream<Item = Result<Vec<u8>, RedisError>>, RedisError> {
+    pub async fn produce(&self) -> Result<impl Stream<Item = (Vec<u8>, Vec<u8>)>, RedisError> {
         let client = redis::Client::open(&self.redis_url[..])?;
         let mut pubsub_conn = client.get_async_connection()
             .await?
@@ -21,7 +21,13 @@ impl FirehoseFactory {
             .await?;
 
         let stream = pubsub_conn.into_on_message()
-            .map(|msg| msg.get_payload());
+            .filter_map(|msg| async move {
+                let data = msg.get_payload::<Vec<u8>>().ok()?;
+                let nul_pos = data.iter().position(|&b| b == 0)?;
+                let event_type = data[..nul_pos].to_owned();
+                let post_data: Vec<u8> = data[nul_pos + 1..].to_owned();
+                Some((event_type, post_data))
+            });
         Ok(stream)
     }
 }

View File

@@ -130,8 +130,14 @@ impl Store {
             post.tags.len(), post.uri);
 
         match post.encode() {
-            Ok(encoded) => {
-                redis::Cmd::publish("firehose", encoded)
+            Ok(encoded_post) => {
+                let mut data = Vec::<u8>::with_capacity(
+                    post.event_type.len() + 1 + encoded_post.len()
+                );
+                data.extend(post.event_type.as_bytes());
+                data.extend(&[0]);
+                data.extend(&encoded_post);
+                redis::Cmd::publish("firehose", data)
                     .query_async::<_, Value>(self)
                     .await?;
             }
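Note: together with the firehose.rs hunk above (the consumer), this store.rs hunk (the producer) defines a small framing for messages on the Redis "firehose" channel: event type, a single NUL byte, then the encoded post. A minimal sketch of that framing as standalone helpers, assuming the event type itself never contains a NUL byte; the function names are illustrative, not part of this PR:

/// Frame a firehose message as <event_type> 0x00 <payload>, mirroring the
/// encoding done in Store above. Sketch only.
fn encode_frame(event_type: &str, payload: &[u8]) -> Vec<u8> {
    let mut data = Vec::with_capacity(event_type.len() + 1 + payload.len());
    data.extend_from_slice(event_type.as_bytes());
    data.push(0);
    data.extend_from_slice(payload);
    data
}

/// Split a frame back into (event_type, payload); returns None if no NUL
/// separator is present, mirroring the filter_map in FirehoseFactory::produce().
fn decode_frame(data: &[u8]) -> Option<(&[u8], &[u8])> {
    let nul_pos = data.iter().position(|&b| b == 0)?;
    Some((&data[..nul_pos], &data[nul_pos + 1..]))
}

#[test]
fn frame_round_trip() {
    let frame = encode_frame("update", br#"{"uri":"https://example.com/1"}"#);
    let (event_type, payload) = decode_frame(&frame).expect("NUL separator");
    assert_eq!(event_type, b"update");
    assert_eq!(payload, br#"{"uri":"https://example.com/1"}"#);
}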

View File

@@ -8,11 +8,11 @@
         "rust-analyzer-src": "rust-analyzer-src"
       },
       "locked": {
-        "lastModified": 1691734822,
-        "narHash": "sha256-mIK3x93yNVB0IkwaO3nQ1au0sVBo8QiLf5mpOx88hZ0=",
+        "lastModified": 1696141234,
+        "narHash": "sha256-0dZpggYjjmWEk+rGixiBHOHuQfLzEzNfrtjSig04s6Q=",
         "owner": "nix-community",
         "repo": "fenix",
-        "rev": "492f9d57e90e53e09af6e3ada26c866af0cf4bf0",
+        "rev": "9ccae1754eec0341b640d5705302ac0923d22875",
         "type": "github"
       },
       "original": {
@@ -28,11 +28,11 @@
         ]
       },
       "locked": {
-        "lastModified": 1690373729,
-        "narHash": "sha256-e136hTT7LqQ2QjOTZQMW+jnsevWwBpMj78u6FRUsH9I=",
+        "lastModified": 1694081375,
+        "narHash": "sha256-vzJXOUnmkMCm3xw8yfPP5m8kypQ3BhAIRe4RRCWpzy8=",
         "owner": "nmattia",
         "repo": "naersk",
-        "rev": "d9a33d69a9c421d64c8d925428864e93be895dcc",
+        "rev": "3f976d822b7b37fc6fb8e6f157c2dd05e7e94e89",
         "type": "github"
       },
       "original": {
@@ -43,10 +43,10 @@
     },
     "nixpkgs": {
       "locked": {
-        "lastModified": 1690640159,
-        "narHash": "sha256-5DZUYnkeMOsVb/eqPYb9zns5YsnQXRJRC8Xx/nPMcno=",
-        "path": "/nix/store/9xpvg56sk83p217x1ws6a42ga07mip2d-source",
-        "rev": "e6ab46982debeab9831236869539a507f670a129",
+        "lastModified": 1693844670,
+        "narHash": "sha256-t69F2nBB8DNQUWHD809oJZJVE+23XBrth4QZuVd6IE0=",
+        "path": "/nix/store/z5r8imp4q5wjfl58d064rwm20cym02c0-source",
+        "rev": "3c15feef7770eb5500a4b8792623e2d6f598c9c1",
         "type": "path"
       },
       "original": {
@@ -65,11 +65,11 @@
     "rust-analyzer-src": {
       "flake": false,
       "locked": {
-        "lastModified": 1691691861,
-        "narHash": "sha256-EBNtN+r7eIrNrZTpPHwCV/KpEqd36ZyDuf8aUyiCFOU=",
+        "lastModified": 1696050837,
+        "narHash": "sha256-2K3Aq4gjPZBDnkAMJaMA4ElE+BNbmrqtSBWtt9kPGaM=",
         "owner": "rust-lang",
         "repo": "rust-analyzer",
-        "rev": "1b678231d71f48f078e1a80230c28a2fce2daec5",
+        "rev": "0840038f02daec6ba3238f05d8caa037d28701a0",
         "type": "github"
       },
       "original": {
@@ -99,11 +99,11 @@
         "systems": "systems"
       },
       "locked": {
-        "lastModified": 1689068808,
-        "narHash": "sha256-6ixXo3wt24N/melDWjq70UuHQLxGV8jZvooRanIHXw0=",
+        "lastModified": 1694529238,
+        "narHash": "sha256-zsNZZGTGnMOf9YpHKJqMSsa0dXbfmxeoJ7xHlrt+xmY=",
         "owner": "numtide",
         "repo": "flake-utils",
-        "rev": "919d646de7be200f3bf08cb76ae1f09402b6f9b4",
+        "rev": "ff7b65b44d01cf9ba6a71320833626af21126384",
         "type": "github"
       },
       "original": {

View File

@@ -168,16 +168,16 @@ async fn streaming_api(
     let firehose = firehose_factory.produce()
         .await
         .expect("firehose");
 
-    let stream_err = |e| stream::once(async move { Err(e) }).boxed();
-    let stream_ok = |data|
-        stream::iter([
-            Ok(b"event: update\ndata: ".to_vec()),
-            Ok(data),
-            Ok(b"\n\n".to_vec()),
-        ].into_iter()).boxed();
-    let stream = stream::once(async { Ok(b":)\n".to_vec()) })
+    let stream = stream::once(async { Ok::<Vec<u8>, axum::Error>(b":)\n".to_vec()) })
         .chain(
-            firehose.flat_map(move |data| data.map_or_else(stream_err, stream_ok))
+            firehose.flat_map(|(event_type, data)|
+                stream::iter([
+                    Ok(b"event: ".to_vec()),
+                    Ok(event_type),
+                    Ok(b"\ndata: ".to_vec()),
+                    Ok(data),
+                    Ok(b"\n\n".to_vec()),
+                ].into_iter()).boxed())
         );
     let body = axum::body::boxed(hyper::body::Body::wrap_stream(stream));
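Note: with this change every Server-Sent Event written to the response carries the event type conveyed by the firehose ("event: <type>", then "data: <json>", then a blank line), after an initial ":)" comment frame; previously the type was hard-coded as "update". A minimal sketch of a client for this endpoint, assuming the reqwest crate (with its stream feature enabled) and the eventsource-stream crate already used in feed.rs; the URL is illustrative:

use eventsource_stream::Eventsource;
use futures::StreamExt;

// Sketch only: print the event type and payload size of each SSE frame.
async fn tail_firehose(url: &str) -> Result<(), reqwest::Error> {
    let res = reqwest::get(url).await?;
    let mut events = Box::pin(res.bytes_stream().eventsource());
    while let Some(event) = events.next().await {
        // Skip malformed frames instead of aborting the whole stream.
        let Ok(event) = event else { continue };
        // event.event carries the type from the firehose ("update", "delete", ...).
        println!("{}: {} bytes of JSON", event.event, event.data.len());
    }
    Ok(())
}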

View File

@@ -22,6 +22,4 @@ metrics-util = "0.14"
 metrics-exporter-prometheus = "0.11"
 serde_json = "1"
 urlencoding = "1"
+mimalloc = { version = "*", default-features = false }
-
-[target.'cfg(not(target_env = "msvc"))'.dependencies]
-jemallocator = "0.5"

View File

@@ -15,9 +15,8 @@ mod webfinger;
 use scheduler::InstanceHost;
 use worker::Message;
 
-#[cfg(not(target_env = "msvc"))]
 #[global_allocator]
-static GLOBAL: jemallocator::Jemalloc = jemallocator::Jemalloc;
+static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc;
 
 fn main() -> Result<(), Box<dyn std::error::Error>> {
     tokio_uring::start(async {

View File

@@ -165,10 +165,15 @@ impl Drop for Pipe {
     }
 }
 
-async fn publisher(state: State, firehose: impl Stream<Item = Vec<u8>>) {
-    firehose.for_each(move |data| {
+async fn publisher(state: State, firehose: impl Stream<Item = (Vec<u8>, Vec<u8>)>) {
+    firehose.for_each(move |(event_type, data)| {
         let state = state.clone();
         async move {
+            if event_type != b"update" {
+                // Only process new posts, no updates
+                return;
+            }
+
             let post = serde_json::from_slice(&data)
                 .ok();
             let msg = post.and_then(format_message);
@@ -193,8 +198,7 @@ async fn main() {
     let firehose_factory = FirehoseFactory::new(config.redis);
     let firehose = firehose_factory.produce()
         .await
-        .expect("firehose")
-        .filter_map(|item| async move { item.ok() });
+        .expect("firehose");
     tokio::spawn(
         publisher(state.clone(), firehose)
     );