2022-11-02 21:12:16 +01:00
|
|
|
use std::collections::HashSet;
|
2022-11-02 22:06:43 +01:00
|
|
|
use std::time::Duration;
|
2022-11-02 23:10:59 +01:00
|
|
|
use chrono::{DateTime, FixedOffset};
|
2022-11-02 21:12:16 +01:00
|
|
|
use crate::feed;
|
|
|
|
|
2022-11-02 22:42:43 +01:00
|
|
|
/// Re-fetch interval used when a feed yields fewer than two timestamps; it
/// also caps the interval estimated from post timestamps (one hour).
const DEFAULT_INTERVAL: Duration = Duration::from_secs(60 * 60);
|
2022-11-02 23:10:59 +01:00
|
|
|
/// Lower bound applied to the re-fetch interval estimated from post
/// timestamps (ten seconds).
const MIN_INTERVAL: Duration = Duration::from_secs(10);
|
2022-11-02 22:42:43 +01:00
|
|
|
|
2022-11-02 21:12:16 +01:00
|
|
|
#[derive(Debug)]
|
|
|
|
pub enum Message {
|
2022-11-02 23:10:59 +01:00
|
|
|
Fetched {
|
|
|
|
host: String,
|
|
|
|
next_interval: Duration,
|
|
|
|
latest_timestamp: Option<DateTime<FixedOffset>>,
|
|
|
|
},
|
2022-11-02 21:12:16 +01:00
|
|
|
Error { host: String },
|
|
|
|
IntroduceHosts { hosts: Vec<String> },
|
|
|
|
Posts { posts: Vec<feed::Post> },
|
|
|
|
}
|
|
|
|
|
|
|
|
pub fn fetch_and_process(
|
|
|
|
tx: tokio::sync::mpsc::UnboundedSender<Message>,
|
|
|
|
client: reqwest::Client,
|
|
|
|
host: String,
|
|
|
|
) {
|
|
|
|
let url = format!("https://{}/api/v1/timelines/public", host);
|
|
|
|
tokio::spawn(async move {
|
|
|
|
match feed::Feed::fetch(&client, &url).await {
|
|
|
|
Ok(feed) => {
|
2022-11-02 22:42:43 +01:00
|
|
|
// Analyze time intervals between posts to estimate when to fetch next
|
|
|
|
let mut timestamps = feed.posts.iter()
|
|
|
|
.filter_map(|post| post.timestamp())
|
|
|
|
.collect::<Vec<_>>();
|
|
|
|
timestamps.sort();
|
2022-11-02 23:20:53 +01:00
|
|
|
let next_interval = if timestamps.len() > 1 {
|
|
|
|
((*timestamps.last().unwrap() - timestamps[0]) / (timestamps.len() as i32)
|
|
|
|
).to_std().unwrap()
|
|
|
|
.min(DEFAULT_INTERVAL)
|
2022-11-02 23:10:59 +01:00
|
|
|
.max(MIN_INTERVAL)
|
2022-11-02 22:42:43 +01:00
|
|
|
} else {
|
|
|
|
DEFAULT_INTERVAL
|
|
|
|
};
|
2022-11-02 23:20:53 +01:00
|
|
|
let latest_timestamp = timestamps.iter().last().cloned();
|
2022-11-02 22:42:43 +01:00
|
|
|
|
|
|
|
// introduce new hosts, validate posts
|
2022-11-02 21:12:16 +01:00
|
|
|
let mut posts = Vec::with_capacity(feed.posts.len());
|
|
|
|
let mut hosts = HashSet::new();
|
|
|
|
for post in feed.posts.into_iter() {
|
|
|
|
if let Some(author_host) = post.account.host() {
|
2022-11-02 21:49:37 +01:00
|
|
|
if author_host == host && post.url_host().as_ref() == Some(&host) {
|
2022-11-02 21:12:16 +01:00
|
|
|
posts.push(post);
|
|
|
|
}
|
|
|
|
hosts.insert(author_host);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
let hosts = hosts.into_iter().collect();
|
|
|
|
tx.send(Message::IntroduceHosts { hosts }).unwrap();
|
|
|
|
tx.send(Message::Posts { posts }).unwrap();
|
2022-11-02 22:06:43 +01:00
|
|
|
|
|
|
|
tx.send(Message::Fetched {
|
|
|
|
host: host.clone(),
|
2022-11-02 22:42:43 +01:00
|
|
|
next_interval,
|
2022-11-02 23:10:59 +01:00
|
|
|
latest_timestamp,
|
2022-11-02 22:06:43 +01:00
|
|
|
}).unwrap();
|
2022-11-02 21:12:16 +01:00
|
|
|
}
|
|
|
|
Err(e) => {
|
|
|
|
println!("Failed fetching {}: {}", host, e);
|
|
|
|
tx.send(Message::Error { host }).unwrap();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
});
|
|
|
|
}
|