hunter: estimate stream_avg_interval

This commit is contained in:
Astro 2023-03-03 20:49:34 +01:00
parent 2f7b9a4b23
commit 6231884d36
1 changed files with 14 additions and 3 deletions

View File

@ -96,9 +96,19 @@ pub async fn run(
if let Ok(stream) = stream_result {
tracing::info!("Processing stream for {}", host);
metrics::increment_gauge!("hunter_requests", 1.0, "type" => "stream");
stream.await;
let start_time = Instant::now();
let post_count = stream.await;
let end_time = Instant::now();
metrics::decrement_gauge!("hunter_requests", 1.0, "type" => "stream");
tracing::warn!("Ended stream for {}", host);
let stream_avg_interval = Duration::from_secs_f64(
(post_count as f64) / (end_time - start_time).as_secs_f64().max(1.0)
);
if mean_interval.map_or(true, |mean_interval| stream_avg_interval < mean_interval) {
mean_interval = Some(stream_avg_interval);
}
}
// Ready for reenqueue
@ -224,7 +234,7 @@ async fn open_stream(
client: &reqwest::Client,
robots_txt: RobotsTxt,
host: Host,
) -> Result<impl Future<Output = ()>, ()> {
) -> Result<impl Future<Output = usize>, ()> {
let url = format!("https://{}/api/v1/streaming/public", host);
if ! robots_txt.allowed(&url) {
tracing::warn!("Streaming of {} forbidden by robots.txt", host);
@ -241,7 +251,7 @@ async fn open_stream(
tracing::error!("Stream error for {}: {}", host, e);
})?;
Ok(stream.for_each(move |post| {
Ok(stream.fold(0, move |post_count, post| {
let message_tx = message_tx.clone();
let mut store = store.clone();
let posts_cache = posts_cache.clone();
@ -252,6 +262,7 @@ async fn open_stream(
let _ = message_tx.send(Message::IntroduceHosts {
hosts: introduce_hosts.into_iter().collect(),
});
post_count + 1
}
}))
}