caveman/hunter/src/scheduler.rs

96 lines
2.7 KiB
Rust
Raw Normal View History

2022-11-02 21:12:16 +01:00
use std::collections::{HashMap, BTreeMap};
use std::time::Duration;
use tokio::time::Instant;
/// Per-host fetch state tracked by the scheduler.
#[derive(Debug)]
pub struct Instance {
    /// Time of the most recent `enqueue` for this host; `None` until the host
    /// has been enqueued for the first time after being introduced.
    last_fetch: Option<Instant>,
    /// Number of consecutive `enqueue` calls that reported nothing fetched.
    /// Reset to 0 as soon as a fetch yields something; used to stretch the
    /// delay until the next fetch.
    no_updates: u32,
    /// Error flag. The code in this file only ever clears it.
    error: bool,
}
2022-11-03 02:54:28 +01:00
/// Scheduler
2022-11-03 15:40:20 +01:00
pub struct Scheduler {
2022-11-02 21:12:16 +01:00
instances: HashMap<String, Instance>,
queue: BTreeMap<Instant, String>,
}
2022-11-03 15:40:20 +01:00
impl Scheduler {
2022-11-02 21:12:16 +01:00
pub fn new() -> Self {
2022-11-03 15:40:20 +01:00
Scheduler {
2022-11-02 21:12:16 +01:00
instances: HashMap::new(),
queue: BTreeMap::new(),
}
}
2022-11-02 22:06:43 +01:00
pub fn size(&self) -> usize {
self.instances.len()
}
pub fn queue_len(&self) -> usize {
self.queue.len()
}
2022-11-03 18:58:37 +01:00
pub async fn introduce(&mut self, redis_man: &mut redis::aio::ConnectionManager, host: String) {
2022-11-02 21:12:16 +01:00
let now = Instant::now();
if self.instances.get(&host).is_none() {
2022-11-03 18:58:37 +01:00
// save for later
crate::redis_store::save_host(redis_man, &host).await;
2022-11-02 21:12:16 +01:00
self.instances.insert(host.clone(), Instance {
last_fetch: None,
2022-11-02 23:10:59 +01:00
no_updates: 0,
2022-11-02 21:12:16 +01:00
error: false,
});
self.queue.insert(now, host);
}
}
2022-11-03 16:17:04 +01:00
pub fn enqueue(&mut self, host: String, fetched_anything: bool, next_interval: Duration) {
2022-11-02 21:12:16 +01:00
let now = Instant::now();
let instance = self.instances.get_mut(&host).unwrap();
instance.last_fetch = Some(now);
instance.error = false;
2022-11-03 16:17:04 +01:00
if fetched_anything {
2022-11-02 23:10:59 +01:00
instance.no_updates = 0;
2022-11-03 16:17:04 +01:00
} else {
instance.no_updates += 1;
2022-11-02 23:10:59 +01:00
}
2022-11-03 02:54:28 +01:00
let mut next = now + (1 + instance.no_updates) * next_interval;
let mut d = 1;
// avoid timestamp collision in self.queue
while self.queue.get(&next).is_some() {
d *= 2;
next += Duration::from_micros(d);
}
2022-11-02 23:10:59 +01:00
self.queue.insert(next, host);
2022-11-02 21:12:16 +01:00
}
2022-11-02 22:06:43 +01:00
pub fn dequeue(&mut self) -> Result<String, Duration> {
2022-11-02 21:12:16 +01:00
let now = Instant::now();
if let Some(time) = self.queue.keys().next().cloned() {
if time <= now {
self.queue.remove(&time)
.map(|host| Ok(host))
2022-11-02 22:06:43 +01:00
.unwrap_or(Err(Duration::from_secs(1)))
2022-11-03 15:39:05 +01:00
.map(|host| {
if let Some(last_fetch) = self.instances.get(&host).and_then(|i| i.last_fetch) {
2022-11-03 17:22:21 +01:00
log::debug!("Fetch {} - last before {:?}", host, now - last_fetch);
2022-11-03 15:39:05 +01:00
} else {
2022-11-03 17:22:21 +01:00
log::debug!("Fetch {} - NEW", host);
2022-11-03 15:39:05 +01:00
}
host
})
2022-11-02 21:12:16 +01:00
} else {
2022-11-02 22:06:43 +01:00
Err(time - now)
2022-11-02 21:12:16 +01:00
}
} else {
2022-11-03 17:22:21 +01:00
log::warn!("empty queue");
2022-11-02 22:06:43 +01:00
Err(Duration::from_secs(60))
2022-11-02 21:12:16 +01:00
}
}
}