caveman/hunter/src/main.rs
2022-12-03 01:47:44 +01:00

117 lines
3.7 KiB
Rust

use std::time::Duration;
use futures::{StreamExt, pin_mut};
use tokio::time::timeout;
use cave::config::LoadConfig;
mod config;
mod scheduler;
mod worker;
mod posts_cache;
mod trend_setter;
mod tag_trimmer;
use worker::Message;
// Install jemalloc as the global allocator for this binary. The `cfg` below
// leaves the default allocator in place when the target environment is MSVC.
#[cfg(not(target_env = "msvc"))]
#[global_allocator]
static GLOBAL: jemallocator::Jemalloc = jemallocator::Jemalloc;
/// Program entry point.
///
/// Start-up sequence, as visible here:
/// 1. initialise panic handling and logging, then load the configuration;
/// 2. connect the store and create the posts cache;
/// 3. start the `trend_setter` and `tag_trimmer` background services;
/// 4. seed the scheduler with hosts from the configuration and from the store;
/// 5. build the shared HTTP client and report readiness to systemd.
///
/// Afterwards it runs the dispatch loop forever: while fewer than
/// `config.max_workers` workers are active it spawns a worker for the next
/// host the scheduler hands out; otherwise (or when the scheduler has nothing
/// due) it waits for a worker message for a bounded time.
///
/// # Panics
///
/// Panics if listing hosts from the store fails, if the HTTP client cannot be
/// built, or if the worker message channel is closed.
#[tokio::main]
async fn main() {
    // NOTE(review): the unit of this timeout value is defined by
    // `cave::systemd` and is not visible from this file.
    cave::systemd::extend_timeout(100_000);
    cave::init::exit_on_panic();
    cave::init::init_logger();

    let config = config::Config::load();

    let mut store = cave::store::Store::new(16, config.redis).await;
    let posts_cache = posts_cache::PostsCache::new(65536);

    cave::systemd::status("Starting trend_setter");
    let trend_setter_tx = trend_setter::start(store.clone());

    cave::systemd::status("Starting tag_trimmer");
    tag_trimmer::start(store.clone());

    cave::systemd::status("Starting scheduler");
    let mut scheduler = scheduler::Scheduler::new();

    cave::systemd::status("Loading known hosts from config");
    for host in config.hosts {
        scheduler.introduce(host).await;
    }

    // NOTE(review): `debug` is not a cfg that Cargo sets on its own, so unless
    // `--cfg debug` is passed explicitly this block is compiled into every
    // build, including dev builds. `debug_assertions` may have been intended —
    // confirm before changing, as that would alter dev-build behaviour.
    #[cfg(not(debug))]
    {
        cave::systemd::status("Loading known hosts from redis");
        let hosts = store.get_hosts()
            .await
            .expect("get_hosts");
        pin_mut!(hosts);
        while let Some(host) = hosts.next().await {
            scheduler.introduce(host).await;
            // Called once per host so a long host list keeps pushing the
            // systemd start-up deadline forward.
            cave::systemd::extend_timeout(10_000_000);
        }
    }

    cave::systemd::status("Starting HTTP client");
    let client = reqwest::Client::builder()
        .timeout(Duration::from_secs(30))
        // Do not keep idle connections around in the pool.
        .pool_max_idle_per_host(0)
        .user_agent(concat!(
            env!("CARGO_PKG_NAME"),
            "/",
            env!("CARGO_PKG_VERSION"),
        ))
        .deflate(true)
        .gzip(true)
        .trust_dns(true)
        .build()
        .expect("reqwest::Client");

    cave::systemd::ready();

    let mut workers_active = 0usize;
    // `message_tx` stays alive in this function for as long as the loop runs,
    // so the receiving side never observes a closed channel in practice.
    let (message_tx, mut message_rx) = tokio::sync::mpsc::unbounded_channel();

    loop {
        // Build the status line once and use it for both the log and systemd.
        let status = format!(
            "{} workers active, queued {} of {}",
            workers_active,
            scheduler.queue_len(),
            scheduler.size()
        );
        log::trace!("{}", status);
        cave::systemd::status(&status);

        let next_task = if workers_active < config.max_workers {
            scheduler.dequeue()
        } else {
            // All worker slots are taken: just wait for messages for a while.
            Err(Duration::from_secs(5))
        };

        match next_task {
            Err(duration) => {
                // Only the wait for the next message is bounded by the
                // timeout. Handling happens outside of it so that a message
                // that was already received is always processed completely;
                // previously a timeout firing in the middle of
                // `scheduler.introduce(..).await` dropped the remaining hosts.
                match timeout(duration, message_rx.recv()).await {
                    // Nothing arrived in time; go around and ask the
                    // scheduler again.
                    Err(_elapsed) => {}
                    Ok(message) => match message.expect("worker message channel closed") {
                        Message::WorkerDone => {
                            workers_active -= 1;
                        }
                        Message::Fetched { host, mean_interval, new_post_ratio } => {
                            scheduler.reenqueue(host, new_post_ratio, mean_interval);
                        }
                        Message::IntroduceHosts { hosts } => {
                            for host in hosts {
                                scheduler.introduce(host).await;
                            }
                        }
                    },
                }
            }
            Ok(host) => {
                workers_active += 1;
                // The worker reports back through `message_tx`; the matching
                // decrement happens when `Message::WorkerDone` is received.
                tokio::spawn(worker::run(
                    message_tx.clone(),
                    store.clone(),
                    trend_setter_tx.clone(),
                    posts_cache.clone(),
                    client.clone(),
                    host
                ));
                cave::systemd::watchdog();
            }
        }
    }
}