caveman/src/worker.rs

81 lines
2.9 KiB
Rust

use std::collections::HashSet;
use std::time::Duration;
use chrono::{DateTime, FixedOffset};
use crate::feed::{Feed, Post};
/// Fetch interval (one hour) used by `fetch_and_process` when fewer than two
/// post timestamps are available or the estimate cannot be converted to a
/// `std::time::Duration`; it is also the upper bound for estimated intervals.
const DEFAULT_INTERVAL: Duration = Duration::from_secs(3600);
/// Lower bound (ten seconds) applied to estimated fetch intervals.
const MIN_INTERVAL: Duration = Duration::from_secs(10);
/// Status messages sent by `fetch_and_process` over its message channel.
#[derive(Debug)]
pub enum Message {
/// The timeline of `host` was fetched successfully.
Fetched {
/// Host whose timeline was fetched.
host: String,
/// Estimated delay before the next fetch, kept within
/// `MIN_INTERVAL..=DEFAULT_INTERVAL`.
next_interval: Duration,
/// Newest timestamp among the fetched posts, or `None` if no post
/// yielded a timestamp.
latest_timestamp: Option<DateTime<FixedOffset>>,
},
/// Fetching the timeline of `host` failed.
Error { host: String },
/// Hosts seen in the mentions and authors of the fetched posts.
IntroduceHosts { hosts: Vec<String> },
}
pub fn fetch_and_process(
message_tx: tokio::sync::mpsc::UnboundedSender<Message>,
posts_tx: tokio::sync::mpsc::UnboundedSender<Post>,
client: reqwest::Client,
host: String,
) {
let url = format!("https://{}/api/v1/timelines/public", host);
tokio::spawn(async move {
match Feed::fetch(&client, &url).await {
Ok(feed) => {
// Analyze time intervals between posts to estimate when to fetch next
let mut timestamps = feed.posts.iter()
.filter_map(|post| post.timestamp())
.collect::<Vec<_>>();
timestamps.sort();
let next_interval = if timestamps.len() > 1 {
((*timestamps.last().unwrap() - timestamps[0]) / (timestamps.len() as i32)
).to_std().unwrap_or(DEFAULT_INTERVAL)
.min(DEFAULT_INTERVAL)
.max(MIN_INTERVAL)
} else {
DEFAULT_INTERVAL
};
let latest_timestamp = timestamps.iter().last().cloned();
// introduce new hosts, validate posts
let mut hosts = HashSet::new();
for post in feed.posts.into_iter() {
// introduce instances from mentions
for mention in &post.mentions {
if let Some(user_host) = mention.user_host() {
hosts.insert(user_host);
}
}
if let Some(author_host) = post.account.host() {
if author_host == host && post.url_host().as_ref() == Some(&host) {
// send away to redis
let _ = posts_tx.send(post);
}
// introduce instances from authors
hosts.insert(author_host);
}
}
let hosts = hosts.into_iter().collect();
let _ = message_tx.send(Message::IntroduceHosts { hosts });
message_tx.send(Message::Fetched {
host: host.clone(),
next_interval,
latest_timestamp,
}).unwrap();
}
Err(e) => {
println!("Failed fetching {}: {}", host, e);
message_tx.send(Message::Error { host }).unwrap();
}
}
});
}