caveman/hunter/src/scheduler.rs

115 lines
3.4 KiB
Rust

use std::collections::{HashMap, BTreeMap};
use std::sync::Arc;
use std::time::Duration;
use rand::{thread_rng, Rng};
use tokio::time::Instant;
use crate::block_list::BlockList;
/// Lower bound applied to every interval computed by `Scheduler::reenqueue`.
const MIN_INTERVAL: Duration = Duration::from_secs(30);
/// Upper bound applied to every interval computed by `Scheduler::reenqueue` (two hours).
const MAX_INTERVAL: Duration = Duration::from_secs(2 * 60 * 60);
/// Interval used by `Scheduler::reenqueue` when it has neither a usable mean
/// interval nor a previous fetch time to derive one from (two minutes).
const DEFAULT_INTERVAL: Duration = Duration::from_secs(2 * 60);
/// Reference-counted host name; cloning it only bumps a refcount, so the same
/// allocation can key both the instance map and the queue.
pub type Host = Arc<String>;
/// Per-host bookkeeping kept by the scheduler for every introduced host.
#[derive(Debug, Clone)]
pub struct Instance {
    /// Time of the most recent `reenqueue` for this host (presumably right
    /// after a fetch); `None` until the first one.
    last_fetch: Option<Instant>,
    /// Error flag; cleared whenever the host is re-enqueued.
    error: bool,
}
/// Decides which known host should be fetched next, and when.
///
/// Every introduced host gets an entry in `instances`; hosts waiting to be
/// fetched sit in `queue`, ordered by their due time.
pub struct Scheduler {
    /// Bookkeeping for every host that has been introduced.
    instances: HashMap<Host, Instance>,
    /// Pending fetches keyed by due time; the smallest key is served first.
    queue: BTreeMap<Instant, Host>,
    /// Consulted by `introduce` to reject blocked hosts.
    block_list: BlockList,
}
impl Scheduler {
pub fn new(block_list: BlockList) -> Self {
Scheduler {
instances: HashMap::new(),
queue: BTreeMap::new(),
block_list,
}
}
pub fn size(&self) -> usize {
self.instances.len()
}
pub fn queue_len(&self) -> usize {
self.queue.len()
}
pub async fn introduce(&mut self, host: String) -> bool {
if self.block_list.is_blocked(&host).await {
return false;
}
let now = Instant::now();
let host = Arc::new(host);
if let std::collections::hash_map::Entry::Vacant(entry) = self.instances.entry(host.clone()) {
entry.insert(Instance {
last_fetch: None,
error: false,
});
self.queue.insert(now, host);
}
true
}
pub fn reenqueue(&mut self, host: Host, new_post_ratio: Option<f64>, mean_interval: Option<Duration>) {
let now = Instant::now();
let instance = self.instances.get_mut(&host).unwrap();
let last_interval = instance.last_fetch.map(|last_fetch| now - last_fetch);
instance.last_fetch = Some(now);
instance.error = false;
let next_interval = match (new_post_ratio, mean_interval, last_interval) {
(Some(new_post_ratio), Some(mean_interval), _) if new_post_ratio > 0. =>
mean_interval,
(_, _, Some(last_interval)) => {
let a = thread_rng().gen_range(2. .. 3.);
last_interval.mul_f64(a)
}
_ =>
DEFAULT_INTERVAL,
}.max(MIN_INTERVAL).min(MAX_INTERVAL);
let mut next = now + next_interval;
let mut d = 1;
// avoid timestamp collision in self.queue
while self.queue.get(&next).is_some() {
d *= 2;
next += Duration::from_micros(d);
}
self.queue.insert(next, host);
}
pub fn dequeue(&mut self) -> Result<Host, Duration> {
let now = Instant::now();
if let Some(time) = self.queue.keys().next().cloned() {
if time <= now {
self.queue.remove(&time)
.ok_or(Duration::from_secs(1))
.map(|host| {
if let Some(last_fetch) = self.instances.get(&host).and_then(|i| i.last_fetch) {
tracing::debug!("Fetch {} - last before {:.0?}", host, now - last_fetch);
} else {
tracing::debug!("Fetch {} - NEW", host);
}
host
})
} else {
Err(time - now)
}
} else {
tracing::warn!("empty queue");
Err(Duration::from_secs(60))
}
}
}