hunter: rework scheduling algorithm

Astro 2022-11-07 00:58:28 +01:00
parent c218cae316
commit 1cf7e200ab
3 changed files with 37 additions and 29 deletions

File 1 of 3:

@@ -65,9 +65,9 @@ async fn main() {
             let _ = timeout(duration, async {
                 let message = message_rx.recv().await.unwrap();
                 match message {
-                    Message::Fetched { host, next_interval, new_posts } => {
+                    Message::Fetched { host, mean_interval, new_post_ratio } => {
                         workers_active -= 1;
-                        scheduler.enqueue(host, new_posts > 0, next_interval);
+                        scheduler.reenqueue(host, new_post_ratio, mean_interval);
                     }
                     Message::IntroduceHosts { hosts } => {
                         for host in hosts.into_iter() {

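The call-site change is mechanical: the worker now reports a new-post ratio and a mean posting interval instead of a raw post count and a precomputed next interval, and the scheduler decides what to do with them. A minimal, self-contained sketch of this message flow; the tokio channel/timeout plumbing and the stubbed-out scheduler call are illustrative assumptions, not the project's actual surrounding code:

use std::time::Duration;
use tokio::sync::mpsc;
use tokio::time::timeout;

// Stand-in for the Message enum changed in this commit.
#[derive(Debug)]
enum Message {
    Fetched {
        host: String,
        new_post_ratio: Option<f64>,
        mean_interval: Option<Duration>,
    },
}

#[tokio::main]
async fn main() {
    let (message_tx, mut message_rx) = mpsc::unbounded_channel();

    // A worker would send this after fetching a host's feed.
    message_tx.send(Message::Fetched {
        host: "example.com".to_string(),
        new_post_ratio: Some(0.25),
        mean_interval: Some(Duration::from_secs(300)),
    }).unwrap();

    // The main loop waits (at most until the next scheduled fetch is due)
    // for a worker result, then hands the measurements to the scheduler.
    let duration = Duration::from_secs(1);
    let _ = timeout(duration, async {
        match message_rx.recv().await.unwrap() {
            Message::Fetched { host, new_post_ratio, mean_interval } => {
                // Real code: scheduler.reenqueue(host, new_post_ratio, mean_interval);
                println!("{host}: ratio={new_post_ratio:?} mean={mean_interval:?}");
            }
        }
    }).await;
}
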
File 2 of 3:

@@ -2,9 +2,12 @@ use std::collections::{HashMap, BTreeMap};
 use std::time::Duration;
 use tokio::time::Instant;
 
+const MIN_INTERVAL: Duration = Duration::from_secs(30);
+const MAX_INTERVAL: Duration = Duration::from_secs(7200);
+const DEFAULT_INTERVAL: Duration = Duration::from_secs(120);
+
 pub struct Instance {
     last_fetch: Option<Instant>,
-    no_updates: u32,
     error: bool,
 }

@@ -36,26 +39,29 @@ impl Scheduler {
         if self.instances.get(&host).is_none() {
             self.instances.insert(host.clone(), Instance {
                 last_fetch: None,
-                no_updates: 0,
                 error: false,
             });
             self.queue.insert(now, host);
         }
     }
 
-    pub fn enqueue(&mut self, host: String, fetched_anything: bool, next_interval: Duration) {
+    pub fn reenqueue(&mut self, host: String, new_post_ratio: Option<f64>, mean_interval: Option<Duration>) {
         let now = Instant::now();
         let instance = self.instances.get_mut(&host).unwrap();
+        let last_interval = instance.last_fetch.map(|last_fetch| now - last_fetch);
         instance.last_fetch = Some(now);
         instance.error = false;
-        if fetched_anything {
-            instance.no_updates = 0;
-        } else {
-            instance.no_updates += 1;
-        }
 
-        let mut next = now + (1 + instance.no_updates) * next_interval;
+        let next_interval = match (new_post_ratio, mean_interval, last_interval) {
+            (Some(new_post_ratio), Some(mean_interval), _) if new_post_ratio > 0. =>
+                mean_interval,
+            (_, _, Some(last_interval)) =>
+                2 * last_interval,
+            _ =>
+                DEFAULT_INTERVAL,
+        }.max(MIN_INTERVAL).min(MAX_INTERVAL);
+        let mut next = now + next_interval;
         let mut d = 1;
         // avoid timestamp collision in self.queue
         while self.queue.get(&next).is_some() {

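This is the heart of the rework. The new policy in reenqueue(): if the last fetch saw any new posts, poll again after the feed's own mean posting interval; if it saw nothing, double the time since the previous fetch (exponential backoff); with no information at all, start from DEFAULT_INTERVAL; and in every case clamp the result to [MIN_INTERVAL, MAX_INTERVAL]. A standalone sketch of just that decision, extracted into a pure function for illustration (no such helper exists in the commit):

use std::time::Duration;

const MIN_INTERVAL: Duration = Duration::from_secs(30);
const MAX_INTERVAL: Duration = Duration::from_secs(7200);
const DEFAULT_INTERVAL: Duration = Duration::from_secs(120);

// Interval selection as in reenqueue(), minus the queue bookkeeping.
fn next_interval(
    new_post_ratio: Option<f64>,
    mean_interval: Option<Duration>,
    last_interval: Option<Duration>,
) -> Duration {
    match (new_post_ratio, mean_interval, last_interval) {
        // Fresh posts arrived: track the feed's own posting rate.
        (Some(ratio), Some(mean), _) if ratio > 0. => mean,
        // Nothing new: back off by doubling the gap since the last fetch.
        (_, _, Some(last)) => 2 * last,
        // First contact or no usable data: start from the default.
        _ => DEFAULT_INTERVAL,
    }.max(MIN_INTERVAL).min(MAX_INTERVAL)
}

fn main() {
    // A feed posting every 5 minutes with new posts: re-poll in 5 minutes.
    assert_eq!(next_interval(Some(0.3), Some(Duration::from_secs(300)), None),
               Duration::from_secs(300));
    // Quiet feed fetched 45 minutes ago: back off to 90 minutes.
    assert_eq!(next_interval(Some(0.), None, Some(Duration::from_secs(2700))),
               Duration::from_secs(5400));
    // Backoff never exceeds MAX_INTERVAL (2 hours).
    assert_eq!(next_interval(None, None, Some(Duration::from_secs(7200))),
               MAX_INTERVAL);
}

Note the design shift: the old linear `(1 + no_updates) * next_interval` backoff needed a per-instance counter; doubling the observed gap since the last fetch gets geometric backoff for free, which is why the `no_updates` field disappears from Instance.
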
File 3 of 3:

@@ -2,19 +2,13 @@ use std::collections::HashSet;
 use std::time::Duration;
 use crate::feed::Feed;
 
-// timeouts are fairly low as they will be multiplied with the amount
-// of sequential fetches without new posts by the scheduler.
-const DEFAULT_INTERVAL: Duration = Duration::from_secs(30);
-const MIN_INTERVAL: Duration = Duration::from_secs(10);
-const ERROR_INTERVAL: Duration = Duration::from_secs(180);
-
 #[derive(Debug)]
 pub enum Message {
     Fetched {
         host: String,
-        new_posts: usize,
-        next_interval: Duration,
+        new_post_ratio: Option<f64>,
+        mean_interval: Option<Duration>,
     },
     IntroduceHosts { hosts: Vec<String> },
 }

@@ -34,18 +28,19 @@ pub fn fetch_and_process(
         .filter_map(|post| post.timestamp())
         .collect::<Vec<_>>();
     timestamps.sort();
-    let next_interval = if timestamps.len() > 1 {
-        ((*timestamps.last().unwrap() - timestamps[0]) / (timestamps.len() as i32)
-        ).to_std().unwrap_or(DEFAULT_INTERVAL)
-            .min(DEFAULT_INTERVAL)
-            .max(MIN_INTERVAL)
+    let mean_interval = if timestamps.len() > 2 {
+        Some(
+            ((*timestamps.last().unwrap() - timestamps[0]) / (timestamps.len() as i32 - 1)
+            ).to_std().unwrap()
+        )
     } else {
-        DEFAULT_INTERVAL
+        None
     };
 
     // introduce new hosts, validate posts
     let mut hosts = HashSet::new();
     let mut new_posts = 0;
+    let posts_len = feed.posts.len();
     for post in feed.posts.into_iter() {
         // introduce instances from mentions
         for mention in &post.mentions {

@@ -67,6 +62,13 @@ pub fn fetch_and_process(
                 log::warn!("drop repost ({:?} on {})", post.account.host(), host);
             }
         }
     }
+    log::info!("{}: {}/{} new posts", host, new_posts, posts_len);
+
+    let new_post_ratio = if posts_len > 0 {
+        Some((new_posts as f64) / (posts_len as f64))
+    } else {
+        None
+    };
     let hosts = hosts.into_iter()
         .map(|host| host.to_owned())
         .collect();

@@ -77,16 +79,16 @@ pub fn fetch_and_process(
             message_tx.send(Message::Fetched {
                 host: host.clone(),
-                new_posts,
-                next_interval,
+                new_post_ratio,
+                mean_interval,
             }).unwrap();
         }
         Err(e) => {
             log::error!("Failed fetching {}: {}", host, e);
             message_tx.send(Message::Fetched {
                 host,
-                new_post_ratio: None,
-                next_interval: ERROR_INTERVAL,
+                new_post_ratio: None,
+                mean_interval: None,
             }).unwrap();
         }
     }
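
Two measurement details in this file are easy to miss. With n sorted timestamps there are n - 1 gaps, so the mean gap is (newest - oldest) / (n - 1); the old code divided by n, underestimating the interval, and the new code also demands more than two timestamps before trusting the estimate. The ratio is simply new posts over page size, None for an empty page. A sketch of both, under the assumption that the timestamps are chrono::DateTime values (the `.to_std()` call in the diff suggests chrono, but that is an inference):

use chrono::{DateTime, TimeZone, Utc};
use std::time::Duration;

// Mean posting interval: n timestamps span n - 1 gaps.
fn mean_interval(timestamps: &[DateTime<Utc>]) -> Option<Duration> {
    if timestamps.len() > 2 {
        ((*timestamps.last().unwrap() - timestamps[0])
            / (timestamps.len() as i32 - 1))
            .to_std().ok()
    } else {
        None
    }
}

fn main() {
    // Four posts over 15 minutes -> three gaps averaging 5 minutes.
    let timestamps: Vec<DateTime<Utc>> = [0i64, 280, 620, 900]
        .iter()
        .map(|&secs| Utc.timestamp_opt(secs, 0).unwrap())
        .collect();
    assert_eq!(mean_interval(&timestamps), Some(Duration::from_secs(300)));

    // New-post ratio: 5 unseen posts out of a 20-post page.
    let (new_posts, posts_len) = (5, 20);
    let new_post_ratio = if posts_len > 0 {
        Some(new_posts as f64 / posts_len as f64)
    } else {
        None
    };
    assert_eq!(new_post_ratio, Some(0.25));
}

On error, the worker now sends None for both measurements instead of a fixed ERROR_INTERVAL, so the scheduler's backoff (or DEFAULT_INTERVAL on first contact) decides when to retry a failing host.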