hunter: refactor worker to prepare for more fetch steps

This commit is contained in:
Astro 2022-11-11 18:07:59 +01:00
parent 51a4f7c5ea
commit e620c69a32
3 changed files with 97 additions and 77 deletions

View File

@@ -4,6 +4,7 @@ version = "0.0.0"
edition = "2021"
[dependencies]
futures = "0.3"
tokio = { version = "1", features = ["full"] }
reqwest = { version = "0.11", features = ["json", "deflate", "gzip"] }
serde = { version = "1", features = ["derive"] }

View File

@@ -16,7 +16,7 @@ async fn main() {
let config = config::Config::load();
let store = cave::store::Store::new(config.redis).await;
let mut store = cave::store::Store::new(config.redis).await;
cave::systemd::status("Starting trend_setter");
let trend_setter_tx = trend_setter::start(store.clone());
@@ -28,7 +28,7 @@ async fn main() {
scheduler.introduce(host).await;
}
cave::systemd::status("Loading known hosts from redis");
for host in store.get_hosts().await.into_iter() {
for host in store.get_hosts().await.unwrap().into_iter() {
scheduler.introduce(host).await;
}
@@ -77,13 +77,13 @@ async fn main() {
}
Ok(host) => {
workers_active += 1;
worker::fetch_and_process(
tokio::spawn(worker::run(
message_tx.clone(),
store.clone(),
trend_setter_tx.clone(),
client.clone(),
host
);
));
cave::systemd::watchdog();
}
}

View File

@@ -1,7 +1,7 @@
use std::collections::HashSet;
use std::time::Duration;
use cave::{
feed::Feed,
feed::{Feed, Post},
store::Store,
};
use crate::trend_setter::UpdateSet;
@@ -17,90 +17,109 @@ pub enum Message {
IntroduceHosts { hosts: Vec<String> },
}
pub fn fetch_and_process(
pub async fn run(
message_tx: tokio::sync::mpsc::UnboundedSender<Message>,
mut store: Store,
store: Store,
trend_setter_tx: crate::trend_setter::Tx,
client: reqwest::Client,
host: String,
) {
// let (_, stream) = future::join(
fetch_timeline(
message_tx.clone(), store.clone(),
trend_setter_tx.clone(),
client.clone(), &host
// ),
// open_stream(&host),
).await;
}
async fn fetch_timeline(
message_tx: tokio::sync::mpsc::UnboundedSender<Message>,
mut store: Store,
trend_setter_tx: crate::trend_setter::Tx,
client: reqwest::Client,
host: &str,
) {
let url = format!("https://{}/api/v1/timelines/public?limit=40", host);
tokio::spawn(async move {
match Feed::fetch(&client, &url).await {
Ok(feed) => {
// Analyze time intervals between posts to estimate when to fetch next
let mut timestamps = feed.posts.iter()
.filter_map(|post| post.timestamp())
.collect::<Vec<_>>();
timestamps.sort();
let mean_interval = if timestamps.len() > 2 {
Some(
((*timestamps.last().unwrap() - timestamps[0]) / (timestamps.len() as i32 - 1)
).to_std().unwrap()
)
} else {
None
};
// introduce new hosts, validate posts
let mut hosts = HashSet::new();
let mut new_posts = 0;
let posts_len = feed.posts.len();
for post in feed.posts.into_iter() {
// introduce instances from mentions
for mention in &post.mentions {
if let Some(user_host) = mention.user_host() {
hosts.insert(user_host);
}
}
match Feed::fetch(&client, &url).await {
Ok(feed) => {
let mean_interval = feed.mean_post_interval();
// check if it's an actual post, not a repost
if let Some(author_host) = post.account.host() {
// send away to redis
let update_set = UpdateSet::from(&post);
if store.save_post(post).await == Ok(true) {
new_posts += 1;
let (new_post_ratio, introduce_hosts) = process_posts(&mut store, &trend_setter_tx, &host, feed.posts.into_iter()).await;
let _ = message_tx.send(Message::IntroduceHosts {
hosts: introduce_hosts.into_iter().collect(),
});
if ! update_set.is_empty() {
trend_setter_tx.send(update_set).await.unwrap();
}
}
// successfully fetched, save for future run
store.save_host(&host).await.unwrap();
// introduce instances from authors
hosts.insert(author_host);
} else {
log::warn!("drop repost ({:?} on {})", post.account.host(), host);
}
}
log::info!("{}: {}/{} new posts", host, new_posts, posts_len);
let new_post_ratio = if posts_len > 0 {
Some((new_posts as f64) / (posts_len as f64))
} else {
None
};
message_tx.send(Message::Fetched {
host: host.to_string(),
new_post_ratio,
mean_interval,
}).unwrap();
}
Err(e) => {
log::error!("Failed fetching {}: {}", host, e);
message_tx.send(Message::Fetched {
host: host.to_string(),
new_post_ratio: None,
mean_interval: None,
}).unwrap();
}
}
}
let hosts = hosts.into_iter()
.map(|host| host.to_owned())
.collect();
let _ = message_tx.send(Message::IntroduceHosts { hosts });
async fn process_posts(
store: &mut Store,
trend_setter_tx: &crate::trend_setter::Tx,
host: &str,
posts: impl Iterator<Item = Post>,
) -> (Option<f64>, HashSet<String>) {
// introduce new hosts, validate posts
let mut introduce_hosts = HashSet::new();
let mut new_posts = 0;
let mut posts_len = 0;
for post in posts {
posts_len += 1;
// successfully fetched, save for future run
store.save_host(&host).await.unwrap();
message_tx.send(Message::Fetched {
host: host.clone(),
new_post_ratio,
mean_interval,
}).unwrap();
}
Err(e) => {
log::error!("Failed fetching {}: {}", host, e);
message_tx.send(Message::Fetched {
host,
new_post_ratio: None,
mean_interval: None,
}).unwrap();
// introduce instances from mentions
for mention in &post.mentions {
if let Some(user_host) = mention.user_host() {
introduce_hosts.insert(user_host);
}
}
});
// check if it's an actual post, not a repost
if let Some(author_host) = post.account.host() {
// send away to redis
let update_set = UpdateSet::from(&post);
if store.save_post(post).await == Ok(true) {
new_posts += 1;
if ! update_set.is_empty() {
trend_setter_tx.send(update_set).await.unwrap();
}
}
// introduce instances from authors
introduce_hosts.insert(author_host);
} else {
log::warn!("drop repost ({:?} on {})", post.account.host(), host);
}
}
log::info!("{}: {}/{} new posts", host, new_posts, posts_len);
let new_post_ratio = if posts_len > 0 {
Some((new_posts as f64) / (posts_len as f64))
} else {
None
};
let introduce_hosts = introduce_hosts.into_iter()
.map(|host| host.to_owned())
.collect();
(new_post_ratio, introduce_hosts)
}