cave: init

This commit is contained in:
Astro 2022-11-05 20:51:18 +01:00
parent c3039f96b6
commit bd7604f639
16 changed files with 125 additions and 1101 deletions

2
.gitignore vendored
View File

@ -1,2 +1,2 @@
/*/target
/target
/result*

View File

@ -104,13 +104,40 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ec8a7b6a70fde80372154c65702f00a0f56f3e1c36abbc6c440484be248856db"
[[package]]
name = "caveman-hunter"
name = "cave"
version = "0.0.0"
dependencies = [
"chrono",
"env_logger",
"log",
"redis",
"serde",
"serde_yaml",
"systemd",
"tokio",
]
[[package]]
name = "caveman-gatherer"
version = "0.0.0"
dependencies = [
"cave",
"chrono",
"log",
"redis",
"serde",
"serde_yaml",
"tokio",
]
[[package]]
name = "caveman-hunter"
version = "0.0.0"
dependencies = [
"cave",
"chrono",
"log",
"redis",
"reqwest",
"serde",
"serde_yaml",
@ -743,9 +770,9 @@ dependencies = [
[[package]]
name = "num_cpus"
version = "1.13.1"
version = "1.14.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "19e64526ebdee182341572e50e9ad03965aa510cd94427a4549448f285e957a1"
checksum = "f6058e64324c71e02bc2b150e4f3bc8286db6c83092132ffa3f6b1eab0f9def5"
dependencies = [
"hermit-abi",
"libc",
@ -900,9 +927,9 @@ dependencies = [
[[package]]
name = "regex"
version = "1.6.0"
version = "1.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4c4eb3267174b8c6c2f654116623910a0fef09c4753f8dd83db29c48a0df988b"
checksum = "e076559ef8e241f2ae3479e36f97bd5741c0330689e217ad51ce2c76808b868a"
dependencies = [
"aho-corasick",
"memchr",
@ -911,9 +938,9 @@ dependencies = [
[[package]]
name = "regex-syntax"
version = "0.6.27"
version = "0.6.28"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a3f87b73ce11b1619a3c6332f45341e0047173771e8b8b73f87bfeefb7b56244"
checksum = "456c603be3e8d448b072f410900c09faf164fbce2d480456f50eea6e25f9c848"
[[package]]
name = "remove_dir_all"

6
Cargo.toml Normal file
View File

@ -0,0 +1,6 @@
[workspace]
members = [
"cave",
"hunter",
"gatherer",
]

15
cave/Cargo.toml Normal file
View File

@ -0,0 +1,15 @@
[package]
name = "cave"
description = "Shared caveman functionality"
version = "0.0.0"
edition = "2021"
[dependencies]
tokio = { version = "1", features = ["full"] }
serde = { version = "1", features = ["derive"] }
serde_yaml = "0.9"
chrono = "0.4"
redis = { version = "0.22", features = ["tokio-comp", "connection-manager"] }
log = "0.4"
env_logger = "0.9"
systemd = "0.10"

18
cave/src/config.rs Normal file
View File

@ -0,0 +1,18 @@
pub trait LoadConfig {
fn load() -> Self;
}
impl<T: Sized + for<'a> serde::Deserialize<'a>> LoadConfig for T {
fn load() -> Self {
let path = std::env::args()
.skip(1)
.next()
.expect("Call with config.yaml");
crate::systemd::status(&format!("Loading config file {}", path));
let config_file = std::fs::read_to_string(path)
.expect("read config");
serde_yaml::from_str(&config_file)
.expect("parse config")
}
}

14
cave/src/init.rs Normal file
View File

@ -0,0 +1,14 @@
use std::{panic, process};
pub fn exit_on_panic() {
let orig_hook = panic::take_hook();
panic::set_hook(Box::new(move |panic_info| {
// invoke the default handler and exit the process
orig_hook(panic_info);
process::exit(1);
}));
}
pub fn init_logger() {
env_logger::init();
}

3
cave/src/lib.rs Normal file
View File

@ -0,0 +1,3 @@
pub mod init;
pub mod systemd;
pub mod config;

4
cave/src/systemd.rs Normal file
View File

@ -0,0 +1,4 @@
pub fn status(text: &str) {
systemd::daemon::notify(false, [(systemd::daemon::STATE_STATUS, text)].iter())
.unwrap();
}

1054
gatherer/Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -10,5 +10,4 @@ serde_yaml = "0.9"
chrono = "0.4"
redis = { version = "0.22", features = ["tokio-comp", "connection-manager"] }
log = "0.4"
env_logger = "0.9"
systemd = "0.10"
cave = { path = "../cave" }

3
gatherer/config.yaml Normal file
View File

@ -0,0 +1,3 @@
redis: redis://10.233.12.2:6379/
listen_port: 8000

5
gatherer/src/config.rs Normal file
View File

@ -0,0 +1,5 @@
#[derive(Debug, serde::Deserialize)]
pub struct Config {
pub redis: String,
pub listen_port: u16,
}

View File

@ -1,10 +1,19 @@
use cave::config::LoadConfig;
mod config;
mod tag;
mod trends;
#[tokio::main]
async fn main() {
let redis_client = redis::Client::open("redis://10.233.12.2:6379/")
// let redis_client = redis::Client::open("redis://localhost:6378/")
cave::init::exit_on_panic();
cave::init::init_logger();
let config = config::Config::load();
cave::systemd::status("Starting redis client");
let redis_client = redis::Client::open(config.redis)
.expect("redis::Client");
let mut redis_man = redis::aio::ConnectionManager::new(redis_client).await
.expect("redis::aio::ConnectionManager");

View File

@ -11,5 +11,5 @@ serde_yaml = "0.9"
chrono = "0.4"
redis = { version = "0.22", features = ["tokio-comp", "connection-manager"] }
log = "0.4"
env_logger = "0.9"
systemd = "0.10"
cave = { path = "../cave" }

View File

@ -4,12 +4,3 @@ pub struct Config {
pub hosts: Vec<String>,
pub max_workers: usize,
}
impl Config {
pub fn load_file(path: &str) -> Self {
let config_file = std::fs::read_to_string(path)
.expect(path);
serde_yaml::from_str(&config_file)
.expect("config")
}
}

View File

@ -1,6 +1,6 @@
use std::time::Duration;
use std::{panic, process};
use tokio::time::timeout;
use cave::config::LoadConfig;
mod config;
mod scheduler;
@ -10,36 +10,20 @@ mod redis_store;
use worker::Message;
fn systemd_status(status: &str) {
systemd::daemon::notify(false, [(systemd::daemon::STATE_STATUS, status)].iter())
.unwrap();
}
#[tokio::main]
async fn main() {
let orig_hook = panic::take_hook();
panic::set_hook(Box::new(move |panic_info| {
// invoke the default handler and exit the process
orig_hook(panic_info);
process::exit(1);
}));
cave::init::exit_on_panic();
cave::init::init_logger();
env_logger::init();
let config = config::Config::load();
let config_file = std::env::args()
.skip(1)
.next()
.expect("Call with config.yaml");
systemd_status(&format!("Loading config file {}", config_file));
let config = config::Config::load_file(&config_file);
systemd_status("Starting redis client");
cave::systemd::status("Starting redis client");
let redis_client = redis::Client::open(config.redis)
.expect("redis::Client");
let mut redis_man = redis::aio::ConnectionManager::new(redis_client).await
.expect("redis::aio::ConnectionManager");
systemd_status("Starting scheduler");
cave::systemd::status("Starting scheduler");
let mut scheduler = scheduler::Scheduler::new();
for host in config.hosts.into_iter() {
scheduler.introduce(host).await;
@ -48,7 +32,7 @@ async fn main() {
scheduler.introduce(host).await;
}
systemd_status("Starting HTTP client");
cave::systemd::status("Starting HTTP client");
let client = reqwest::Client::builder()
.timeout(Duration::from_secs(30))
.pool_max_idle_per_host(0)
@ -62,7 +46,7 @@ async fn main() {
.build()
.expect("reqwest::Client");
systemd_status("Ready");
cave::systemd::status("Ready");
systemd::daemon::notify(false, [(systemd::daemon::STATE_READY, "1")].iter())
.unwrap();
@ -70,7 +54,7 @@ async fn main() {
let (message_tx, mut message_rx) = tokio::sync::mpsc::unbounded_channel();
loop {
log::trace!("{} workers active, queued {} of {}", workers_active, scheduler.queue_len(), scheduler.size());
systemd_status(&format!("{} workers active, queued {} of {}", workers_active, scheduler.queue_len(), scheduler.size()));
cave::systemd::status(&format!("{} workers active, queued {} of {}", workers_active, scheduler.queue_len(), scheduler.size()));
let next_task = if workers_active < config.max_workers {
scheduler.dequeue()
} else {