use std::time::Duration; use tokio::time::timeout; use cave::config::LoadConfig; mod config; mod scheduler; mod worker; mod trend_setter; use worker::Message; #[tokio::main] async fn main() { cave::init::exit_on_panic(); cave::init::init_logger(); let config = config::Config::load(); let mut store = cave::store::Store::new(config.redis).await; cave::systemd::status("Starting trend_setter"); let trend_setter_tx = trend_setter::start(store.clone()); cave::systemd::status("Starting scheduler"); let mut scheduler = scheduler::Scheduler::new(); cave::systemd::status("Loading known hosts from config"); for host in config.hosts.into_iter() { scheduler.introduce(host).await; } cave::systemd::status("Loading known hosts from redis"); for host in store.get_hosts().await.unwrap().into_iter() { scheduler.introduce(host).await; } cave::systemd::status("Starting HTTP client"); let client = reqwest::Client::builder() .timeout(Duration::from_secs(30)) .pool_max_idle_per_host(0) .user_agent(concat!( env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION"), )) .deflate(true) .gzip(true) .build() .expect("reqwest::Client"); cave::systemd::ready(); let mut workers_active = 0usize; let (message_tx, mut message_rx) = tokio::sync::mpsc::unbounded_channel(); loop { log::trace!("{} workers active, queued {} of {}", workers_active, scheduler.queue_len(), scheduler.size()); cave::systemd::status(&format!("{} workers active, queued {} of {}", workers_active, scheduler.queue_len(), scheduler.size())); let next_task = if workers_active < config.max_workers { scheduler.dequeue() } else { Err(Duration::from_secs(5)) }; match next_task { Err(duration) => { let _ = timeout(duration, async { let message = message_rx.recv().await.unwrap(); match message { Message::Fetched { host, mean_interval, new_post_ratio } => { workers_active -= 1; scheduler.reenqueue(host, new_post_ratio, mean_interval); } Message::IntroduceHosts { hosts } => { for host in hosts.into_iter() { scheduler.introduce(host).await; } } } }).await; } Ok(host) => { workers_active += 1; tokio::spawn(worker::run( message_tx.clone(), store.clone(), trend_setter_tx.clone(), client.clone(), host )); cave::systemd::watchdog(); } } } }