2022-11-02 21:12:16 +01:00
|
|
|
use std::collections::HashSet;
|
2022-11-11 21:52:52 +01:00
|
|
|
use std::future::Future;
|
2022-12-02 23:05:35 +01:00
|
|
|
use std::ops::Deref;
|
2022-11-18 20:13:34 +01:00
|
|
|
use std::sync::{Arc, RwLock};
|
2022-12-26 03:44:42 +01:00
|
|
|
use std::time::{Duration, Instant};
|
2022-11-08 00:43:46 +01:00
|
|
|
use cave::{
|
2022-12-02 22:02:37 +01:00
|
|
|
feed::{Feed, EncodablePost},
|
2022-11-08 00:43:46 +01:00
|
|
|
store::Store,
|
|
|
|
};
|
2022-11-11 21:52:52 +01:00
|
|
|
use futures::{StreamExt, future};
|
2022-11-23 23:59:35 +01:00
|
|
|
use crate::posts_cache::PostsCache;
|
2022-12-02 00:50:01 +01:00
|
|
|
use crate::scheduler::Host;
|
2022-11-08 00:43:46 +01:00
|
|
|
use crate::trend_setter::UpdateSet;
|
2022-11-02 21:12:16 +01:00
|
|
|
|
2022-11-18 20:13:34 +01:00
|
|
|
#[derive(Clone)]
|
2022-11-11 19:00:37 +01:00
|
|
|
pub struct RobotsTxt {
|
2022-11-18 20:13:34 +01:00
|
|
|
robot: Arc<RwLock<Option<texting_robots::Robot>>>,
|
2022-11-11 19:00:37 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
impl RobotsTxt {
|
|
|
|
pub async fn fetch(
|
|
|
|
client: &reqwest::Client,
|
2022-12-02 00:50:01 +01:00
|
|
|
host: &Host,
|
2022-11-11 19:00:37 +01:00
|
|
|
) -> Self {
|
|
|
|
let url = format!("https://{}/robots.txt", host);
|
2022-12-26 04:14:10 +01:00
|
|
|
metrics::increment_gauge!("hunter_requests", 1.0, "type" => "robotstxt");
|
2022-11-11 19:00:37 +01:00
|
|
|
let robot = async {
|
|
|
|
let body = client.get(url)
|
|
|
|
.send().await
|
|
|
|
.ok()?
|
|
|
|
.text().await
|
|
|
|
.ok()?;
|
|
|
|
texting_robots::Robot::new(
|
|
|
|
env!("CARGO_PKG_NAME"),
|
|
|
|
body.as_bytes(),
|
|
|
|
).ok()
|
|
|
|
}.await;
|
2022-12-26 04:14:10 +01:00
|
|
|
metrics::decrement_gauge!("hunter_requests", 1.0, "type" => "robotstxt");
|
2022-11-18 20:13:34 +01:00
|
|
|
RobotsTxt {
|
|
|
|
robot: Arc::new(RwLock::new(robot)),
|
|
|
|
}
|
2022-11-11 19:00:37 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
pub fn allowed(&self, url: &str) -> bool {
|
2022-11-18 20:13:34 +01:00
|
|
|
if let Some(robot) = self.robot.read().unwrap().as_ref() {
|
2022-11-11 19:00:37 +01:00
|
|
|
robot.allowed(url)
|
|
|
|
} else {
|
|
|
|
true
|
|
|
|
}
|
|
|
|
}
|
2022-11-11 19:21:44 +01:00
|
|
|
|
|
|
|
pub fn delay(&self) -> Option<Duration> {
|
2022-11-18 20:13:34 +01:00
|
|
|
self.robot.read().unwrap().as_ref()
|
2022-11-11 19:21:44 +01:00
|
|
|
.and_then(|robot| robot.delay)
|
|
|
|
.map(|delay| Duration::from_secs(delay as u64))
|
|
|
|
}
|
2022-11-11 19:00:37 +01:00
|
|
|
}
|
2022-11-02 22:42:43 +01:00
|
|
|
|
2022-11-02 21:12:16 +01:00
|
|
|
#[derive(Debug)]
|
|
|
|
pub enum Message {
|
2022-11-11 21:52:52 +01:00
|
|
|
WorkerDone,
|
2022-11-02 23:10:59 +01:00
|
|
|
Fetched {
|
2022-12-02 00:50:01 +01:00
|
|
|
host: Host,
|
2022-11-07 00:58:28 +01:00
|
|
|
new_post_ratio: Option<f64>,
|
|
|
|
mean_interval: Option<Duration>,
|
2022-11-02 23:10:59 +01:00
|
|
|
},
|
2022-11-02 21:12:16 +01:00
|
|
|
IntroduceHosts { hosts: Vec<String> },
|
|
|
|
}
|
|
|
|
|
2022-11-11 18:07:59 +01:00
|
|
|
pub async fn run(
|
2022-11-03 01:21:53 +01:00
|
|
|
message_tx: tokio::sync::mpsc::UnboundedSender<Message>,
|
2022-11-11 18:07:59 +01:00
|
|
|
store: Store,
|
2022-11-08 00:43:46 +01:00
|
|
|
trend_setter_tx: crate::trend_setter::Tx,
|
2022-11-23 23:59:35 +01:00
|
|
|
posts_cache: PostsCache,
|
2022-11-02 21:12:16 +01:00
|
|
|
client: reqwest::Client,
|
2022-12-02 00:50:01 +01:00
|
|
|
host: Host,
|
2022-11-02 21:12:16 +01:00
|
|
|
) {
|
2022-11-11 21:52:52 +01:00
|
|
|
// Fetch /robots.txt
|
2022-11-11 19:00:37 +01:00
|
|
|
let robots_txt = RobotsTxt::fetch(&client, &host).await;
|
2022-11-18 20:13:34 +01:00
|
|
|
let robots_delay = robots_txt.delay();
|
2022-11-11 21:52:52 +01:00
|
|
|
// Fetch posts and open stream
|
|
|
|
let ((new_post_ratio, mut mean_interval), stream_result) = future::join(
|
2022-11-11 18:07:59 +01:00
|
|
|
fetch_timeline(
|
|
|
|
message_tx.clone(), store.clone(),
|
2022-11-23 23:59:35 +01:00
|
|
|
trend_setter_tx.clone(), &posts_cache,
|
2022-11-18 20:13:34 +01:00
|
|
|
&client, robots_txt.clone(), &host
|
2022-11-11 21:52:52 +01:00
|
|
|
),
|
|
|
|
open_stream(
|
|
|
|
message_tx.clone(), store.clone(),
|
2022-11-23 23:59:35 +01:00
|
|
|
trend_setter_tx.clone(), &posts_cache,
|
2022-11-18 20:13:34 +01:00
|
|
|
&client, robots_txt, host.clone()
|
2022-11-11 21:52:52 +01:00
|
|
|
),
|
|
|
|
).await;
|
|
|
|
|
|
|
|
// Next worker can start
|
|
|
|
message_tx.send(Message::WorkerDone).unwrap();
|
|
|
|
|
|
|
|
// Process stream
|
|
|
|
if let Ok(stream) = stream_result {
|
2022-12-01 01:39:38 +01:00
|
|
|
tracing::info!("Processing stream for {}", host);
|
2022-12-26 03:44:42 +01:00
|
|
|
metrics::increment_gauge!("hunter_requests", 1.0, "type" => "stream");
|
2022-11-11 21:52:52 +01:00
|
|
|
stream.await;
|
2022-12-26 03:44:42 +01:00
|
|
|
metrics::decrement_gauge!("hunter_requests", 1.0, "type" => "stream");
|
2022-12-01 01:39:38 +01:00
|
|
|
tracing::warn!("Ended stream for {}", host);
|
2022-11-11 21:52:52 +01:00
|
|
|
}
|
2022-11-11 19:21:44 +01:00
|
|
|
|
2022-11-11 21:52:52 +01:00
|
|
|
// Ready for reenqueue
|
2022-11-11 19:21:44 +01:00
|
|
|
if let Some(mean_interval) = &mut mean_interval {
|
2022-11-18 20:13:34 +01:00
|
|
|
if let Some(robots_delay) = robots_delay {
|
|
|
|
*mean_interval = (*mean_interval).max(robots_delay);
|
2022-11-11 19:21:44 +01:00
|
|
|
}
|
|
|
|
}
|
2022-11-11 19:00:37 +01:00
|
|
|
message_tx.send(Message::Fetched {
|
2022-12-02 00:50:01 +01:00
|
|
|
host: host,
|
2022-11-11 19:00:37 +01:00
|
|
|
new_post_ratio,
|
|
|
|
mean_interval,
|
|
|
|
}).unwrap();
|
2022-11-11 18:07:59 +01:00
|
|
|
}
|
2022-11-02 22:42:43 +01:00
|
|
|
|
2022-11-11 18:07:59 +01:00
|
|
|
async fn fetch_timeline(
|
|
|
|
message_tx: tokio::sync::mpsc::UnboundedSender<Message>,
|
|
|
|
mut store: Store,
|
|
|
|
trend_setter_tx: crate::trend_setter::Tx,
|
2022-11-23 23:59:35 +01:00
|
|
|
posts_cache: &PostsCache,
|
2022-11-11 21:52:52 +01:00
|
|
|
client: &reqwest::Client,
|
2022-11-18 20:13:34 +01:00
|
|
|
robots_txt: RobotsTxt,
|
2022-12-02 00:50:01 +01:00
|
|
|
host: &Host,
|
2022-11-11 19:00:37 +01:00
|
|
|
) -> (Option<f64>, Option<Duration>) {
|
2022-11-11 18:07:59 +01:00
|
|
|
let url = format!("https://{}/api/v1/timelines/public?limit=40", host);
|
2022-11-11 19:00:37 +01:00
|
|
|
if ! robots_txt.allowed(&url) {
|
2022-12-01 01:39:38 +01:00
|
|
|
tracing::warn!("Timeline of {} forbidden by robots.txt", host);
|
2022-11-11 19:00:37 +01:00
|
|
|
return (None, None);
|
|
|
|
}
|
2022-11-18 20:13:34 +01:00
|
|
|
// free as early as possible
|
|
|
|
drop(robots_txt);
|
2022-11-03 03:42:13 +01:00
|
|
|
|
2022-12-26 03:44:42 +01:00
|
|
|
metrics::increment_gauge!("hunter_requests", 1.0, "type" => "timeline");
|
|
|
|
let t1 = Instant::now();
|
|
|
|
let result = Feed::fetch(&client, &url).await;
|
|
|
|
let t2 = Instant::now();
|
|
|
|
metrics::decrement_gauge!("hunter_requests", 1.0, "type" => "timeline");
|
|
|
|
metrics::histogram!("hunter_fetch_seconds", t2 - t1, "result" => if result.is_ok() { "ok" } else { "error" });
|
|
|
|
|
|
|
|
match result {
|
2022-11-11 18:07:59 +01:00
|
|
|
Ok(feed) => {
|
|
|
|
let mean_interval = feed.mean_post_interval();
|
2022-11-08 00:43:46 +01:00
|
|
|
|
2022-11-23 23:59:35 +01:00
|
|
|
let (new_post_ratio, introduce_hosts) = process_posts(&mut store, &trend_setter_tx, posts_cache, &host, feed.posts.into_iter()).await;
|
2022-11-11 18:07:59 +01:00
|
|
|
let _ = message_tx.send(Message::IntroduceHosts {
|
|
|
|
hosts: introduce_hosts.into_iter().collect(),
|
|
|
|
});
|
2022-11-03 03:42:13 +01:00
|
|
|
|
2022-11-11 18:07:59 +01:00
|
|
|
// successfully fetched, save for future run
|
|
|
|
store.save_host(&host).await.unwrap();
|
2022-11-07 00:58:28 +01:00
|
|
|
|
2022-11-11 19:00:37 +01:00
|
|
|
(new_post_ratio, mean_interval)
|
2022-11-11 18:07:59 +01:00
|
|
|
}
|
|
|
|
Err(e) => {
|
2022-12-01 01:39:38 +01:00
|
|
|
tracing::error!("Failed fetching {}: {}", host, e);
|
2022-11-11 19:00:37 +01:00
|
|
|
(None, None)
|
2022-11-11 18:07:59 +01:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
2022-11-02 22:06:43 +01:00
|
|
|
|
2022-11-11 18:07:59 +01:00
|
|
|
async fn process_posts(
|
|
|
|
store: &mut Store,
|
|
|
|
trend_setter_tx: &crate::trend_setter::Tx,
|
2022-11-23 23:59:35 +01:00
|
|
|
posts_cache: &PostsCache,
|
2022-12-02 00:50:01 +01:00
|
|
|
host: &Host,
|
2022-12-02 22:02:37 +01:00
|
|
|
posts: impl Iterator<Item = EncodablePost>,
|
2022-11-11 18:07:59 +01:00
|
|
|
) -> (Option<f64>, HashSet<String>) {
|
|
|
|
// introduce new hosts, validate posts
|
|
|
|
let mut introduce_hosts = HashSet::new();
|
|
|
|
let mut new_posts = 0;
|
|
|
|
let mut posts_len = 0;
|
|
|
|
for post in posts {
|
|
|
|
posts_len += 1;
|
2022-11-03 21:17:21 +01:00
|
|
|
|
2022-12-02 23:05:35 +01:00
|
|
|
// potentially save a round-trip to redis with an in-process cache
|
|
|
|
if ! posts_cache.insert(post.uri.clone()) {
|
2022-12-26 03:44:42 +01:00
|
|
|
let t1 = Instant::now();
|
|
|
|
|
2022-12-02 23:05:35 +01:00
|
|
|
// introduce instances from mentions
|
|
|
|
for mention in &post.mentions {
|
|
|
|
if let Some(user_host) = mention.user_host() {
|
|
|
|
introduce_hosts.insert(user_host);
|
|
|
|
}
|
2022-11-02 21:12:16 +01:00
|
|
|
}
|
2022-11-11 18:07:59 +01:00
|
|
|
|
2022-12-02 23:05:35 +01:00
|
|
|
// check if it's an actual post, not a repost
|
|
|
|
if let Some(author_host) = post.account.host() {
|
|
|
|
let update_set = UpdateSet::from(post.deref());
|
2022-11-23 23:59:35 +01:00
|
|
|
// send away to redis
|
|
|
|
if store.save_post(post).await == Ok(true) {
|
|
|
|
new_posts += 1;
|
|
|
|
|
|
|
|
if ! update_set.is_empty() {
|
|
|
|
trend_setter_tx.send(update_set).await.unwrap();
|
|
|
|
}
|
2022-11-11 18:07:59 +01:00
|
|
|
}
|
|
|
|
|
2022-12-02 23:05:35 +01:00
|
|
|
// introduce instances from authors
|
|
|
|
introduce_hosts.insert(author_host);
|
|
|
|
} else {
|
2022-12-01 01:39:38 +01:00
|
|
|
tracing::warn!("drop repost ({:?} on {})", post.account.host(), host);
|
2022-12-02 23:05:35 +01:00
|
|
|
}
|
2022-12-26 03:44:42 +01:00
|
|
|
|
|
|
|
let t2 = Instant::now();
|
|
|
|
metrics::histogram!("hunter_post_process_seconds", t2 - t1)
|
2022-11-02 21:12:16 +01:00
|
|
|
}
|
2022-11-11 18:07:59 +01:00
|
|
|
}
|
2022-12-01 01:39:38 +01:00
|
|
|
tracing::info!("{}: {}/{} new posts", host, new_posts, posts_len);
|
2022-12-27 19:03:09 +01:00
|
|
|
metrics::counter!("hunter_posts", new_posts, "type" => "new");
|
|
|
|
metrics::counter!("hunter_posts", posts_len, "type" => "total");
|
2022-11-11 18:07:59 +01:00
|
|
|
let new_post_ratio = if posts_len > 0 {
|
2022-12-27 19:03:09 +01:00
|
|
|
let ratio = (new_posts as f64) / (posts_len as f64);
|
|
|
|
metrics::histogram!("hunter_new_post_ratio", ratio);
|
|
|
|
Some(ratio)
|
2022-11-11 18:07:59 +01:00
|
|
|
} else {
|
|
|
|
None
|
|
|
|
};
|
|
|
|
|
|
|
|
let introduce_hosts = introduce_hosts.into_iter()
|
|
|
|
.map(|host| host.to_owned())
|
|
|
|
.collect();
|
|
|
|
|
|
|
|
(new_post_ratio, introduce_hosts)
|
2022-11-02 21:12:16 +01:00
|
|
|
}
|
2022-11-11 21:52:52 +01:00
|
|
|
|
|
|
|
async fn open_stream(
|
|
|
|
message_tx: tokio::sync::mpsc::UnboundedSender<Message>,
|
|
|
|
store: Store,
|
|
|
|
trend_setter_tx: crate::trend_setter::Tx,
|
2022-11-23 23:59:35 +01:00
|
|
|
posts_cache: &PostsCache,
|
2022-11-11 21:52:52 +01:00
|
|
|
client: &reqwest::Client,
|
2022-11-18 20:13:34 +01:00
|
|
|
robots_txt: RobotsTxt,
|
2022-12-02 00:50:01 +01:00
|
|
|
host: Host,
|
2022-11-11 21:52:52 +01:00
|
|
|
) -> Result<impl Future<Output = ()>, ()> {
|
|
|
|
let url = format!("https://{}/api/v1/streaming/public", host);
|
|
|
|
if ! robots_txt.allowed(&url) {
|
2022-12-01 01:39:38 +01:00
|
|
|
tracing::warn!("Streaming of {} forbidden by robots.txt", host);
|
2022-11-11 21:52:52 +01:00
|
|
|
return Err(());
|
|
|
|
}
|
2022-11-18 20:13:34 +01:00
|
|
|
// free as early as possible
|
|
|
|
drop(robots_txt);
|
2022-11-23 23:59:35 +01:00
|
|
|
let posts_cache = posts_cache.clone();
|
2022-11-11 21:52:52 +01:00
|
|
|
|
2022-12-26 03:44:42 +01:00
|
|
|
metrics::increment_gauge!("hunter_requests", 1.0, "type" => "stream_open");
|
2022-12-26 04:14:10 +01:00
|
|
|
let stream = Feed::stream(client, &url).await;
|
2022-12-26 03:44:42 +01:00
|
|
|
metrics::decrement_gauge!("hunter_requests", 1.0, "type" => "stream_open");
|
2022-12-26 04:14:10 +01:00
|
|
|
let stream = stream.map_err(|e| {
|
|
|
|
tracing::error!("Stream error for {}: {}", host, e);
|
|
|
|
})?;
|
2022-11-11 21:52:52 +01:00
|
|
|
|
|
|
|
Ok(stream.for_each(move |post| {
|
|
|
|
let message_tx = message_tx.clone();
|
|
|
|
let mut store = store.clone();
|
|
|
|
let trend_setter_tx = trend_setter_tx.clone();
|
2022-11-23 23:59:35 +01:00
|
|
|
let posts_cache = posts_cache.clone();
|
2022-11-11 21:52:52 +01:00
|
|
|
let host = host.clone();
|
|
|
|
async move {
|
|
|
|
let (_, introduce_hosts) =
|
2022-11-23 23:59:35 +01:00
|
|
|
process_posts(&mut store, &trend_setter_tx, &posts_cache, &host, [post].into_iter()).await;
|
2022-11-11 21:52:52 +01:00
|
|
|
let _ = message_tx.send(Message::IntroduceHosts {
|
|
|
|
hosts: introduce_hosts.into_iter().collect(),
|
|
|
|
});
|
|
|
|
}
|
|
|
|
}))
|
|
|
|
}
|