spider feeds

This commit is contained in:
Astro 2022-11-02 21:12:16 +01:00
commit 7ade8d1aae
6 changed files with 1373 additions and 0 deletions

1125
Cargo.lock generated Normal file

File diff suppressed because it is too large Load Diff

9
Cargo.toml Normal file
View File

@@ -0,0 +1,9 @@
[package]
name = "caveman"
version = "0.0.0"
edition = "2021"

[dependencies]
# Async runtime: drives the scheduler loop in main.rs and the
# per-host fetch tasks spawned in worker.rs.
tokio = { version = "1", features = ["full"] }
# HTTP client; "json" enables Response::json deserialization,
# "deflate"/"gzip" enable compressed responses.
reqwest = { version = "0.11", features = ["json", "deflate", "gzip"] }
# Derive-based deserialization of the timeline JSON (src/feed.rs).
serde = { version = "1", features = ["derive"] }

64
src/feed.rs Normal file
View File

@@ -0,0 +1,64 @@
/// Author of a [`Post`]. Appears to mirror the account object embedded
/// in a Mastodon-style status; only the fields used here are kept.
#[derive(Debug, serde::Deserialize)]
pub struct Account {
    pub username: String,
    pub display_name: String,
    // Profile page URL; also used to derive the account's home
    // instance (see `Account::host`).
    pub url: String,
    pub bot: bool,
    pub avatar: String,
    pub header: String,
}
impl Account {
    /// Derive the account's home instance from its profile URL.
    ///
    /// Returns `None` when the URL does not parse or its host is not a
    /// domain name (e.g. an IP address).
    pub fn host(&self) -> Option<String> {
        let parsed = reqwest::Url::parse(&self.url).ok()?;
        let domain = parsed.domain()?;
        Some(domain.to_owned())
    }
}
/// A hashtag attached to a [`Post`]; only the tag's name is kept.
#[derive(Debug, serde::Deserialize)]
pub struct Tag {
    pub name: String,
}
/// The client application a [`Post`] was published with.
#[derive(Debug, serde::Deserialize)]
pub struct Application {
    pub name: String,
    pub website: String,
}
/// One status from a public timeline (see [`Feed::fetch`]).
#[derive(Debug, serde::Deserialize)]
pub struct Post {
    // Kept as the raw string from the API; not parsed into a time type yet.
    pub created_at: String,
    pub url: String,
    // Post body as delivered by the API — presumably HTML; see the
    // `text_content` stub in `impl Post`.
    pub content: String,
    pub account: Account,
    pub tags: Vec<Tag>,
    // Not every post carries an application or a sensitivity flag.
    pub application: Option<Application>,
    pub sensitive: Option<bool>,
}
impl Post {
    // Planned accessors, not yet implemented:
    // fn time — presumably: parse `created_at` into a proper time type
    // fn text_content — presumably: extract plain text from `content`
}
/// The result of fetching one public-timeline URL: a batch of posts.
#[derive(Debug)]
pub struct Feed {
    pub posts: Vec<Post>,
}
impl Feed {
    /// GET `url` and deserialize the response body as a JSON array of
    /// [`Post`]s.
    ///
    /// Errors on network failure, non-success HTTP status, or a body
    /// that does not match `Vec<Post>`.
    pub async fn fetch(client: &reqwest::Client, url: &str) -> Result<Self, reqwest::Error> {
        println!("GET {}", url);
        let posts: Vec<Post> = client.get(url)
            .send()
            .await?
            // Fail on non-2xx up front; otherwise an HTML error page
            // would surface as a confusing JSON decode error below.
            .error_for_status()?
            .json()
            .await?;
        println!("{}: {} posts", url, posts.len());
        Ok(Feed { posts })
    }
}

57
src/main.rs Normal file
View File

@@ -0,0 +1,57 @@
use std::time::Duration;
use tokio::time::{Instant, timeout};
mod world;
mod feed;
mod worker;
use worker::Message;
#[tokio::main]
async fn main() {
let mut world = world::World::new();
world.introduce("chaos.social".to_owned());
let client = reqwest::Client::builder()
.user_agent(concat!(
env!("CARGO_PKG_NAME"),
"/",
env!("CARGO_PKG_VERSION"),
))
.deflate(true)
.gzip(true)
.build()
.expect("reqwest::Client");
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
loop {
match world.dequeue() {
Err(time) => {
let duration = time - Instant::now();
timeout(duration, async {
if let message = rx.recv().await.unwrap() {
match message {
Message::Fetched { host } =>
world.set_fetched(host, Duration::from_secs(60)),
Message::Error { host } =>
world.set_error(host),
Message::IntroduceHosts { hosts } => {
for host in hosts.into_iter() {
world.introduce(host);
}
}
Message::Posts { posts } => {}
}
} else {
println!("no rx?");
}
}).await;
}
Ok(host) => {
println!("Fetch {}", host);
worker::fetch_and_process(tx.clone(), client.clone(), host);
}
}
}
}

44
src/worker.rs Normal file
View File

@@ -0,0 +1,44 @@
use std::collections::HashSet;
use crate::feed;
/// Messages a fetch task sends back to the main scheduler loop.
#[derive(Debug)]
pub enum Message {
    // A host's timeline was fetched successfully.
    Fetched { host: String },
    // Fetching a host's timeline failed.
    Error { host: String },
    // New hosts discovered from post authors; candidates to crawl.
    IntroduceHosts { hosts: Vec<String> },
    // Posts authored on the fetched host itself.
    Posts { posts: Vec<feed::Post> },
}
pub fn fetch_and_process(
tx: tokio::sync::mpsc::UnboundedSender<Message>,
client: reqwest::Client,
host: String,
) {
let url = format!("https://{}/api/v1/timelines/public", host);
tokio::spawn(async move {
match feed::Feed::fetch(&client, &url).await {
Ok(feed) => {
tx.send(Message::Fetched { host: host.clone() }).unwrap();
// Analyze
let mut posts = Vec::with_capacity(feed.posts.len());
let mut hosts = HashSet::new();
for post in feed.posts.into_iter() {
if let Some(author_host) = post.account.host() {
if author_host == host {
posts.push(post);
}
hosts.insert(author_host);
}
}
let hosts = hosts.into_iter().collect();
tx.send(Message::IntroduceHosts { hosts }).unwrap();
tx.send(Message::Posts { posts }).unwrap();
}
Err(e) => {
println!("Failed fetching {}: {}", host, e);
tx.send(Message::Error { host }).unwrap();
}
}
});
}

74
src/world.rs Normal file
View File

@@ -0,0 +1,74 @@
use std::collections::{HashMap, BTreeMap};
use std::time::Duration;
use tokio::time::Instant;
/// Back-off before re-trying a host whose last fetch failed (24 h).
const ERROR_INTERVAL: Duration = Duration::from_secs(86400u64);

/// Per-host bookkeeping.
pub struct Instance {
    // Time of the last fetch attempt (set on success and on error).
    last_fetch: Option<Instant>,
    // Whether the most recent fetch attempt failed.
    error: bool,
}

/// Scheduler state: every known instance plus a time-ordered fetch queue.
pub struct World {
    instances: HashMap<String, Instance>,
    // Due-time -> host. A BTreeMap keeps entries sorted, so the
    // earliest due entry is always the first key.
    queue: BTreeMap<Instant, String>,
}
impl World {
    pub fn new() -> Self {
        World {
            instances: HashMap::new(),
            queue: BTreeMap::new(),
        }
    }

    /// Schedule `host` at `time`, nudging the key forward by 1 ns until
    /// it is free. Resolves the old TODO: a plain `BTreeMap::insert`
    /// with an equal key would overwrite — silently dropping a host
    /// from the queue.
    fn enqueue(&mut self, mut time: Instant, host: String) {
        while self.queue.contains_key(&time) {
            time = time + Duration::from_nanos(1);
        }
        self.queue.insert(time, host);
    }

    /// Register a newly discovered host and schedule an immediate
    /// fetch. Already-known hosts are ignored.
    pub fn introduce(&mut self, host: String) {
        let now = Instant::now();
        if self.instances.get(&host).is_none() {
            self.instances.insert(host.clone(), Instance {
                last_fetch: None,
                error: false,
            });
            self.enqueue(now, host);
        }
    }

    /// Record a successful fetch and re-schedule after `next_interval`.
    /// Panics if `host` was never introduced.
    pub fn set_fetched(&mut self, host: String, next_interval: Duration) {
        let now = Instant::now();
        let instance = self.instances.get_mut(&host).unwrap();
        instance.last_fetch = Some(now);
        instance.error = false;
        println!("enqueue at {:?}", now + next_interval);
        self.enqueue(now + next_interval, host);
    }

    /// Record a failed fetch and back off for `ERROR_INTERVAL`.
    /// Panics if `host` was never introduced.
    pub fn set_error(&mut self, host: String) {
        let now = Instant::now();
        let instance = self.instances.get_mut(&host).unwrap();
        instance.last_fetch = Some(now);
        instance.error = true;
        println!("enqueue at {:?}", now + ERROR_INTERVAL);
        self.enqueue(now + ERROR_INTERVAL, host);
    }

    /// Pop the next due host, or return `Err` with the `Instant` the
    /// caller should wait until before trying again.
    pub fn dequeue(&mut self) -> Result<String, Instant> {
        let now = Instant::now();
        if let Some(time) = self.queue.keys().next().cloned() {
            if time <= now {
                self.queue.remove(&time)
                    .map(|host| Ok(host))
                    // remove() of a key we just observed should always
                    // succeed; retry shortly if it somehow does not.
                    .unwrap_or(Err(now + Duration::from_secs(1)))
            } else {
                Err(time)
            }
        } else {
            println!("empty queue");
            Err(now + Duration::from_secs(60))
        }
    }
}