//! caveman/hunter/src/worker.rs — per-host crawl worker: fetches a host's
//! robots.txt, its public timeline, and its streaming API.
use std::collections::HashSet;
use std::future::Future;
use std::sync::{Arc, RwLock};
use std::time::Duration;

use cave::{
    feed::{Feed, Post},
    store::Store,
};
use futures::{StreamExt, future};

use crate::trend_setter::UpdateSet;

#[derive(Clone)]
pub struct RobotsTxt {
2022-11-18 20:13:34 +01:00
robot: Arc<RwLock<Option<texting_robots::Robot>>>,
}
impl RobotsTxt {
pub async fn fetch(
client: &reqwest::Client,
host: &str,
) -> Self {
let url = format!("https://{}/robots.txt", host);
let robot = async {
let body = client.get(url)
.send().await
.ok()?
.text().await
.ok()?;
texting_robots::Robot::new(
env!("CARGO_PKG_NAME"),
body.as_bytes(),
).ok()
}.await;
2022-11-18 20:13:34 +01:00
RobotsTxt {
robot: Arc::new(RwLock::new(robot)),
}
}
pub fn allowed(&self, url: &str) -> bool {
2022-11-18 20:13:34 +01:00
if let Some(robot) = self.robot.read().unwrap().as_ref() {
robot.allowed(url)
} else {
true
}
}
2022-11-11 19:21:44 +01:00
pub fn delay(&self) -> Option<Duration> {
2022-11-18 20:13:34 +01:00
self.robot.read().unwrap().as_ref()
2022-11-11 19:21:44 +01:00
.and_then(|robot| robot.delay)
.map(|delay| Duration::from_secs(delay as u64))
}
}
/// Messages sent from a worker back to the supervising loop.
#[derive(Debug)]
pub enum Message {
    /// The worker's initial fetch/stream-open phase is done, so the
    /// next queued worker may start (sent from `run()`).
    WorkerDone,
    /// Final report for a host, used to re-enqueue it.
    Fetched {
        host: String,
        /// Fraction of fetched posts that were new to the store;
        /// `None` when no posts were seen.
        new_post_ratio: Option<f64>,
        /// Mean interval between the host's posts (floored by the
        /// robots.txt crawl delay in `run()`), if computable.
        mean_interval: Option<Duration>,
    },
    /// Hosts discovered in post authors/mentions that should be crawled.
    IntroduceHosts { hosts: Vec<String> },
}
pub async fn run(
2022-11-03 01:21:53 +01:00
message_tx: tokio::sync::mpsc::UnboundedSender<Message>,
store: Store,
2022-11-08 00:43:46 +01:00
trend_setter_tx: crate::trend_setter::Tx,
2022-11-02 21:12:16 +01:00
client: reqwest::Client,
host: String,
) {
2022-11-11 21:52:52 +01:00
// Fetch /robots.txt
let robots_txt = RobotsTxt::fetch(&client, &host).await;
2022-11-18 20:13:34 +01:00
let robots_delay = robots_txt.delay();
2022-11-11 21:52:52 +01:00
// Fetch posts and open stream
let ((new_post_ratio, mut mean_interval), stream_result) = future::join(
fetch_timeline(
message_tx.clone(), store.clone(),
trend_setter_tx.clone(),
2022-11-18 20:13:34 +01:00
&client, robots_txt.clone(), &host
2022-11-11 21:52:52 +01:00
),
open_stream(
message_tx.clone(), store.clone(),
trend_setter_tx.clone(),
2022-11-18 20:13:34 +01:00
&client, robots_txt, host.clone()
2022-11-11 21:52:52 +01:00
),
).await;
// Next worker can start
message_tx.send(Message::WorkerDone).unwrap();
// Process stream
if let Ok(stream) = stream_result {
log::info!("Processing stream for {}", host);
stream.await;
log::warn!("Ended stream for {}", host);
}
2022-11-11 19:21:44 +01:00
2022-11-11 21:52:52 +01:00
// Ready for reenqueue
2022-11-11 19:21:44 +01:00
if let Some(mean_interval) = &mut mean_interval {
2022-11-18 20:13:34 +01:00
if let Some(robots_delay) = robots_delay {
*mean_interval = (*mean_interval).max(robots_delay);
2022-11-11 19:21:44 +01:00
}
}
message_tx.send(Message::Fetched {
host: host.to_string(),
new_post_ratio,
mean_interval,
}).unwrap();
}
2022-11-02 22:42:43 +01:00
async fn fetch_timeline(
message_tx: tokio::sync::mpsc::UnboundedSender<Message>,
mut store: Store,
trend_setter_tx: crate::trend_setter::Tx,
2022-11-11 21:52:52 +01:00
client: &reqwest::Client,
2022-11-18 20:13:34 +01:00
robots_txt: RobotsTxt,
host: &str,
) -> (Option<f64>, Option<Duration>) {
let url = format!("https://{}/api/v1/timelines/public?limit=40", host);
if ! robots_txt.allowed(&url) {
log::warn!("Timeline of {} forbidden by robots.txt", host);
return (None, None);
}
2022-11-18 20:13:34 +01:00
// free as early as possible
drop(robots_txt);
2022-11-03 03:42:13 +01:00
match Feed::fetch(&client, &url).await {
Ok(feed) => {
let mean_interval = feed.mean_post_interval();
2022-11-08 00:43:46 +01:00
let (new_post_ratio, introduce_hosts) = process_posts(&mut store, &trend_setter_tx, &host, feed.posts.into_iter()).await;
let _ = message_tx.send(Message::IntroduceHosts {
hosts: introduce_hosts.into_iter().collect(),
});
2022-11-03 03:42:13 +01:00
// successfully fetched, save for future run
store.save_host(&host).await.unwrap();
2022-11-07 00:58:28 +01:00
(new_post_ratio, mean_interval)
}
Err(e) => {
log::error!("Failed fetching {}: {}", host, e);
(None, None)
}
}
}
/// Validate and store a batch of posts originating from `host`.
///
/// Side effects: saves each original (non-repost) post to the store,
/// forwards a trend `UpdateSet` for posts the store reports as new, and
/// collects hosts discovered in mentions/authors.
///
/// Returns the ratio of posts that were new to the store (`None` for an
/// empty batch) and the set of discovered hosts to introduce.
async fn process_posts(
    store: &mut Store,
    trend_setter_tx: &crate::trend_setter::Tx,
    host: &str,
    posts: impl Iterator<Item = Post>,
) -> (Option<f64>, HashSet<String>) {
    // introduce new hosts, validate posts
    let mut introduce_hosts = HashSet::new();
    let mut new_posts = 0;
    let mut posts_len = 0;
    for post in posts {
        posts_len += 1;

        // introduce instances from mentions
        for mention in &post.mentions {
            if let Some(user_host) = mention.user_host() {
                introduce_hosts.insert(user_host);
            }
        }

        // check if it's an actual post, not a repost
        // (presumably `host()` is None for reposts — verify in `cave::feed`)
        if let Some(author_host) = post.account.host() {
            // Build the trend update before `post` is moved into the store.
            let update_set = UpdateSet::from(&post);
            // send away to redis; Ok(true) is treated as "post was new"
            if store.save_post(post).await == Ok(true) {
                new_posts += 1;
                if ! update_set.is_empty() {
                    trend_setter_tx.send(update_set).await.unwrap();
                }
            }
            // introduce instances from authors
            introduce_hosts.insert(author_host);
        } else {
            log::warn!("drop repost ({:?} on {})", post.account.host(), host);
        }
    }

    log::info!("{}: {}/{} new posts", host, new_posts, posts_len);
    let new_post_ratio = if posts_len > 0 {
        Some((new_posts as f64) / (posts_len as f64))
    } else {
        None
    };
    // Convert the collected host names into owned Strings for the caller.
    let introduce_hosts = introduce_hosts.into_iter()
        .map(|host| host.to_owned())
        .collect();
    (new_post_ratio, introduce_hosts)
}
async fn open_stream(
message_tx: tokio::sync::mpsc::UnboundedSender<Message>,
store: Store,
trend_setter_tx: crate::trend_setter::Tx,
client: &reqwest::Client,
2022-11-18 20:13:34 +01:00
robots_txt: RobotsTxt,
2022-11-11 21:52:52 +01:00
host: String,
) -> Result<impl Future<Output = ()>, ()> {
let url = format!("https://{}/api/v1/streaming/public", host);
if ! robots_txt.allowed(&url) {
log::warn!("Streaming of {} forbidden by robots.txt", host);
return Err(());
}
2022-11-18 20:13:34 +01:00
// free as early as possible
drop(robots_txt);
2022-11-11 21:52:52 +01:00
let stream = Feed::stream(client, &url).await
.map_err(|e| {
log::error!("Stream error for {}: {}", host, e);
})?;
Ok(stream.for_each(move |post| {
let message_tx = message_tx.clone();
let mut store = store.clone();
let trend_setter_tx = trend_setter_tx.clone();
let host = host.clone();
async move {
log::info!("Stream {} new post: {}", host, &post.uri);
let (_, introduce_hosts) =
process_posts(&mut store, &trend_setter_tx, &host, [post].into_iter()).await;
let _ = message_tx.send(Message::IntroduceHosts {
hosts: introduce_hosts.into_iter().collect(),
});
}
}))
}