save hosts to/load from redis
This commit is contained in:
parent
d0a5f11040
commit
ca66146eb3
|
@ -36,13 +36,16 @@ async fn main() {
|
|||
systemd_status("Starting redis client");
|
||||
let redis_client = redis::Client::open(config.redis)
|
||||
.expect("redis::Client");
|
||||
let redis_man = redis::aio::ConnectionManager::new(redis_client).await
|
||||
let mut redis_man = redis::aio::ConnectionManager::new(redis_client).await
|
||||
.expect("redis::aio::ConnectionManager");
|
||||
|
||||
systemd_status("Starting scheduler");
|
||||
let mut scheduler = scheduler::Scheduler::new();
|
||||
for host in config.hosts.into_iter() {
|
||||
scheduler.introduce(host);
|
||||
scheduler.introduce(&mut redis_man, host).await;
|
||||
}
|
||||
for host in crate::redis_store::get_hosts(&mut redis_man).await.into_iter() {
|
||||
scheduler.introduce(&mut redis_man, host).await;
|
||||
}
|
||||
|
||||
systemd_status("Starting HTTP client");
|
||||
|
@ -87,7 +90,7 @@ async fn main() {
|
|||
}
|
||||
Message::IntroduceHosts { hosts } => {
|
||||
for host in hosts.into_iter() {
|
||||
scheduler.introduce(host);
|
||||
scheduler.introduce(&mut redis_man, host).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -81,3 +81,33 @@ async fn save_post_tags(man: &mut redis::aio::ConnectionManager, post: Post) {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn save_host(man: &mut redis::aio::ConnectionManager, host: &str) {
|
||||
redis::Cmd::set(format!("h:{}", host), "1")
|
||||
.query_async::<_, ()>(man)
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
pub async fn get_hosts(
|
||||
man: &mut redis::aio::ConnectionManager,
|
||||
) -> Vec<String> {
|
||||
let mut results = vec![];
|
||||
scan(man, "h:*", |key| results.push(key[2..].to_string())).await;
|
||||
results
|
||||
}
|
||||
|
||||
async fn scan<F: FnMut(String)>(
|
||||
man: &mut redis::aio::ConnectionManager,
|
||||
pattern: &str,
|
||||
mut f: F,
|
||||
) {
|
||||
let mut cmd = redis::cmd("SCAN");
|
||||
cmd.cursor_arg(0).arg("MATCH").arg(pattern);
|
||||
let mut iter = cmd.iter_async::<String>(man)
|
||||
.await
|
||||
.unwrap();
|
||||
while let Some(key) = iter.next_item().await {
|
||||
f(key);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -30,10 +30,13 @@ impl Scheduler {
|
|||
self.queue.len()
|
||||
}
|
||||
|
||||
pub fn introduce(&mut self, host: String) {
|
||||
pub async fn introduce(&mut self, redis_man: &mut redis::aio::ConnectionManager, host: String) {
|
||||
let now = Instant::now();
|
||||
|
||||
if self.instances.get(&host).is_none() {
|
||||
// save for later
|
||||
crate::redis_store::save_host(redis_man, &host).await;
|
||||
|
||||
self.instances.insert(host.clone(), Instance {
|
||||
last_fetch: None,
|
||||
no_updates: 0,
|
||||
|
|
Loading…
Reference in New Issue