//! Service binary: seeds a scheduler with known hosts and runs a loop that
//! spawns a worker task for each host the scheduler hands out.
use std::time::Duration;
|
|
use futures::{StreamExt, pin_mut};
|
|
use metrics_util::MetricKindMask;
|
|
use metrics_exporter_prometheus::PrometheusBuilder;
|
|
use tokio::time::timeout;
|
|
use cave::config::LoadConfig;
|
|
|
|
mod config;
|
|
mod scheduler;
|
|
mod worker;
|
|
mod posts_cache;
|
|
mod trend_setter;
|
|
mod tag_trimmer;
|
|
|
|
use worker::Message;
|
|
|
|
#[cfg(not(target_env = "msvc"))]
|
|
#[global_allocator]
|
|
static GLOBAL: jemallocator::Jemalloc = jemallocator::Jemalloc;
|
|
|
|
#[tokio::main]
|
|
async fn main() {
|
|
cave::systemd::extend_timeout(100_000);
|
|
|
|
cave::init::exit_on_panic();
|
|
cave::init::init_logger(5555);
|
|
|
|
let config = config::Config::load();
|
|
|
|
PrometheusBuilder::new()
|
|
.with_http_listener(([0; 8], config.prometheus_port))
|
|
.add_global_label("application", env!("CARGO_PKG_NAME"))
|
|
.idle_timeout(MetricKindMask::ALL, Some(Duration::from_secs(600)))
|
|
.install()
|
|
.unwrap();
|
|
|
|
let mut store = cave::store::Store::new(16, config.redis).await;
|
|
let posts_cache = posts_cache::PostsCache::new(65536);
|
|
|
|
cave::systemd::status("Starting trend_setter");
|
|
let trend_setter_tx = trend_setter::start(store.clone());
|
|
|
|
cave::systemd::status("Starting tag_trimmer");
|
|
tag_trimmer::start(store.clone());
|
|
|
|
cave::systemd::status("Starting scheduler");
|
|
let mut scheduler = scheduler::Scheduler::new();
|
|
cave::systemd::status("Loading known hosts from config");
|
|
for host in config.hosts.into_iter() {
|
|
scheduler.introduce(host).await;
|
|
}
|
|
#[cfg(not(debug))]
|
|
{
|
|
cave::systemd::status("Loading known hosts from redis");
|
|
let mut n = 1;
|
|
|
|
let mut store_ = store.clone();
|
|
let hosts = store_.get_hosts()
|
|
.await.expect("get_hosts");
|
|
pin_mut!(hosts);
|
|
while let Some(host) = hosts.next().await {
|
|
if scheduler.introduce(host.clone()).await == false {
|
|
tracing::debug!("Remove host {}", host);
|
|
store.remove_host(&host).await.expect("remove_host");
|
|
}
|
|
|
|
n += 1;
|
|
if n > 1000 {
|
|
cave::systemd::extend_timeout(10_000_000);
|
|
n = 0;
|
|
}
|
|
}
|
|
}
|
|
|
|
cave::systemd::status("Starting HTTP client");
|
|
let client = reqwest::Client::builder()
|
|
.timeout(Duration::from_secs(30))
|
|
.tcp_keepalive(Duration::from_secs(300))
|
|
.pool_max_idle_per_host(0)
|
|
.user_agent(concat!(
|
|
env!("CARGO_PKG_NAME"),
|
|
"/",
|
|
env!("CARGO_PKG_VERSION"),
|
|
))
|
|
.deflate(true)
|
|
.gzip(true)
|
|
.trust_dns(true)
|
|
.build()
|
|
.expect("reqwest::Client");
|
|
|
|
cave::systemd::ready();
|
|
|
|
let mut workers_active = 0usize;
|
|
let (message_tx, mut message_rx) = tokio::sync::mpsc::unbounded_channel();
|
|
loop {
|
|
tracing::trace!("{} workers active, queued {} of {}", workers_active, scheduler.queue_len(), scheduler.size());
|
|
cave::systemd::status(&format!("{} workers active, queued {} of {}", workers_active, scheduler.queue_len(), scheduler.size()));
|
|
metrics::gauge!("hunter_workers", workers_active as f64, "worker" => "active");
|
|
metrics::gauge!("hunter_workers", scheduler.queue_len() as f64, "worker" => "queued");
|
|
metrics::gauge!("hunter_workers", scheduler.size() as f64, "worker" => "total");
|
|
let next_task = if workers_active < config.max_workers {
|
|
scheduler.dequeue()
|
|
} else {
|
|
Err(Duration::from_secs(5))
|
|
};
|
|
match next_task {
|
|
Err(duration) => {
|
|
let _ = timeout(duration, async {
|
|
let message = message_rx.recv().await.unwrap();
|
|
match message {
|
|
Message::WorkerDone => {
|
|
workers_active -= 1;
|
|
}
|
|
Message::Fetched { host, mean_interval, new_post_ratio } => {
|
|
scheduler.reenqueue(host, new_post_ratio, mean_interval);
|
|
}
|
|
Message::IntroduceHosts { hosts } => {
|
|
for host in hosts.into_iter() {
|
|
scheduler.introduce(host).await;
|
|
}
|
|
}
|
|
}
|
|
}).await;
|
|
}
|
|
Ok(host) => {
|
|
workers_active += 1;
|
|
tokio::task::Builder::new()
|
|
.name(&format!("{} worker", host))
|
|
.spawn(worker::run(
|
|
message_tx.clone(),
|
|
store.clone(),
|
|
trend_setter_tx.clone(),
|
|
posts_cache.clone(),
|
|
client.clone(),
|
|
host
|
|
)).unwrap();
|
|
metrics::counter!("hunter_worker_starts", 1);
|
|
cave::systemd::watchdog();
|
|
}
|
|
}
|
|
}
|
|
}
|