From 1cf7e200ab6ed6f8a6cf7e22c5664662a80528b6 Mon Sep 17 00:00:00 2001
From: Astro
Date: Mon, 7 Nov 2022 00:58:28 +0100
Subject: [PATCH] hunter: rework scheduling algorithm

---
 hunter/src/main.rs      |  4 ++--
 hunter/src/scheduler.rs | 24 +++++++++++++++---------
 hunter/src/worker.rs    | 38 ++++++++++++++++++++------------------
 3 files changed, 37 insertions(+), 29 deletions(-)

diff --git a/hunter/src/main.rs b/hunter/src/main.rs
index f2bc71f..8ea16eb 100644
--- a/hunter/src/main.rs
+++ b/hunter/src/main.rs
@@ -65,9 +65,9 @@ async fn main() {
         let _ = timeout(duration, async {
             let message = message_rx.recv().await.unwrap();
             match message {
-                Message::Fetched { host, next_interval, new_posts } => {
+                Message::Fetched { host, mean_interval, new_post_ratio } => {
                     workers_active -= 1;
-                    scheduler.enqueue(host, new_posts > 0, next_interval);
+                    scheduler.reenqueue(host, new_post_ratio, mean_interval);
                 }
                 Message::IntroduceHosts { hosts } => {
                     for host in hosts.into_iter() {
diff --git a/hunter/src/scheduler.rs b/hunter/src/scheduler.rs
index 17345de..a84ba9c 100644
--- a/hunter/src/scheduler.rs
+++ b/hunter/src/scheduler.rs
@@ -2,9 +2,12 @@ use std::collections::{HashMap, BTreeMap};
 use std::time::Duration;
 use tokio::time::Instant;
 
+const MIN_INTERVAL: Duration = Duration::from_secs(30);
+const MAX_INTERVAL: Duration = Duration::from_secs(7200);
+const DEFAULT_INTERVAL: Duration = Duration::from_secs(120);
+
 pub struct Instance {
     last_fetch: Option<Instant>,
-    no_updates: u32,
     error: bool,
 }
 
@@ -36,26 +39,29 @@ impl Scheduler {
         if self.instances.get(&host).is_none() {
             self.instances.insert(host.clone(), Instance {
                 last_fetch: None,
-                no_updates: 0,
                 error: false,
             });
             self.queue.insert(now, host);
         }
     }
 
-    pub fn enqueue(&mut self, host: String, fetched_anything: bool, next_interval: Duration) {
+    pub fn reenqueue(&mut self, host: String, new_post_ratio: Option<f64>, mean_interval: Option<Duration>) {
         let now = Instant::now();
         let instance = self.instances.get_mut(&host).unwrap();
+        let last_interval = instance.last_fetch.map(|last_fetch| now - last_fetch);
         instance.last_fetch = Some(now);
         instance.error = false;
-        if fetched_anything {
-            instance.no_updates = 0;
-        } else {
-            instance.no_updates += 1;
-        }
 
-        let mut next = now + (1 + instance.no_updates) * next_interval;
+        let next_interval = match (new_post_ratio, mean_interval, last_interval) {
+            (Some(new_post_ratio), Some(mean_interval), _) if new_post_ratio > 0. =>
+                mean_interval,
+            (_, _, Some(last_interval)) =>
+                2 * last_interval,
+            _ =>
+                DEFAULT_INTERVAL,
+        }.max(MIN_INTERVAL).min(MAX_INTERVAL);
+        let mut next = now + next_interval;
         let mut d = 1;
         // avoid timestamp collision in self.queue
         while self.queue.get(&next).is_some() {
diff --git a/hunter/src/worker.rs b/hunter/src/worker.rs
index 2977183..a2b059c 100644
--- a/hunter/src/worker.rs
+++ b/hunter/src/worker.rs
@@ -2,19 +2,13 @@ use std::collections::HashSet;
 use std::time::Duration;
 use crate::feed::Feed;
 
-// timeouts are fairly low as they will be multiplied with the amount
-// of sequential fetches without new posts by the scheduler.
-
-const DEFAULT_INTERVAL: Duration = Duration::from_secs(30);
-const MIN_INTERVAL: Duration = Duration::from_secs(10);
-const ERROR_INTERVAL: Duration = Duration::from_secs(180);
 
 #[derive(Debug)]
 pub enum Message {
     Fetched {
         host: String,
-        new_posts: usize,
-        next_interval: Duration,
+        new_post_ratio: Option<f64>,
+        mean_interval: Option<Duration>,
     },
     IntroduceHosts { hosts: Vec<String> },
 }
@@ -34,18 +28,19 @@ pub fn fetch_and_process(
                 .filter_map(|post| post.timestamp())
                 .collect::<Vec<_>>();
             timestamps.sort();
-            let next_interval = if timestamps.len() > 1 {
-                ((*timestamps.last().unwrap() - timestamps[0]) / (timestamps.len() as i32)
-                ).to_std().unwrap_or(DEFAULT_INTERVAL)
-                    .min(DEFAULT_INTERVAL)
-                    .max(MIN_INTERVAL)
+            let mean_interval = if timestamps.len() > 2 {
+                Some(
+                    ((*timestamps.last().unwrap() - timestamps[0]) / (timestamps.len() as i32 - 1)
+                    ).to_std().unwrap()
+                )
             } else {
-                DEFAULT_INTERVAL
+                None
             };
 
             // introduce new hosts, validate posts
             let mut hosts = HashSet::new();
             let mut new_posts = 0;
+            let posts_len = feed.posts.len();
            for post in feed.posts.into_iter() {
                 // introduce instances from mentions
                 for mention in &post.mentions {
@@ -67,6 +62,13 @@
                     log::warn!("drop repost ({:?} on {})", post.account.host(), host);
                 }
             }
+            log::info!("{}: {}/{} new posts", host, new_posts, posts_len);
+            let new_post_ratio = if posts_len > 0 {
+                Some((new_posts as f64) / (posts_len as f64))
+            } else {
+                None
+            };
+
             let hosts = hosts.into_iter()
                 .map(|host| host.to_owned())
                 .collect();
@@ -77,16 +79,16 @@
 
             message_tx.send(Message::Fetched {
                 host: host.clone(),
-                new_posts,
-                next_interval,
+                new_post_ratio,
+                mean_interval,
             }).unwrap();
         }
         Err(e) => {
             log::error!("Failed fetching {}: {}", host, e);
             message_tx.send(Message::Fetched {
                 host,
-                new_posts: 0,
-                next_interval: ERROR_INTERVAL,
+                new_post_ratio: None,
+                mean_interval: None,
             }).unwrap();
         }
     }
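
Not part of the patch, only an illustration for review: the sketch below restates the interval selection that the new Scheduler::reenqueue() performs, as a free-standing function with a few example values. The constants and the match logic are taken from the scheduler.rs hunk above; the helper name next_interval() and the sample durations are made up for this note.

use std::time::Duration;

const MIN_INTERVAL: Duration = Duration::from_secs(30);
const MAX_INTERVAL: Duration = Duration::from_secs(7200);
const DEFAULT_INTERVAL: Duration = Duration::from_secs(120);

// Same selection as in Scheduler::reenqueue(): reuse the feed's mean posting
// interval when the last fetch saw new posts, otherwise back off by doubling
// the previous interval, and clamp the result to [30 s, 2 h].
fn next_interval(
    new_post_ratio: Option<f64>,
    mean_interval: Option<Duration>,
    last_interval: Option<Duration>,
) -> Duration {
    match (new_post_ratio, mean_interval, last_interval) {
        (Some(ratio), Some(mean), _) if ratio > 0. => mean,
        (_, _, Some(last)) => 2 * last,
        _ => DEFAULT_INTERVAL,
    }.max(MIN_INTERVAL).min(MAX_INTERVAL)
}

fn main() {
    // Busy feed: follow its mean posting interval, but never poll more
    // often than every 30 seconds.
    assert_eq!(
        next_interval(Some(0.4), Some(Duration::from_secs(10)), Some(Duration::from_secs(60))),
        MIN_INTERVAL);
    // Fetch succeeded but nothing was new: double the previous interval,
    // capped at two hours.
    assert_eq!(
        next_interval(Some(0.), Some(Duration::from_secs(600)), Some(Duration::from_secs(5000))),
        MAX_INTERVAL);
    // First fetch, or a failed fetch reported with two Nones: fall back
    // to the 120-second default.
    assert_eq!(next_interval(None, None, None), DEFAULT_INTERVAL);
}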