use std::time::Duration; use std::{panic, process}; use tokio::time::timeout; mod config; mod scheduler; mod feed; mod worker; mod redis_store; use worker::Message; #[tokio::main] async fn main() { let orig_hook = panic::take_hook(); panic::set_hook(Box::new(move |panic_info| { // invoke the default handler and exit the process orig_hook(panic_info); process::exit(1); })); let config = config::Config::load_file( &std::env::args() .skip(1) .next() .expect("Call with config.yaml") ); let redis_client = redis::Client::open(config.redis) .expect("redis::Client"); let redis_man = redis::aio::ConnectionManager::new(redis_client).await .expect("redis::aio::ConnectionManager"); let mut scheduler = scheduler::Scheduler::new(); for host in config.hosts.into_iter() { scheduler.introduce(host); } let client = reqwest::Client::builder() .timeout(Duration::from_secs(30)) .user_agent(concat!( env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION"), )) .deflate(true) .gzip(true) .build() .expect("reqwest::Client"); let mut workers_active = 0usize; let (message_tx, mut message_rx) = tokio::sync::mpsc::unbounded_channel(); loop { println!("{} workers active, queued {} of {}", workers_active, scheduler.queue_len(), scheduler.size()); let next_task = if workers_active < config.max_workers { scheduler.dequeue() } else { Err(Duration::from_secs(5)) }; match next_task { Err(duration) => { let _ = timeout(duration, async { let message = message_rx.recv().await.unwrap(); match message { Message::Fetched { host, next_interval, new_posts } => { workers_active -= 1; scheduler.enqueue(host, new_posts > 0, next_interval); } Message::Error { host } => { workers_active -= 1; scheduler.enqueue(host, false, Duration::from_secs(config.interval_after_error)); } Message::IntroduceHosts { hosts } => { for host in hosts.into_iter() { scheduler.introduce(host); } } } }).await; } Ok(host) => { println!("Fetch {}", host); workers_active += 1; worker::fetch_and_process( message_tx.clone(), redis_man.clone(), client.clone(), host ); } } } }