//! Entry point of `hunter`.
//!
//! Loads the configuration and seeds a `scheduler::Scheduler` with known
//! instance hosts: from the config file and, outside `dev` builds, from redis.
//! It then runs the main loop, which spawns one `worker::run` task per dequeued
//! host and feeds the workers' `Message`s back into the scheduler.

use std::time::Duration;
use futures::{StreamExt, pin_mut};
use metrics_util::MetricKindMask;
use metrics_exporter_prometheus::PrometheusBuilder;
use tokio::time::timeout;
use cave::{
    block_list::BlockList,
    config::LoadConfig,
};

mod config;
mod scheduler;
mod worker;
mod webfinger;

use scheduler::InstanceHost;
use worker::Message;

#[global_allocator]
static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    tokio_uring::start(async {
        run().await;
        Ok(())
    })
}

/// Sets up metrics, storage, the scheduler and the HTTP client, then drives
/// the scheduling loop forever.
///
/// Never returns. Setup failures panic via `expect`.
async fn run() {
    cave::systemd::extend_timeout(100_000);
    cave::init::exit_on_panic();
    cave::init::init_logger(5555);

    let config = config::Config::load();

    PrometheusBuilder::new()
        .with_http_listener(([0; 8], config.prometheus_port))
        .add_global_label("application", env!("CARGO_PKG_NAME"))
        .idle_timeout(MetricKindMask::ALL, Some(Duration::from_secs(6 * 3600)))
        .install()
        .expect("install prometheus exporter");

    let db = cave::db::Database::connect(&config.database).await;
    let mut store = cave::store::Store::new(16, config.redis, config.redis_password_file).await;
    let posts_cache = cave::posts_cache::PostsCache::new(65536);
    let block_list = BlockList::new(&config.blocklist).await;

    cave::systemd::status("Starting scheduler");
    let mut scheduler = scheduler::Scheduler::new(block_list.clone());

    cave::systemd::status("Loading known hosts from config");
    for host in config.hosts {
        scheduler.introduce(InstanceHost::just_host(host)).await;
    }

    #[cfg(not(dev))]
    {
        cave::systemd::status("Loading known hosts from redis");
        // Stream hosts from a clone so that `store` itself remains usable
        // for `remove_host` while the stream is alive.
        let mut host_source = store.clone();
        let hosts = host_source.get_hosts().await.expect("get_hosts");
        pin_mut!(hosts);
        let mut loaded = 0usize;
        while let Some(host) = hosts.next().await {
            if !scheduler.introduce(InstanceHost::just_host(host.clone())).await {
                // `introduce` did not accept this host, so purge it from redis.
                tracing::debug!("Remove host {}", host);
                store.remove_host(&host).await.expect("remove_host");
            }
            loaded += 1;
            if loaded % 1000 == 0 {
                // Large host lists take a while; ask systemd for more startup time.
                cave::systemd::extend_timeout(10_000_000);
            }
        }
    }

    cave::systemd::status("Starting HTTP client");
    let client = reqwest::Client::builder()
        .timeout(Duration::from_secs(30))
        .tcp_keepalive(Duration::from_secs(300))
        // No idle connections are kept per host.
        .pool_max_idle_per_host(0)
        .redirect(reqwest::redirect::Policy::limited(2))
        .user_agent(format!(
            "{}/{} (+https://fedi.buzz/)",
            env!("CARGO_PKG_NAME"),
            env!("CARGO_PKG_VERSION"),
        ))
        .deflate(true)
        .gzip(true)
        .build()
        .expect("reqwest::Client");

    cave::systemd::ready();

    let mut workers_active = 0usize;
    let (message_tx, mut message_rx) = tokio::sync::mpsc::unbounded_channel();
    loop {
        let status = format!(
            "{} workers active, queued {} of {}",
            workers_active,
            scheduler.queue_len(),
            scheduler.size(),
        );
        tracing::trace!("{}", status);
        cave::systemd::status(&status);
        metrics::gauge!("hunter_workers", workers_active as f64, "worker" => "active");
        metrics::gauge!("hunter_workers", scheduler.queue_len() as f64, "worker" => "queued");
        metrics::gauge!("hunter_workers", scheduler.size() as f64, "worker" => "total");

        let next_task = if workers_active < config.max_workers {
            scheduler.dequeue()
        } else {
            // At capacity: only wait for running workers to report back.
            Err(Duration::from_secs(5))
        };

        match next_task {
            Err(duration) => {
                // Only the cancel-safe `recv()` is raced against the deadline.
                // Handling the message happens outside the timeout, so an
                // expiring deadline can never abort `scheduler.introduce()`
                // halfway and lose an `IntroduceHost` message.
                let message = match timeout(duration, message_rx.recv()).await {
                    Ok(message) => {
                        message.expect("message channel closed although run() holds a sender")
                    }
                    // Nothing arrived in time: loop around and re-check the scheduler.
                    Err(_elapsed) => continue,
                };
                match message {
                    Message::WorkerDone => workers_active -= 1,
                    Message::Fetched { host, mean_interval, new_post_ratio } => {
                        scheduler.reenqueue(host, new_post_ratio, mean_interval);
                    }
                    Message::IntroduceHost(introduce_host) => {
                        scheduler.introduce(introduce_host).await;
                    }
                }
            }
            Ok(host) => {
                workers_active += 1;
                tokio::task::Builder::new()
                    .name(&format!("{} worker", host.host))
                    .spawn(worker::run(
                        message_tx.clone(),
                        store.clone(),
                        db.clone(),
                        posts_cache.clone(),
                        block_list.clone(),
                        client.clone(),
                        host,
                    ))
                    .expect("spawn worker task");
                metrics::counter!("hunter_worker_starts", 1);
                // NOTE(review): the watchdog is only pinged when a worker starts;
                // confirm the systemd WatchdogSec tolerates long idle stretches.
                cave::systemd::watchdog();
            }
        }
    }
}