diff --git a/Cargo.lock b/Cargo.lock
index 639a7e9..f992fc0 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -272,6 +272,7 @@ dependencies = [
  "redis",
  "reqwest",
  "serde",
+ "serde_json",
  "serde_yaml",
  "systemd",
  "tokio",
diff --git a/cave/Cargo.toml b/cave/Cargo.toml
index ccf1c80..38c1b86 100644
--- a/cave/Cargo.toml
+++ b/cave/Cargo.toml
@@ -8,6 +8,7 @@ edition = "2021"
 futures = "0.3"
 tokio = { version = "1", features = ["full"] }
 serde = { version = "1", features = ["derive"] }
+serde_json = "1"
 serde_yaml = "0.9"
 chrono = "0.4"
 bb8 = "0.8"
diff --git a/cave/src/feed.rs b/cave/src/feed.rs
index f77961f..b7ae39f 100644
--- a/cave/src/feed.rs
+++ b/cave/src/feed.rs
@@ -1,6 +1,6 @@
 use std::{collections::{HashMap, HashSet}, time::Duration};
 use chrono::{DateTime, FixedOffset};
-use futures::Stream;
+use futures::{Stream, StreamExt};
 use eventsource_stream::Eventsource;
 
 #[derive(Debug, serde::Deserialize)]
@@ -151,11 +151,40 @@ impl Feed {
         Ok(Feed { posts })
     }
 
-    pub async fn stream(client: &reqwest::Client, url: &str) -> Result<impl Stream, reqwest::Error> {
+    pub async fn stream(client: &reqwest::Client, url: &str) -> Result<impl Stream<Item = Post>, String> {
         let res = client.get(url)
+            .timeout(Duration::MAX)
             .send()
-            .await?;
-        let src = res.bytes_stream().eventsource();
+            .await
+            .map_err(|e| format!("{}", e))?;
+        if res.status() != 200 {
+            return Err(format!("HTTP {}", res.status()));
+        }
+        let ct = res.headers().get("content-type")
+            .and_then(|c| c.to_str().ok());
+        if ct.map_or(true, |ct| ct != "text/event-stream") {
+            return Err(format!("Invalid Content-Type: {:?}", ct));
+        }
+
+        let src = res.bytes_stream().eventsource()
+            .filter_map(|result| async {
+                let result = result.ok()?;
+                if result.event == "update" {
+                    Some(result)
+                } else {
+                    None
+                }
+            })
+            .filter_map(|event| async move {
+                match serde_json::from_str(&event.data) {
+                    Ok(post) =>
+                        Some(post),
+                    Err(e) => {
+                        log::error!("Error decoding stream data: {}", e);
+                        None
+                    }
+                }
+            });
         Ok(src)
     }
 }
diff --git a/hunter/src/main.rs b/hunter/src/main.rs
index 16f04dd..d0d5630 100644
--- a/hunter/src/main.rs
+++ b/hunter/src/main.rs
@@ -63,8 +63,10 @@ async fn main() {
         let _ = timeout(duration, async {
             let message = message_rx.recv().await.unwrap();
             match message {
-                Message::Fetched { host, mean_interval, new_post_ratio } => {
+                Message::WorkerDone => {
                     workers_active -= 1;
+                }
+                Message::Fetched { host, mean_interval, new_post_ratio } => {
                     scheduler.reenqueue(host, new_post_ratio, mean_interval);
                 }
                 Message::IntroduceHosts { hosts } => {
diff --git a/hunter/src/worker.rs b/hunter/src/worker.rs
index da45b34..350cf47 100644
--- a/hunter/src/worker.rs
+++ b/hunter/src/worker.rs
@@ -1,9 +1,11 @@
 use std::collections::HashSet;
+use std::future::Future;
 use std::time::Duration;
 use cave::{
     feed::{Feed, Post},
     store::Store,
 };
+use futures::{StreamExt, future};
 use crate::trend_setter::UpdateSet;
 
 pub struct RobotsTxt {
@@ -47,6 +49,7 @@ impl RobotsTxt {
 
 #[derive(Debug)]
 pub enum Message {
+    WorkerDone,
     Fetched {
         host: String,
         new_post_ratio: Option<f64>,
@@ -62,23 +65,38 @@ pub async fn run(
     client: reqwest::Client,
     host: String,
 ) {
+    // Fetch /robots.txt
     let robots_txt = RobotsTxt::fetch(&client, &host).await;
-    let (new_post_ratio, mut mean_interval) =
-        // let (_, stream) = future::join(
+    // Fetch posts and open stream
+    let ((new_post_ratio, mut mean_interval), stream_result) = future::join(
         fetch_timeline(
             message_tx.clone(), store.clone(), trend_setter_tx.clone(),
-            client.clone(), &robots_txt, &host
-        // ),
-        // open_stream(&host),
-    ).await;
+            &client, &robots_txt, &host
+        ),
+        open_stream(
+            message_tx.clone(), store.clone(),
+            trend_setter_tx.clone(),
+            &client, &robots_txt, host.clone()
+        ),
+    ).await;
+    // Next worker can start
+    message_tx.send(Message::WorkerDone).unwrap();
+
+    // Process stream
+    if let Ok(stream) = stream_result {
+        log::info!("Processing stream for {}", host);
+        stream.await;
+        log::warn!("Ended stream for {}", host);
+    }
+
+    // Ready for reenqueue
     if let Some(mean_interval) = &mut mean_interval {
         if let Some(delay) = robots_txt.delay() {
             *mean_interval = (*mean_interval).max(delay);
         }
     }
-
     message_tx.send(Message::Fetched {
         host: host.to_string(),
         new_post_ratio,
@@ -90,7 +108,7 @@ async fn fetch_timeline(
     message_tx: tokio::sync::mpsc::UnboundedSender<Message>,
     mut store: Store,
     trend_setter_tx: crate::trend_setter::Tx,
-    client: reqwest::Client,
+    client: &reqwest::Client,
     robots_txt: &RobotsTxt,
     host: &str,
 ) -> (Option<f64>, Option<Duration>) {
@@ -172,3 +190,38 @@ async fn process_posts(
 
     (new_post_ratio, introduce_hosts)
 }
+
+async fn open_stream(
+    message_tx: tokio::sync::mpsc::UnboundedSender<Message>,
+    store: Store,
+    trend_setter_tx: crate::trend_setter::Tx,
+    client: &reqwest::Client,
+    robots_txt: &RobotsTxt,
+    host: String,
+) -> Result<impl Future<Output = ()>, ()> {
+    let url = format!("https://{}/api/v1/streaming/public", host);
+    if ! robots_txt.allowed(&url) {
+        log::warn!("Streaming of {} forbidden by robots.txt", host);
+        return Err(());
+    }
+
+    let stream = Feed::stream(client, &url).await
+        .map_err(|e| {
+            log::error!("Stream error for {}: {}", host, e);
+        })?;
+
+    Ok(stream.for_each(move |post| {
+        let message_tx = message_tx.clone();
+        let mut store = store.clone();
+        let trend_setter_tx = trend_setter_tx.clone();
+        let host = host.clone();
+        async move {
+            log::info!("Stream {} new post: {}", host, &post.uri);
+            let (_, introduce_hosts) =
+                process_posts(&mut store, &trend_setter_tx, &host, [post].into_iter()).await;
+            let _ = message_tx.send(Message::IntroduceHosts {
+                hosts: introduce_hosts.into_iter().collect(),
+            });
+        }
+    }))
+}