caveman/src/main.rs

83 lines
2.5 KiB
Rust
Raw Normal View History

2022-11-02 21:12:16 +01:00
use std::time::Duration;
2022-11-02 22:06:43 +01:00
use tokio::time::timeout;
2022-11-02 21:12:16 +01:00
2022-11-02 22:06:43 +01:00
mod config;
2022-11-02 21:12:16 +01:00
mod world;
mod feed;
mod worker;
2022-11-03 01:21:53 +01:00
mod trends;
2022-11-02 21:12:16 +01:00
use worker::Message;
#[tokio::main]
async fn main() {
2022-11-02 22:06:43 +01:00
let config = config::Config::load_file(
&std::env::args()
.skip(1)
.next()
.expect("Call with config.yaml")
);
2022-11-02 21:12:16 +01:00
2022-11-02 22:06:43 +01:00
let mut world = world::World::new();
for host in config.hosts.into_iter() {
world.introduce(host);
}
2022-11-02 21:12:16 +01:00
let client = reqwest::Client::builder()
.user_agent(concat!(
env!("CARGO_PKG_NAME"),
"/",
env!("CARGO_PKG_VERSION"),
))
.deflate(true)
.gzip(true)
.build()
.expect("reqwest::Client");
2022-11-03 01:21:53 +01:00
let (posts_tx, posts_rx) = tokio::sync::mpsc::unbounded_channel();
trends::spawn(&config.redis, posts_rx).await;
2022-11-02 22:06:43 +01:00
let mut workers_active = 0usize;
2022-11-03 01:21:53 +01:00
let (message_tx, mut message_rx) = tokio::sync::mpsc::unbounded_channel();
2022-11-02 21:12:16 +01:00
loop {
2022-11-02 23:10:59 +01:00
// println!("{} workers active, queued {} of {}", workers_active, world.queue_len(), world.size());
2022-11-02 22:06:43 +01:00
let next_task = if workers_active < config.max_workers {
world.dequeue()
} else {
2022-11-03 00:27:16 +01:00
Err(Duration::from_secs(5))
2022-11-02 22:06:43 +01:00
};
match next_task {
Err(duration) => {
2022-11-02 21:49:37 +01:00
let _ = timeout(duration, async {
2022-11-03 01:21:53 +01:00
let message = message_rx.recv().await.unwrap();
2022-11-02 21:49:37 +01:00
match message {
2022-11-02 23:10:59 +01:00
Message::Fetched { host, next_interval, latest_timestamp } => {
2022-11-02 22:06:43 +01:00
workers_active -= 1;
2022-11-02 23:10:59 +01:00
world.enqueue(host, next_interval, latest_timestamp);
2022-11-02 22:06:43 +01:00
}
Message::Error { host } => {
workers_active -= 1;
2022-11-02 23:10:59 +01:00
world.enqueue(host, Duration::from_secs(config.interval_after_error), None);
2022-11-02 22:06:43 +01:00
}
2022-11-02 21:49:37 +01:00
Message::IntroduceHosts { hosts } => {
for host in hosts.into_iter() {
world.introduce(host);
2022-11-02 21:12:16 +01:00
}
}
}
}).await;
}
Ok(host) => {
println!("Fetch {}", host);
2022-11-02 22:06:43 +01:00
workers_active += 1;
2022-11-03 01:21:53 +01:00
worker::fetch_and_process(
message_tx.clone(),
posts_tx.clone(),
client.clone(),
host
);
2022-11-02 21:12:16 +01:00
}
}
}
}