hunter/worker: distinguish between stream+stream-token for stats

This commit is contained in:
Astro 2023-10-17 16:03:45 +02:00
parent 256f998d12
commit 2f66958a6a
1 changed files with 8 additions and 6 deletions

View File

@ -125,14 +125,14 @@ pub async fn run(
// Process stream
let (mut new_post_ratio, mut mean_interval) = timeline_result.unwrap_or((None, None));
if let Ok(stream) = stream_result {
if let Ok((stats_key, stream)) = stream_result {
tracing::info!("Processing stream for {}", &host.host);
metrics::increment_gauge!("hunter_requests", 1.0, "type" => "stream");
metrics::increment_gauge!("hunter_requests", 1.0, "type" => stats_key);
let start_time = Instant::now();
let post_count = stream.await;
let end_time = Instant::now();
metrics::decrement_gauge!("hunter_requests", 1.0, "type" => "stream");
metrics::decrement_gauge!("hunter_requests", 1.0, "type" => stats_key);
tracing::warn!("Ended stream for {}. {} posts in {:?}", &host.host, post_count, end_time - start_time);
if post_count > 0 {
@ -281,7 +281,7 @@ async fn open_stream(
client: &reqwest::Client,
robots_txt: RobotsTxt,
host: Host,
) -> Result<impl Future<Output = usize>, String> {
) -> Result<(&'static str, impl Future<Output = usize>), String> {
let url = format!("https://{}/api/v1/streaming/public", host);
if ! robots_txt.allowed(&url) {
return Err(format!("Streaming of {} forbidden by robots.txt", host));
@ -294,6 +294,7 @@ async fn open_stream(
let mut stream = Feed::stream(client, &url).await;
metrics::decrement_gauge!("hunter_requests", 1.0, "type" => "stream_open");
let mut stats_key = "stream";
let mut prev_token: Option<String> = None;
let mut token_tries = 0;
while let Err(StreamError::HttpStatus(StatusCode::UNAUTHORIZED)) = &stream {
@ -313,6 +314,7 @@ async fn open_stream(
metrics::increment_gauge!("hunter_requests", 1.0, "type" => "stream_open_token");
stream = Feed::stream(client, &url).await;
metrics::decrement_gauge!("hunter_requests", 1.0, "type" => "stream_open_token");
stats_key = "stream_token";
} else {
tracing::info!("No working token for {}", host);
break;
@ -330,7 +332,7 @@ async fn open_stream(
format!("Stream error for {}: {}", host, e)
})?;
Ok(stream.fold(0, move |post_count, post| {
Ok((stats_key, stream.fold(0, move |post_count, post| {
let message_tx = message_tx.clone();
let mut store = store.clone();
let posts_cache = posts_cache.clone();
@ -343,5 +345,5 @@ async fn open_stream(
}
post_count + 1
}
}))
})))
}