Compare commits

...

5 Commits

Author SHA1 Message Date
Astro d23bb61cb6 hunter/worker: log with stats_key to differentiate "stream_token" 2023-10-30 00:36:01 +01:00
Astro 57da2139c1 butcher/trend_setter: tweak algorithm to make min_after_mentions depend on period 2023-10-30 00:35:23 +01:00
Astro c567cb62e9 hunter/main: increase metric timeout
stream_token has low fluctuation, it was disappearing from the stats.
2023-10-29 20:51:04 +01:00
Astro c4f2b1f640 hunter/main: limit http client redirect policy 2023-10-29 20:12:39 +01:00
Astro 7b6e480576 buzzback: output Ok response body 2023-10-29 20:03:32 +01:00
6 changed files with 22 additions and 15 deletions

View File

@ -15,7 +15,7 @@ use cave::{
};
#[cfg(not(debug))]
const MIN_INTERVAL: Duration = Duration::from_secs(10);
const MIN_INTERVAL: Duration = Duration::from_secs(30);
#[cfg(debug)]
const MIN_INTERVAL: Duration = Duration::from_secs(5);
@ -157,8 +157,8 @@ async fn run(
let old_tags = old_tags.into_iter().collect::<HashSet<String>>();
let mut keep = TREND_POOL_SIZE.min(scores.len());
// shrink sorted set of tags as long as score is 0
while keep > 0 && scores[keep - 1].0 <= 0. {
// shrink sorted set of tags as long as score is less than 1
while keep > 0 && scores[keep - 1].0 < 1.0 {
keep -= 1;
}
let remove = scores[keep..].iter()

View File

@ -34,8 +34,8 @@ async fn follow_back(
&action,
).await;
match result {
Ok(()) => {
tracing::info!("Ok {}", follow.id);
Ok(body) => {
tracing::info!("Ok {}: {:?}", follow.id, body);
}
Err(e) => {
tracing::error!("POST: {:?}", e);
@ -64,7 +64,7 @@ async fn main() {
let client = Arc::new(
reqwest::Client::builder()
.timeout(Duration::from_secs(5))
.timeout(Duration::from_secs(10))
.user_agent(concat!(
env!("CARGO_PKG_NAME"),
"/",

View File

@ -13,7 +13,7 @@ pub async fn send<T: Serialize>(
key_id: &str,
private_key: &PrivateKey,
body: &T,
) -> Result<(), Error> {
) -> Result<String, Error> {
let body = Arc::new(
serde_json::to_vec(body)
.map_err(Error::Json)?
@ -27,7 +27,7 @@ pub async fn send_raw(
key_id: &str,
private_key: &PrivateKey,
body: Arc<Vec<u8>>,
) -> Result<(), Error> {
) -> Result<String, Error> {
let url = reqwest::Url::parse(uri)
.map_err(|_| Error::InvalidUri)?;
let host = format!("{}", url.host().ok_or(Error::InvalidUri)?);
@ -49,7 +49,7 @@ pub async fn send_raw(
let res = client.execute(req)
.await?;
if res.status() >= StatusCode::OK && res.status() < StatusCode::MULTIPLE_CHOICES {
Ok(())
Ok(res.text().await?)
} else {
tracing::error!("send_raw {} response HTTP {}", url, res.status());
let response = res.text().await?;

View File

@ -1,7 +1,11 @@
use std::collections::BTreeSet;
use crate::PERIOD_COMPARE_WINDOW;
const MIN_AFTER_MENTIONS: usize = 5;
const MIN_AFTER_MENTIONS: &[(u64, usize)] = &[
(4, 7),
(24, 17),
(168, 57)
];
#[derive(Debug)]
pub struct TrendTag {
@ -53,8 +57,10 @@ impl TrendTag {
}
}
if after_mentions < MIN_AFTER_MENTIONS {
return 0.;
for (min_period, min_after_mentions) in MIN_AFTER_MENTIONS {
if period >= *min_period && after_mentions < *min_after_mentions {
return 0.;
}
}
let before = if before_hours > 0 && before_mentions > 0 {

View File

@ -37,7 +37,7 @@ async fn run() {
PrometheusBuilder::new()
.with_http_listener(([0; 8], config.prometheus_port))
.add_global_label("application", env!("CARGO_PKG_NAME"))
.idle_timeout(MetricKindMask::ALL, Some(Duration::from_secs(600)))
.idle_timeout(MetricKindMask::ALL, Some(Duration::from_secs(6 * 3600)))
.install()
.unwrap();
@ -81,6 +81,7 @@ async fn run() {
.timeout(Duration::from_secs(30))
.tcp_keepalive(Duration::from_secs(300))
.pool_max_idle_per_host(0)
.redirect(reqwest::redirect::Policy::limited(2))
.user_agent(
format!("{}/{} (+https://fedi.buzz/)", env!("CARGO_PKG_NAME"), env!("CARGO_PKG_VERSION"))
)

View File

@ -128,14 +128,14 @@ pub async fn run(
// Process stream
let (mut new_post_ratio, mut mean_interval) = timeline_result.unwrap_or((None, None));
if let Ok((stats_key, stream)) = stream_result {
tracing::info!("Processing stream for {}", &host.host);
tracing::info!("Processing {stats_key} for {}", &host.host);
metrics::increment_gauge!("hunter_requests", 1.0, "type" => stats_key);
let start_time = Instant::now();
let post_count = stream.await;
let end_time = Instant::now();
metrics::decrement_gauge!("hunter_requests", 1.0, "type" => stats_key);
tracing::warn!("Ended stream for {}. {} posts in {:?}", &host.host, post_count, end_time - start_time);
tracing::warn!("Ended {stats_key} for {}. {} posts in {:?}", &host.host, post_count, end_time - start_time);
if post_count > 0 {
if let Some(ref mut new_post_ratio) = new_post_ratio {