config, max_workers

This commit is contained in:
Astro 2022-11-02 22:06:43 +01:00
parent 9b4ae05fc5
commit 7b7c803ef5
7 changed files with 102 additions and 39 deletions

28
Cargo.lock generated
View File

@ -703,6 +703,19 @@ dependencies = [
"serde",
]
[[package]]
name = "serde_yaml"
version = "0.9.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6d232d893b10de3eb7258ff01974d6ee20663d8e833263c99409d4b13a0209da"
dependencies = [
"indexmap",
"itoa",
"ryu",
"serde",
"unsafe-libyaml",
]
[[package]]
name = "signal-hook-registry"
version = "1.4.0"
@ -737,15 +750,6 @@ dependencies = [
"winapi",
]
[[package]]
name = "sshlogd"
version = "0.1.0"
dependencies = [
"reqwest",
"serde",
"tokio",
]
[[package]]
name = "syn"
version = "1.0.103"
@ -894,6 +898,12 @@ dependencies = [
"tinyvec",
]
[[package]]
name = "unsafe-libyaml"
version = "0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c1e5fa573d8ac5f1a856f8d7be41d390ee973daf97c806b2c1a465e4e1406e68"
[[package]]
name = "url"
version = "2.3.1"

View File

@ -7,3 +7,4 @@ edition = "2021"
tokio = { version = "1", features = ["full"] }
reqwest = { version = "0.11", features = ["json", "deflate", "gzip"] }
serde = { version = "1", features = ["derive"] }
serde_yaml = "0.9"

16
config.yaml Normal file
View File

@ -0,0 +1,16 @@
indexer: http://10.233.12.2:7700/
hosts:
- chaos.social
- dresden.network
- mastodon.social
- social.librem.one
- fosstodon.org
- nixnet.social
- mastodon.technology
- o3o.ca
- troet.cafe
- mastodon.top
interval_after_error: 7200
max_workers: 8

16
src/config.rs Normal file
View File

@ -0,0 +1,16 @@
#[derive(Debug, serde::Deserialize)]
pub struct Config {
pub indexer: String,
pub hosts: Vec<String>,
pub interval_after_error: u64,
pub max_workers: usize,
}
impl Config {
pub fn load_file(path: &str) -> Self {
let config_file = std::fs::read_to_string(path)
.expect(path);
serde_yaml::from_str(&config_file)
.expect("config")
}
}

View File

@ -1,6 +1,7 @@
use std::time::Duration;
use tokio::time::{Instant, timeout};
use tokio::time::timeout;
mod config;
mod world;
mod feed;
mod worker;
@ -9,9 +10,17 @@ use worker::Message;
#[tokio::main]
async fn main() {
let mut world = world::World::new();
world.introduce("chaos.social".to_owned());
let config = config::Config::load_file(
&std::env::args()
.skip(1)
.next()
.expect("Call with config.yaml")
);
let mut world = world::World::new();
for host in config.hosts.into_iter() {
world.introduce(host);
}
let client = reqwest::Client::builder()
.user_agent(concat!(
@ -24,18 +33,28 @@ async fn main() {
.build()
.expect("reqwest::Client");
let mut workers_active = 0usize;
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
loop {
match world.dequeue() {
Err(time) => {
let duration = time - Instant::now();
println!("{} workers active, queued {} of {}", workers_active, world.queue_len(), world.size());
let next_task = if workers_active < config.max_workers {
world.dequeue()
} else {
Err(Duration::from_secs(3600))
};
match next_task {
Err(duration) => {
let _ = timeout(duration, async {
let message = rx.recv().await.unwrap();
match message {
Message::Fetched { host } =>
world.set_fetched(host, Duration::from_secs(60)),
Message::Error { host } =>
world.set_error(host),
Message::Fetched { host, next_interval } => {
workers_active -= 1;
world.enqueue(host, next_interval);
}
Message::Error { host } => {
workers_active -= 1;
world.enqueue(host, Duration::from_secs(config.interval_after_error));
}
Message::IntroduceHosts { hosts } => {
for host in hosts.into_iter() {
world.introduce(host);
@ -47,6 +66,7 @@ async fn main() {
}
Ok(host) => {
println!("Fetch {}", host);
workers_active += 1;
worker::fetch_and_process(tx.clone(), client.clone(), host);
}
}

View File

@ -1,9 +1,10 @@
use std::collections::HashSet;
use std::time::Duration;
use crate::feed;
#[derive(Debug)]
pub enum Message {
Fetched { host: String },
Fetched { host: String, next_interval: Duration },
Error { host: String },
IntroduceHosts { hosts: Vec<String> },
Posts { posts: Vec<feed::Post> },
@ -18,8 +19,6 @@ pub fn fetch_and_process(
tokio::spawn(async move {
match feed::Feed::fetch(&client, &url).await {
Ok(feed) => {
tx.send(Message::Fetched { host: host.clone() }).unwrap();
// Analyze
let mut posts = Vec::with_capacity(feed.posts.len());
let mut hosts = HashSet::new();
@ -34,6 +33,11 @@ pub fn fetch_and_process(
let hosts = hosts.into_iter().collect();
tx.send(Message::IntroduceHosts { hosts }).unwrap();
tx.send(Message::Posts { posts }).unwrap();
tx.send(Message::Fetched {
host: host.clone(),
next_interval: Duration::from_secs(10),
}).unwrap();
}
Err(e) => {
println!("Failed fetching {}: {}", host, e);

View File

@ -2,8 +2,6 @@ use std::collections::{HashMap, BTreeMap};
use std::time::Duration;
use tokio::time::Instant;
const ERROR_INTERVAL: Duration = Duration::from_secs(86400u64);
pub struct Instance {
last_fetch: Option<Instant>,
error: bool,
@ -23,6 +21,14 @@ impl World {
}
}
pub fn size(&self) -> usize {
self.instances.len()
}
pub fn queue_len(&self) -> usize {
self.queue.len()
}
pub fn introduce(&mut self, host: String) {
let now = Instant::now();
@ -35,7 +41,7 @@ impl World {
}
}
pub fn set_fetched(&mut self, host: String, next_interval: Duration) {
pub fn enqueue(&mut self, host: String, next_interval: Duration) {
let now = Instant::now();
let instance = self.instances.get_mut(&host).unwrap();
@ -45,30 +51,20 @@ impl World {
self.queue.insert(now + next_interval, host);
}
pub fn set_error(&mut self, host: String) {
let now = Instant::now();
let instance = self.instances.get_mut(&host).unwrap();
instance.last_fetch = Some(now);
instance.error = true;
println!("enqueue at {:?}", now + ERROR_INTERVAL);
self.queue.insert(now + ERROR_INTERVAL, host);
}
pub fn dequeue(&mut self) -> Result<String, Instant> {
pub fn dequeue(&mut self) -> Result<String, Duration> {
let now = Instant::now();
if let Some(time) = self.queue.keys().next().cloned() {
if time <= now {
self.queue.remove(&time)
.map(|host| Ok(host))
.unwrap_or(Err(now + Duration::from_secs(1)))
.unwrap_or(Err(Duration::from_secs(1)))
} else {
Err(time)
Err(time - now)
}
} else {
println!("empty queue");
Err(now + Duration::from_secs(60))
Err(Duration::from_secs(60))
}
}
}