hunter: Arcify host
This commit is contained in:
parent
69ba3ed657
commit
51a21d3fbc
|
@ -1,4 +1,5 @@
|
|||
use std::collections::{HashMap, BTreeMap};
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use rand::{thread_rng, Rng};
|
||||
use tokio::time::Instant;
|
||||
|
@ -7,6 +8,8 @@ const MIN_INTERVAL: Duration = Duration::from_secs(30);
|
|||
const MAX_INTERVAL: Duration = Duration::from_secs(7200);
|
||||
const DEFAULT_INTERVAL: Duration = Duration::from_secs(120);
|
||||
|
||||
pub type Host = Arc<String>;
|
||||
|
||||
pub struct Instance {
|
||||
last_fetch: Option<Instant>,
|
||||
error: bool,
|
||||
|
@ -14,8 +17,8 @@ pub struct Instance {
|
|||
|
||||
/// Scheduler
|
||||
pub struct Scheduler {
|
||||
instances: HashMap<String, Instance>,
|
||||
queue: BTreeMap<Instant, String>,
|
||||
instances: HashMap<Host, Instance>,
|
||||
queue: BTreeMap<Instant, Host>,
|
||||
}
|
||||
|
||||
impl Scheduler {
|
||||
|
@ -36,6 +39,7 @@ impl Scheduler {
|
|||
|
||||
pub async fn introduce(&mut self, host: String) {
|
||||
let now = Instant::now();
|
||||
let host = Arc::new(host);
|
||||
|
||||
if self.instances.get(&host).is_none() {
|
||||
self.instances.insert(host.clone(), Instance {
|
||||
|
@ -46,7 +50,7 @@ impl Scheduler {
|
|||
}
|
||||
}
|
||||
|
||||
pub fn reenqueue(&mut self, host: String, new_post_ratio: Option<f64>, mean_interval: Option<Duration>) {
|
||||
pub fn reenqueue(&mut self, host: Host, new_post_ratio: Option<f64>, mean_interval: Option<Duration>) {
|
||||
let now = Instant::now();
|
||||
|
||||
let instance = self.instances.get_mut(&host).unwrap();
|
||||
|
@ -74,7 +78,7 @@ impl Scheduler {
|
|||
self.queue.insert(next, host);
|
||||
}
|
||||
|
||||
pub fn dequeue(&mut self) -> Result<String, Duration> {
|
||||
pub fn dequeue(&mut self) -> Result<Host, Duration> {
|
||||
let now = Instant::now();
|
||||
|
||||
if let Some(time) = self.queue.keys().next().cloned() {
|
||||
|
|
|
@ -8,6 +8,7 @@ use cave::{
|
|||
};
|
||||
use futures::{StreamExt, future};
|
||||
use crate::posts_cache::PostsCache;
|
||||
use crate::scheduler::Host;
|
||||
use crate::trend_setter::UpdateSet;
|
||||
|
||||
#[derive(Clone)]
|
||||
|
@ -18,7 +19,7 @@ pub struct RobotsTxt {
|
|||
impl RobotsTxt {
|
||||
pub async fn fetch(
|
||||
client: &reqwest::Client,
|
||||
host: &str,
|
||||
host: &Host,
|
||||
) -> Self {
|
||||
let url = format!("https://{}/robots.txt", host);
|
||||
let robot = async {
|
||||
|
@ -56,7 +57,7 @@ impl RobotsTxt {
|
|||
pub enum Message {
|
||||
WorkerDone,
|
||||
Fetched {
|
||||
host: String,
|
||||
host: Host,
|
||||
new_post_ratio: Option<f64>,
|
||||
mean_interval: Option<Duration>,
|
||||
},
|
||||
|
@ -69,7 +70,7 @@ pub async fn run(
|
|||
trend_setter_tx: crate::trend_setter::Tx,
|
||||
posts_cache: PostsCache,
|
||||
client: reqwest::Client,
|
||||
host: String,
|
||||
host: Host,
|
||||
) {
|
||||
// Fetch /robots.txt
|
||||
let robots_txt = RobotsTxt::fetch(&client, &host).await;
|
||||
|
@ -105,7 +106,7 @@ pub async fn run(
|
|||
}
|
||||
}
|
||||
message_tx.send(Message::Fetched {
|
||||
host: host.to_string(),
|
||||
host: host,
|
||||
new_post_ratio,
|
||||
mean_interval,
|
||||
}).unwrap();
|
||||
|
@ -118,7 +119,7 @@ async fn fetch_timeline(
|
|||
posts_cache: &PostsCache,
|
||||
client: &reqwest::Client,
|
||||
robots_txt: RobotsTxt,
|
||||
host: &str,
|
||||
host: &Host,
|
||||
) -> (Option<f64>, Option<Duration>) {
|
||||
let url = format!("https://{}/api/v1/timelines/public?limit=40", host);
|
||||
if ! robots_txt.allowed(&url) {
|
||||
|
@ -153,7 +154,7 @@ async fn process_posts(
|
|||
store: &mut Store,
|
||||
trend_setter_tx: &crate::trend_setter::Tx,
|
||||
posts_cache: &PostsCache,
|
||||
host: &str,
|
||||
host: &Host,
|
||||
posts: impl Iterator<Item = Post>,
|
||||
) -> (Option<f64>, HashSet<String>) {
|
||||
// introduce new hosts, validate posts
|
||||
|
@ -177,7 +178,7 @@ async fn process_posts(
|
|||
let update_set = UpdateSet::from(&post);
|
||||
// send away to redis
|
||||
if store.save_post(post).await == Ok(true) {
|
||||
log::debug!("new from {} {} {}", host, if host == author_host { "==" } else { "!=" }, author_host);
|
||||
// log::debug!("new from {} {} {}", host, if host == author_host { "==" } else { "!=" }, author_host);
|
||||
new_posts += 1;
|
||||
|
||||
if ! update_set.is_empty() {
|
||||
|
@ -213,7 +214,7 @@ async fn open_stream(
|
|||
posts_cache: &PostsCache,
|
||||
client: &reqwest::Client,
|
||||
robots_txt: RobotsTxt,
|
||||
host: String,
|
||||
host: Host,
|
||||
) -> Result<impl Future<Output = ()>, ()> {
|
||||
let url = format!("https://{}/api/v1/streaming/public", host);
|
||||
if ! robots_txt.allowed(&url) {
|
||||
|
|
Loading…
Reference in New Issue