use std::collections::{HashMap, BTreeMap}; use std::time::Duration; use rand::{thread_rng, Rng}; use tokio::time::Instant; const MIN_INTERVAL: Duration = Duration::from_secs(30); const MAX_INTERVAL: Duration = Duration::from_secs(7200); const DEFAULT_INTERVAL: Duration = Duration::from_secs(120); pub struct Instance { last_fetch: Option, error: bool, } /// Scheduler pub struct Scheduler { instances: HashMap, queue: BTreeMap, } impl Scheduler { pub fn new() -> Self { Scheduler { instances: HashMap::new(), queue: BTreeMap::new(), } } pub fn size(&self) -> usize { self.instances.len() } pub fn queue_len(&self) -> usize { self.queue.len() } pub async fn introduce(&mut self, host: String) { let now = Instant::now(); if self.instances.get(&host).is_none() { self.instances.insert(host.clone(), Instance { last_fetch: None, error: false, }); self.queue.insert(now, host); } } pub fn reenqueue(&mut self, host: String, new_post_ratio: Option, mean_interval: Option) { let now = Instant::now(); let instance = self.instances.get_mut(&host).unwrap(); let last_interval = instance.last_fetch.map(|last_fetch| now - last_fetch); instance.last_fetch = Some(now); instance.error = false; let next_interval = match (new_post_ratio, mean_interval, last_interval) { (Some(new_post_ratio), Some(mean_interval), _) if new_post_ratio > 0. => mean_interval, (_, _, Some(last_interval)) => { let a = thread_rng().gen_range(2. .. 3.); last_interval.mul_f64(a) } _ => DEFAULT_INTERVAL, }.max(MIN_INTERVAL).min(MAX_INTERVAL); let mut next = now + next_interval; let mut d = 1; // avoid timestamp collision in self.queue while self.queue.get(&next).is_some() { d *= 2; next += Duration::from_micros(d); } self.queue.insert(next, host); } pub fn dequeue(&mut self) -> Result { let now = Instant::now(); if let Some(time) = self.queue.keys().next().cloned() { if time <= now { self.queue.remove(&time) .map(|host| Ok(host)) .unwrap_or(Err(Duration::from_secs(1))) .map(|host| { if let Some(last_fetch) = self.instances.get(&host).and_then(|i| i.last_fetch) { log::debug!("Fetch {} - last before {:.0?}", host, now - last_fetch); } else { log::debug!("Fetch {} - NEW", host); } host }) } else { Err(time - now) } } else { log::warn!("empty queue"); Err(Duration::from_secs(60)) } } }