From 51a21d3fbc445601c0c99ebe4d3bb3115b92b1cf Mon Sep 17 00:00:00 2001 From: Astro Date: Fri, 2 Dec 2022 00:50:01 +0100 Subject: [PATCH] hunter: Arcify host --- hunter/src/scheduler.rs | 12 ++++++++---- hunter/src/worker.rs | 17 +++++++++-------- 2 files changed, 17 insertions(+), 12 deletions(-) diff --git a/hunter/src/scheduler.rs b/hunter/src/scheduler.rs index a0dc838..27dbe81 100644 --- a/hunter/src/scheduler.rs +++ b/hunter/src/scheduler.rs @@ -1,4 +1,5 @@ use std::collections::{HashMap, BTreeMap}; +use std::sync::Arc; use std::time::Duration; use rand::{thread_rng, Rng}; use tokio::time::Instant; @@ -7,6 +8,8 @@ const MIN_INTERVAL: Duration = Duration::from_secs(30); const MAX_INTERVAL: Duration = Duration::from_secs(7200); const DEFAULT_INTERVAL: Duration = Duration::from_secs(120); +pub type Host = Arc; + pub struct Instance { last_fetch: Option, error: bool, @@ -14,8 +17,8 @@ pub struct Instance { /// Scheduler pub struct Scheduler { - instances: HashMap, - queue: BTreeMap, + instances: HashMap, + queue: BTreeMap, } impl Scheduler { @@ -36,6 +39,7 @@ impl Scheduler { pub async fn introduce(&mut self, host: String) { let now = Instant::now(); + let host = Arc::new(host); if self.instances.get(&host).is_none() { self.instances.insert(host.clone(), Instance { @@ -46,7 +50,7 @@ impl Scheduler { } } - pub fn reenqueue(&mut self, host: String, new_post_ratio: Option, mean_interval: Option) { + pub fn reenqueue(&mut self, host: Host, new_post_ratio: Option, mean_interval: Option) { let now = Instant::now(); let instance = self.instances.get_mut(&host).unwrap(); @@ -74,7 +78,7 @@ impl Scheduler { self.queue.insert(next, host); } - pub fn dequeue(&mut self) -> Result { + pub fn dequeue(&mut self) -> Result { let now = Instant::now(); if let Some(time) = self.queue.keys().next().cloned() { diff --git a/hunter/src/worker.rs b/hunter/src/worker.rs index 3a4803e..0501568 100644 --- a/hunter/src/worker.rs +++ b/hunter/src/worker.rs @@ -8,6 +8,7 @@ use cave::{ }; use futures::{StreamExt, future}; use crate::posts_cache::PostsCache; +use crate::scheduler::Host; use crate::trend_setter::UpdateSet; #[derive(Clone)] @@ -18,7 +19,7 @@ pub struct RobotsTxt { impl RobotsTxt { pub async fn fetch( client: &reqwest::Client, - host: &str, + host: &Host, ) -> Self { let url = format!("https://{}/robots.txt", host); let robot = async { @@ -56,7 +57,7 @@ impl RobotsTxt { pub enum Message { WorkerDone, Fetched { - host: String, + host: Host, new_post_ratio: Option, mean_interval: Option, }, @@ -69,7 +70,7 @@ pub async fn run( trend_setter_tx: crate::trend_setter::Tx, posts_cache: PostsCache, client: reqwest::Client, - host: String, + host: Host, ) { // Fetch /robots.txt let robots_txt = RobotsTxt::fetch(&client, &host).await; @@ -105,7 +106,7 @@ pub async fn run( } } message_tx.send(Message::Fetched { - host: host.to_string(), + host: host, new_post_ratio, mean_interval, }).unwrap(); @@ -118,7 +119,7 @@ async fn fetch_timeline( posts_cache: &PostsCache, client: &reqwest::Client, robots_txt: RobotsTxt, - host: &str, + host: &Host, ) -> (Option, Option) { let url = format!("https://{}/api/v1/timelines/public?limit=40", host); if ! robots_txt.allowed(&url) { @@ -153,7 +154,7 @@ async fn process_posts( store: &mut Store, trend_setter_tx: &crate::trend_setter::Tx, posts_cache: &PostsCache, - host: &str, + host: &Host, posts: impl Iterator, ) -> (Option, HashSet) { // introduce new hosts, validate posts @@ -177,7 +178,7 @@ async fn process_posts( let update_set = UpdateSet::from(&post); // send away to redis if store.save_post(post).await == Ok(true) { - log::debug!("new from {} {} {}", host, if host == author_host { "==" } else { "!=" }, author_host); + // log::debug!("new from {} {} {}", host, if host == author_host { "==" } else { "!=" }, author_host); new_posts += 1; if ! update_set.is_empty() { @@ -213,7 +214,7 @@ async fn open_stream( posts_cache: &PostsCache, client: &reqwest::Client, robots_txt: RobotsTxt, - host: String, + host: Host, ) -> Result, ()> { let url = format!("https://{}/api/v1/streaming/public", host); if ! robots_txt.allowed(&url) {