caveman/cave/src/store.rs

157 lines
4.5 KiB
Rust
Raw Normal View History

2022-11-08 00:43:46 +01:00
use crate::feed::Post;
/// Expiry, in seconds, set on every `p:<uri>` post marker key: one day.
const POST_EXPIRE: usize = 24 * 60 * 60;
/// Width, in hours, of the window of post ages whose tags are still
/// counted: thirty days.
const TAG_EXPIRE: i64 = 24 * 30;
/// Expiry, in seconds, set on every `r:<language>` counter key: one day.
const LANGUAGE_EXPIRE: usize = 24 * 60 * 60;
/// Handle to the redis database in which posts, tag counters and hosts
/// are recorded.
///
/// `Clone` is derived, so a clone holds a clone of the connection manager.
#[derive(Clone)]
pub struct Store {
/// Connection manager on which every redis command of this store is run.
man: redis::aio::ConnectionManager,
}
impl Store {
/// Opens a redis client for `redis_url` and builds a [`Store`] around a
/// connection manager for it.
///
/// The systemd status is set to "Starting redis client" first.
///
/// # Panics
///
/// Panics if the client cannot be opened from `redis_url` or if the
/// connection manager cannot be created.
pub async fn new(redis_url: &str) -> Self {
    crate::systemd::status("Starting redis client");

    let redis_client = redis::Client::open(redis_url).expect("redis::Client");
    let connecting = redis::aio::ConnectionManager::new(redis_client);

    Self {
        man: connecting.await.expect("redis::aio::ConnectionManager"),
    }
}
/// Records that `post` has been seen and, if it is new, counts its tags.
///
/// The key `p:<uri>` is overwritten with `"1"` through GETSET and its
/// expiry is (re)set to `POST_EXPIRE`. The post counts as new only when
/// the pipeline reply is a single nil, i.e. the key held no value before.
///
/// Returns `true` for a new post and `false` for one already recorded.
///
/// # Panics
///
/// Panics if the redis pipeline fails.
pub async fn save_post(&mut self, post: Post) -> bool {
    let key = format!("p:{}", post.uri);

    let mut pipeline = redis::pipe();
    pipeline
        .getset(&key, "1")
        // the reply of EXPIRE is of no interest, only the old value is
        .expire(&key, POST_EXPIRE)
        .ignore();
    let previous: redis::Value = pipeline
        .query_async(&mut self.man)
        .await
        .unwrap();

    let is_new = previous == redis::Value::Bulk(vec![redis::Value::Nil]);
    if is_new {
        log::info!("New post ({}{} tags): {}",
                   if post.account.bot { "bot, " } else { "" },
                   post.tags.len(), post.uri);
        self.save_post_tags(post).await;
    }
    is_new
}
async fn save_post_tags(&mut self, post: Post) {
if post.account.bot || post.tags.is_empty() {
// irrelevant
return;
}
let host = match post.uri_host() {
Some(host) => host,
None => {
log::warn!("no uri_host");
return;
},
};
let timestamp = match post.timestamp() {
Some(timestamp) => timestamp,
None => {
log::warn!("no timestamp");
return;
}
};
let hour = timestamp.naive_utc().timestamp() / 3600;
let until = chrono::offset::Utc::now().naive_utc().timestamp() / 3600;
if hour > until {
log::warn!("future post from {}", timestamp);
return;
}
let from = until - TAG_EXPIRE;
if hour < from {
log::warn!("ancient post from {}", timestamp);
return;
}
// clip "en-us" to "en"
2022-11-08 00:43:46 +01:00
let language = post.lang();
let mut cmd = redis::pipe();
if let Some(language) = &language {
let language_key = format!("r:{}", language);
cmd.incr(&language_key, 1)
.ignore()
.expire(&language_key, LANGUAGE_EXPIRE)
.ignore();
}
2022-11-08 00:43:46 +01:00
let mut store_tags = |spellings, tag_key| {
// by hour
cmd.hincr(
&tag_key,
format!("t:{}", hour),
1
).ignore();
// by spelling
2022-11-08 00:43:46 +01:00
for spelling in spellings {
cmd.hincr(
&tag_key,
format!("s:{}", spelling),
1
).ignore();
}
// by instance
cmd.hincr(
tag_key,
format!("h:{}", host),
1
).ignore();
};
2022-11-08 00:43:46 +01:00
for (name, spellings) in post.tags_set() {
log::debug!("tag {}", name);
// global
2022-11-08 00:43:46 +01:00
store_tags(spellings.clone(), format!("g:{}", name));
// by language
if let Some(language) = &language {
2022-11-08 00:43:46 +01:00
store_tags(spellings, format!("l:{}:{}", language, name));
}
}
match cmd.query_async(&mut self.man).await {
Ok(()) => {}
Err(e) => {
log::error!("redis error: {:?}", e);
}
}
}
/// Remembers `host` by setting the key `h:<host>` to `"1"`.
///
/// # Panics
///
/// Panics if the redis command fails.
pub async fn save_host(&mut self, host: &str) {
    let host_key = format!("h:{}", host);
    let set_host = redis::Cmd::set(host_key, "1");
    set_host
        .query_async::<_, ()>(&mut self.man)
        .await
        .unwrap();
}
/// Returns the names of all hosts found under keys matching `h:*`,
/// with the two-byte `h:` prefix removed.
///
/// The order is whatever order the key scan yields.
pub async fn get_hosts(&mut self) -> Vec<String> {
    let mut hosts = Vec::new();
    // every key matched by "h:*" starts with "h:", so skipping two bytes
    // leaves exactly the host name
    let collect_host = |key: String| {
        hosts.push(String::from(&key[2..]));
    };
    self.scan("h:*", collect_host).await;
    hosts
}
/// Runs `SCAN 0 MATCH <pattern>` and calls `f` with each key the
/// resulting iterator yields.
///
/// # Panics
///
/// Panics if the scan iterator cannot be created.
async fn scan<F>(&mut self, pattern: &str, mut f: F)
where
    F: FnMut(String),
{
    let mut scan_cmd = redis::cmd("SCAN");
    scan_cmd
        .cursor_arg(0)
        .arg("MATCH")
        .arg(pattern);

    let mut keys = scan_cmd
        .iter_async::<String>(&mut self.man)
        .await
        .unwrap();
    loop {
        match keys.next_item().await {
            Some(key) => f(key),
            None => break,
        }
    }
}
}