use std::collections::HashSet;
use std::future::Future;
use std::sync::Arc;
use std::time::{Duration, Instant};
use cave::{
    feed::{Feed, EncodablePost},
    store::Store,
};
use futures::{StreamExt, future};
use crate::posts_cache::PostsCache;
use crate::scheduler::Host;

/// Cached result of fetching a host's `robots.txt`
#[derive(Clone)]
pub struct RobotsTxt {
    robot: Arc<Option<texting_robots::Robot>>,
}

impl RobotsTxt {
    /// Fetch and parse `https://{host}/robots.txt`
    pub async fn fetch(
        client: &reqwest::Client,
        host: &Host,
    ) -> Self {
        let url = format!("https://{}/robots.txt", host);
        metrics::increment_gauge!("hunter_requests", 1.0, "type" => "robotstxt");
        let robot = async {
            let body = client.get(url)
                .send().await
                .ok()?
                .text().await
                .ok()?;
            texting_robots::Robot::new(
                env!("CARGO_PKG_NAME"),
                body.as_bytes(),
            ).ok()
        }.await;
        metrics::decrement_gauge!("hunter_requests", 1.0, "type" => "robotstxt");
        RobotsTxt {
            robot: Arc::new(robot),
        }
    }

    /// Is fetching this URL allowed? Defaults to `true` if no
    /// `robots.txt` could be fetched or parsed.
    pub fn allowed(&self, url: &str) -> bool {
        if let Some(ref robot) = self.robot.as_ref() {
            robot.allowed(url)
        } else {
            true
        }
    }

    /// Crawl delay requested by the host's `robots.txt`, if any
    pub fn delay(&self) -> Option<Duration> {
        if let Some(ref robot) = self.robot.as_ref() {
            robot.delay.map(|delay| Duration::from_secs(delay as u64))
        } else {
            None
        }
    }
}

/// Messages sent from a worker back to the scheduler
#[derive(Debug)]
pub enum Message {
    WorkerDone,
    Fetched {
        host: Host,
        new_post_ratio: Option<f64>,
        mean_interval: Option<Duration>,
    },
    /// Host names of newly discovered instances
    IntroduceHosts { hosts: Vec<String> },
}

/// Worker entry point: fetch a host's public timeline, follow its
/// streaming API, then report the results back to the scheduler.
pub async fn run(
    message_tx: tokio::sync::mpsc::UnboundedSender<Message>,
    store: Store,
    posts_cache: PostsCache,
    client: reqwest::Client,
    host: Host,
) {
    // Fetch /robots.txt
    let robots_txt = RobotsTxt::fetch(&client, &host).await;
    let robots_delay = robots_txt.delay();

    // Fetch posts and open stream
    let ((mut new_post_ratio, mut mean_interval), stream_result) = future::join(
        fetch_timeline(
            message_tx.clone(), store.clone(),
            &posts_cache, &client, robots_txt.clone(), &host,
        ),
        open_stream(
            message_tx.clone(), store.clone(),
            &posts_cache, &client, robots_txt, host.clone(),
        ),
    ).await;

    // Next worker can start
    message_tx.send(Message::WorkerDone).unwrap();

    // Process stream
    if let Ok(stream) = stream_result {
        tracing::info!("Processing stream for {}", host);
        metrics::increment_gauge!("hunter_requests", 1.0, "type" => "stream");
        let start_time = Instant::now();
        let post_count = stream.await;
        let end_time = Instant::now();
        metrics::decrement_gauge!("hunter_requests", 1.0, "type" => "stream");
        tracing::warn!(
            "Ended stream for {}. {} posts in {:?}",
            host, post_count, end_time - start_time
        );

        if post_count > 0 {
            if let Some(ref mut new_post_ratio) = new_post_ratio {
                *new_post_ratio += post_count as f64 / 100.;
            }

            let stream_avg_interval = Duration::from_secs_f64(
                (end_time - start_time).as_secs_f64() / (post_count as f64)
            );
            if mean_interval.map_or(true, |mean_interval| stream_avg_interval < mean_interval) {
                mean_interval = Some(stream_avg_interval);
            }
        }
    }

    // Ready for reenqueue
    if let Some(mean_interval) = &mut mean_interval {
        if let Some(robots_delay) = robots_delay {
            // never poll more often than robots.txt allows
            *mean_interval = (*mean_interval).max(robots_delay);
        }
    }
    message_tx.send(Message::Fetched {
        host,
        new_post_ratio,
        mean_interval,
    }).unwrap();
}

/// Fetch a host's public timeline once and process its posts
async fn fetch_timeline(
    message_tx: tokio::sync::mpsc::UnboundedSender<Message>,
    mut store: Store,
    posts_cache: &PostsCache,
    client: &reqwest::Client,
    robots_txt: RobotsTxt,
    host: &Host,
) -> (Option<f64>, Option<Duration>) {
    let url = format!("https://{}/api/v1/timelines/public?limit=40", host);
    if !robots_txt.allowed(&url) {
        tracing::warn!("Timeline of {} forbidden by robots.txt", host);
        return (None, None);
    }
    // free as early as possible
    drop(robots_txt);

    metrics::increment_gauge!("hunter_requests", 1.0, "type" => "timeline");
    let t1 = Instant::now();
    let result = Feed::fetch(&client, &url).await;
    let t2 = Instant::now();
    metrics::decrement_gauge!("hunter_requests", 1.0, "type" => "timeline");
    metrics::histogram!(
        "hunter_fetch_seconds", t2 - t1,
        "result" => if result.is_ok() { "ok" } else { "error" }
    );
    match result {
        Ok(feed) => {
            let mean_interval = feed.mean_post_interval();
            let (new_post_ratio, introduce_hosts) =
                process_posts(&mut store, posts_cache, &host, feed.posts.into_iter()).await;
            let _ = message_tx.send(Message::IntroduceHosts {
                hosts: introduce_hosts.into_iter().collect(),
            });

            // successfully fetched, save for future run
            store.save_host(&host).await.unwrap();

            (new_post_ratio, mean_interval)
        }
        Err(e) => {
            tracing::error!("Failed fetching {}: {}", host, e);
            (None, None)
        }
    }
}

/// Store posts, count how many are new, and collect hosts to introduce
async fn process_posts(
    store: &mut Store,
    posts_cache: &PostsCache,
    host: &Host,
    posts: impl Iterator<Item = EncodablePost>,
) -> (Option<f64>, HashSet<String>) {
    // introduce new hosts, validate posts
    let mut introduce_hosts = HashSet::new();
    let mut new_posts = 0;
    let mut posts_len = 0;
    for post in posts {
        posts_len += 1;

        // potentially save a round-trip to redis with an in-process cache
        if !posts_cache.insert(post.uri.clone()) {
            let t1 = Instant::now();

            // introduce instances from reblog authors
            if let Some(reblog_account_host) = post.reblog.as_ref()
                .and_then(|reblog| reblog.account.host())
            {
                introduce_hosts.insert(reblog_account_host);
            }
            // introduce instances from mentions
            for mention in &post.mentions {
                if let Some(user_host) = mention.user_host() {
                    introduce_hosts.insert(user_host);
                }
            }
            // check if it's an actual post
            if let Some(account_host) = post.account.host() {
                // send away to redis
                if store.save_post(post).await == Ok(true) {
                    new_posts += 1;
                }
                // introduce instances from accounts
                introduce_hosts.insert(account_host);
            } else {
                tracing::warn!("drop repost ({:?} on {})", post.account.host(), host);
            }

            let t2 = Instant::now();
            metrics::histogram!("hunter_post_process_seconds", t2 - t1);
        }
    }
    tracing::trace!("{}: {}/{} new posts", host, new_posts, posts_len);
    metrics::counter!("hunter_posts", new_posts, "type" => "new");
    metrics::counter!("hunter_posts", posts_len, "type" => "total");

    let new_post_ratio = if posts_len > 0 {
        let ratio = (new_posts as f64) / (posts_len as f64);
        metrics::histogram!("hunter_new_post_ratio", ratio);
        Some(ratio)
    } else {
        None
    };
    (new_post_ratio, introduce_hosts)
}

/// Open the host's public streaming API. The returned future processes
/// incoming posts until the stream ends, then resolves to the post count.
async fn open_stream(
    message_tx: tokio::sync::mpsc::UnboundedSender<Message>,
    store: Store,
    posts_cache: &PostsCache,
    client: &reqwest::Client,
    robots_txt: RobotsTxt,
    host: Host,
) -> Result<impl Future<Output = usize>, ()> {
    let url = format!("https://{}/api/v1/streaming/public", host);
    if !robots_txt.allowed(&url) {
        tracing::warn!("Streaming of {} forbidden by robots.txt", host);
        return Err(());
    }
    // free as early as possible
    drop(robots_txt);

    let posts_cache = posts_cache.clone();
    metrics::increment_gauge!("hunter_requests", 1.0, "type" => "stream_open");
    let stream = Feed::stream(client, &url).await;
    metrics::decrement_gauge!("hunter_requests", 1.0, "type" => "stream_open");
    let stream = stream.map_err(|e| {
        tracing::error!("Stream error for {}: {}", host, e);
    })?;
    Ok(stream.fold(0, move |post_count, post| {
        let message_tx = message_tx.clone();
        let mut store = store.clone();
        let posts_cache = posts_cache.clone();
        let host = host.clone();
        async move {
            let (_, introduce_hosts) =
                process_posts(&mut store, &posts_cache, &host, [post].into_iter()).await;
            let _ = message_tx.send(Message::IntroduceHosts {
                hosts: introduce_hosts.into_iter().collect(),
            });
            post_count + 1
        }
    }))
}