// caveman/hunter/src/worker.rs (87 lines, 3.1 KiB, Rust)

use std::collections::HashSet;
2022-11-02 22:06:43 +01:00
use std::time::Duration;
2022-11-03 16:17:04 +01:00
use crate::feed::Feed;
2022-11-02 21:12:16 +01:00
2022-11-02 22:42:43 +01:00
/// Polling interval used when a feed yields fewer than two usable timestamps;
/// it also serves as the upper bound for any estimated interval.
const DEFAULT_INTERVAL: Duration = Duration::from_secs(3600);
/// Lower bound for an estimated polling interval.
const MIN_INTERVAL: Duration = Duration::from_secs(10);
2022-11-02 22:42:43 +01:00
2022-11-02 21:12:16 +01:00
/// Messages a fetch worker sends back over its channel.
#[derive(Debug)]
pub enum Message {
    /// A host's public timeline was fetched and processed.
    Fetched {
        /// Host whose timeline was fetched.
        host: String,
        /// Number of posts for which `redis_store::save_post` returned `true`.
        new_posts: usize,
        /// Suggested delay before this host is fetched again.
        next_interval: Duration,
    },
    /// Fetching the timeline of `host` failed.
    Error { host: String },
    /// Hosts seen among post authors and mentions in a fetched timeline.
    IntroduceHosts { hosts: Vec<String> },
}
pub fn fetch_and_process(
2022-11-03 01:21:53 +01:00
message_tx: tokio::sync::mpsc::UnboundedSender<Message>,
2022-11-03 16:17:04 +01:00
mut redis_man: redis::aio::ConnectionManager,
2022-11-02 21:12:16 +01:00
client: reqwest::Client,
host: String,
) {
let url = format!("https://{}/api/v1/timelines/public", host);
tokio::spawn(async move {
2022-11-03 01:21:53 +01:00
match Feed::fetch(&client, &url).await {
2022-11-02 21:12:16 +01:00
Ok(feed) => {
2022-11-02 22:42:43 +01:00
// Analyze time intervals between posts to estimate when to fetch next
let mut timestamps = feed.posts.iter()
.filter_map(|post| post.timestamp())
.collect::<Vec<_>>();
timestamps.sort();
2022-11-02 23:20:53 +01:00
let next_interval = if timestamps.len() > 1 {
((*timestamps.last().unwrap() - timestamps[0]) / (timestamps.len() as i32)
2022-11-03 00:27:16 +01:00
).to_std().unwrap_or(DEFAULT_INTERVAL)
2022-11-02 23:20:53 +01:00
.min(DEFAULT_INTERVAL)
2022-11-02 23:10:59 +01:00
.max(MIN_INTERVAL)
2022-11-02 22:42:43 +01:00
} else {
DEFAULT_INTERVAL
};
// introduce new hosts, validate posts
2022-11-02 21:12:16 +01:00
let mut hosts = HashSet::new();
2022-11-03 16:17:04 +01:00
let mut new_posts = 0;
2022-11-02 21:12:16 +01:00
for post in feed.posts.into_iter() {
2022-11-03 03:42:13 +01:00
// introduce instances from mentions
for mention in &post.mentions {
if let Some(user_host) = mention.user_host() {
hosts.insert(user_host);
}
}
2022-11-02 21:12:16 +01:00
if let Some(author_host) = post.account.host() {
2022-11-03 16:17:04 +01:00
if author_host == host && post.url_host().map(|s| s == host).unwrap_or(false) {
2022-11-03 01:21:53 +01:00
// send away to redis
2022-11-03 16:17:04 +01:00
if crate::redis_store::save_post(&mut redis_man, &host, post).await {
new_posts += 1;
}
2022-11-02 21:12:16 +01:00
}
2022-11-03 03:42:13 +01:00
// introduce instances from authors
2022-11-02 21:12:16 +01:00
hosts.insert(author_host);
}
}
2022-11-03 16:17:04 +01:00
let hosts = hosts.into_iter()
.map(|host| host.to_owned())
.collect();
2022-11-03 01:21:53 +01:00
let _ = message_tx.send(Message::IntroduceHosts { hosts });
2022-11-02 22:06:43 +01:00
// successfully fetched, save for future run
crate::redis_store::save_host(&mut redis_man, &host).await;
2022-11-03 01:21:53 +01:00
message_tx.send(Message::Fetched {
2022-11-02 22:06:43 +01:00
host: host.clone(),
2022-11-03 16:17:04 +01:00
new_posts,
2022-11-02 22:42:43 +01:00
next_interval,
2022-11-02 22:06:43 +01:00
}).unwrap();
2022-11-02 21:12:16 +01:00
}
Err(e) => {
2022-11-03 17:22:21 +01:00
log::error!("Failed fetching {}: {}", host, e);
2022-11-03 01:21:53 +01:00
message_tx.send(Message::Error { host }).unwrap();
2022-11-02 21:12:16 +01:00
}
}
});
}