hunter/worker: refactor, prepare stream, add robots.txt support

This commit is contained in:
Astro 2022-11-11 19:00:37 +01:00
parent e620c69a32
commit 53ae6e6ba1
5 changed files with 143 additions and 14 deletions

60
Cargo.lock generated
View File

@ -26,6 +26,12 @@ dependencies = [
"libc",
]
[[package]]
name = "anyhow"
version = "1.0.66"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "216261ddc8289130e551ddcd5ce8a064710c0d064a4d2895c67151c92b5443f6"
[[package]]
name = "arc-swap"
version = "1.5.1"
@ -224,6 +230,17 @@ version = "1.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a"
[[package]]
name = "bstr"
version = "0.2.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ba3569f383e8f1598449f1a423e72e99569137b47740b1da11ef19af3d5c3223"
dependencies = [
"lazy_static",
"memchr",
"regex-automata",
]
[[package]]
name = "build-env"
version = "0.3.1"
@ -288,6 +305,7 @@ dependencies = [
"rand",
"reqwest",
"serde",
"texting_robots",
"tokio",
]
@ -1210,6 +1228,12 @@ dependencies = [
"regex-syntax",
]
[[package]]
name = "regex-automata"
version = "0.1.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132"
[[package]]
name = "regex-syntax"
version = "0.6.28"
@ -1466,6 +1490,42 @@ dependencies = [
"winapi-util",
]
[[package]]
name = "texting_robots"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "828e9d94f6124477a527edec483720cb9ed71ae5dcbf9c1c46c27c76c2081c7c"
dependencies = [
"anyhow",
"bstr",
"lazy_static",
"nom",
"percent-encoding",
"regex",
"thiserror",
"url",
]
[[package]]
name = "thiserror"
version = "1.0.37"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "10deb33631e3c9018b9baf9dcbbc4f737320d2b576bac10f6aefa048fa407e3e"
dependencies = [
"thiserror-impl",
]
[[package]]
name = "thiserror-impl"
version = "1.0.37"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "982d17546b47146b28f7c22e3d08465f6b8903d0ea13c1660d9d84a6e7adcdbb"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "time"
version = "0.1.44"

View File

@ -5,6 +5,7 @@ version = "0.0.0"
edition = "2021"
[dependencies]
futures = "0.3"
tokio = { version = "1", features = ["full"] }
serde = { version = "1", features = ["derive"] }
serde_yaml = "0.9"
@ -14,4 +15,5 @@ redis = { version = "0.22", features = ["tokio-comp", "connection-manager"] }
log = "0.4"
env_logger = "0.9"
systemd = "0.10"
reqwest = { version = "0.11", features = ["json", "deflate", "gzip"] }
reqwest = { version = "0.11", features = ["json", "deflate", "gzip", "stream"] }
eventsource-stream = "0.2"

View File

@ -1,5 +1,7 @@
use std::collections::{HashMap, HashSet};
use std::{collections::{HashMap, HashSet}, time::Duration};
use chrono::{DateTime, FixedOffset};
use futures::Stream;
use eventsource_stream::Eventsource;
#[derive(Debug, serde::Deserialize)]
pub struct Account {
@ -121,6 +123,24 @@ pub struct Feed {
}
impl Feed {
/// Analyze time intervals between posts to estimate when to fetch
/// next
pub fn mean_post_interval(&self) -> Option<Duration> {
let mut timestamps = self.posts.iter()
.filter_map(|post| post.timestamp())
.collect::<Vec<_>>();
timestamps.sort();
if timestamps.len() > 2 {
Some(
((*timestamps.last().unwrap() - timestamps[0]) / (timestamps.len() as i32 - 1)
).to_std().unwrap()
)
} else {
None
}
}
pub async fn fetch(client: &reqwest::Client, url: &str) -> Result<Self, reqwest::Error> {
let posts: Vec<Post> = client.get(url)
.send()
@ -130,4 +150,12 @@ impl Feed {
log::trace!("{} {} posts", url, posts.len());
Ok(Feed { posts })
}
pub async fn stream(client: &reqwest::Client, url: &str) -> Result<impl Stream, reqwest::Error> {
let res = client.get(url)
.send()
.await?;
let src = res.bytes_stream().eventsource();
Ok(src)
}
}

View File

@ -12,3 +12,4 @@ chrono = "0.4"
log = "0.4"
cave = { path = "../cave" }
rand = "0.8"
texting_robots = "0.2"

View File

@ -6,6 +6,38 @@ use cave::{
};
use crate::trend_setter::UpdateSet;
pub struct RobotsTxt {
pub robot: Option<texting_robots::Robot>,
}
impl RobotsTxt {
pub async fn fetch(
client: &reqwest::Client,
host: &str,
) -> Self {
let url = format!("https://{}/robots.txt", host);
let robot = async {
let body = client.get(url)
.send().await
.ok()?
.text().await
.ok()?;
texting_robots::Robot::new(
env!("CARGO_PKG_NAME"),
body.as_bytes(),
).ok()
}.await;
RobotsTxt { robot }
}
pub fn allowed(&self, url: &str) -> bool {
if let Some(robot) = &self.robot {
robot.allowed(url)
} else {
true
}
}
}
#[derive(Debug)]
pub enum Message {
@ -24,14 +56,23 @@ pub async fn run(
client: reqwest::Client,
host: String,
) {
let robots_txt = RobotsTxt::fetch(&client, &host).await;
// TODO: eval robot.delay
let (new_post_ratio, mean_interval) =
// let (_, stream) = future::join(
fetch_timeline(
message_tx.clone(), store.clone(),
trend_setter_tx.clone(),
client.clone(), &host
client.clone(), &robots_txt, &host
// ),
// open_stream(&host),
).await;
message_tx.send(Message::Fetched {
host: host.to_string(),
new_post_ratio,
mean_interval,
}).unwrap();
}
async fn fetch_timeline(
@ -39,9 +80,14 @@ async fn fetch_timeline(
mut store: Store,
trend_setter_tx: crate::trend_setter::Tx,
client: reqwest::Client,
robots_txt: &RobotsTxt,
host: &str,
) {
) -> (Option<f64>, Option<Duration>) {
let url = format!("https://{}/api/v1/timelines/public?limit=40", host);
if ! robots_txt.allowed(&url) {
log::warn!("Timeline of {} forbidden by robots.txt", host);
return (None, None);
}
match Feed::fetch(&client, &url).await {
Ok(feed) => {
@ -55,19 +101,11 @@ async fn fetch_timeline(
// successfully fetched, save for future run
store.save_host(&host).await.unwrap();
message_tx.send(Message::Fetched {
host: host.to_string(),
new_post_ratio,
mean_interval,
}).unwrap();
(new_post_ratio, mean_interval)
}
Err(e) => {
log::error!("Failed fetching {}: {}", host, e);
message_tx.send(Message::Fetched {
host: host.to_string(),
new_post_ratio: None,
mean_interval: None,
}).unwrap();
(None, None)
}
}
}