caveman/cave/src/store.rs

393 lines
11 KiB
Rust
Raw Normal View History

2022-11-10 15:22:45 +01:00
use std::pin::Pin;
use bb8::ManageConnection;
2022-11-12 01:02:44 +01:00
use futures::{Future, Stream, stream::unfold, StreamExt};
2022-11-10 15:22:45 +01:00
use redis::{Value, RedisError, aio::ConnectionLike};
use crate::{feed::Post, trend_tag::TrendTag};
const POST_EXPIRE: usize = 86400;
const TAG_EXPIRE: i64 = 30 * 24;
pub const TREND_POOL_SIZE: usize = 30;
pub type Error = RedisError;
2022-11-10 15:22:45 +01:00
/// Wrapper around a redis URL so that bb8's `ManageConnection` can be
/// implemented for it (neither the trait nor the redis types are local to
/// this crate, so a local type is needed).
struct RedisPool {
    redis_url: String,
}

/// Hands out `redis::aio::ConnectionManager`s to the bb8 pool.
///
/// The method signatures are the hand-expanded form of bb8's
/// `#[async_trait]` trait methods.
impl ManageConnection for RedisPool {
    type Connection = redis::aio::ConnectionManager;
    type Error = Error;

    /// Opens a new connection manager for `redis_url`.
    ///
    /// An unparsable URL or a failed initial connection is returned as an
    /// error so bb8 can report or retry it, rather than panicking inside the
    /// pool's task.
    fn connect<'life0, 'async_trait>(
        &'life0 self
    ) -> Pin<Box<dyn Future<Output = Result<Self::Connection, Self::Error>> + Send + 'async_trait>>
    where
        'life0: 'async_trait,
        Self: 'async_trait
    {
        Box::pin(async move {
            let client = redis::Client::open(self.redis_url.as_str())?;
            redis::aio::ConnectionManager::new(client).await
        })
    }

    /// Health check used by bb8: sends `PING` over the connection.
    fn is_valid<'life0, 'life1, 'async_trait>(
        &'life0 self,
        conn: &'life1 mut Self::Connection
    ) -> Pin<Box<dyn Future<Output = Result<(), Self::Error>> + Send + 'async_trait>>
    where
        'life0: 'async_trait,
        'life1: 'async_trait,
        Self: 'async_trait
    {
        Box::pin(async move {
            redis::cmd("PING")
                .query_async::<_, ()>(conn)
                .await
        })
    }

    /// Never reports a connection as broken.
    ///
    /// NOTE(review): presumably relies on `ConnectionManager` reconnecting
    /// on its own — confirm against the redis-rs version in use.
    fn has_broken(&self, _conn: &mut Self::Connection) -> bool {
        false
    }
}
#[derive(Clone)]
pub struct Store {
2022-11-10 15:22:45 +01:00
pool: bb8::Pool<RedisPool>,
}
impl ConnectionLike for Store {
fn req_packed_command<'a>(
&'a mut self,
cmd: &'a redis::Cmd
) -> redis::RedisFuture<'a, Value> {
Box::pin(async move {
let mut conn = self.pool.get().await.unwrap();
conn.req_packed_command(cmd).await
})
}
fn req_packed_commands<'a>(
&'a mut self,
cmd: &'a redis::Pipeline,
offset: usize,
count: usize
) -> redis::RedisFuture<'a, Vec<Value>> {
Box::pin(async move {
let mut conn = self.pool.get().await.unwrap();
conn.req_packed_commands(cmd, offset, count).await
})
}
fn get_db(&self) -> i64 {
// wrong ;-)
0
}
}
impl Store {
2022-11-12 02:47:21 +01:00
pub async fn new(pool_max_size: u32, redis_url: String) -> Self {
crate::systemd::status("Starting redis client");
2022-11-10 15:22:45 +01:00
let pool = bb8::Pool::builder()
2022-11-12 02:47:21 +01:00
.max_size(pool_max_size)
2022-11-10 15:22:45 +01:00
.build(RedisPool { redis_url })
.await
.unwrap();
Self { pool }
}
pub async fn save_post(&mut self, post: Post) -> Result<bool, RedisError> {
let post_key = format!("p:{}", post.uri);
let check = redis::pipe()
.getset(&post_key, "1")
.expire(post_key, POST_EXPIRE)
.ignore()
2022-11-10 15:22:45 +01:00
.query_async::<_, Value>(self)
.await?;
if check != Value::Bulk(vec![Value::Nil]) {
// post is not new
return Ok(false);
}
log::info!("New post ({}{} tags): {}",
if post.account.bot { "bot, " } else { "" },
post.tags.len(), post.uri);
self.save_post_tags(post).await;
// post was new
Ok(true)
}
async fn save_post_tags(&mut self, post: Post) {
if post.account.bot || post.tags.is_empty() {
// irrelevant
return;
}
let host = match post.uri_host() {
Some(host) => host,
None => {
log::warn!("no uri_host");
return;
},
};
let timestamp = match post.timestamp() {
Some(timestamp) => timestamp,
None => {
log::warn!("no timestamp");
return;
}
};
let hour = timestamp.naive_utc().timestamp() / 3600;
let until = chrono::offset::Utc::now().naive_utc().timestamp() / 3600;
if hour > until {
log::warn!("future post from {}", timestamp);
return;
}
let from = until - TAG_EXPIRE;
if hour < from {
log::warn!("ancient post from {}", timestamp);
return;
}
// clip "en-us" to "en"
2022-11-08 00:43:46 +01:00
let language = post.lang();
let mut cmd = redis::pipe();
2022-11-08 00:43:46 +01:00
let mut store_tags = |spellings, tag_key| {
// by hour
cmd.hincr(
&tag_key,
format!("t:{}", hour),
1
).ignore();
// by spelling
2022-11-08 00:43:46 +01:00
for spelling in spellings {
cmd.hincr(
&tag_key,
format!("s:{}", spelling),
1
).ignore();
}
// by instance
cmd.hincr(
tag_key,
format!("h:{}", host),
1
).ignore();
};
2022-11-08 00:43:46 +01:00
for (name, spellings) in post.tags_set() {
log::debug!("tag {}", name);
// global
2022-11-08 00:43:46 +01:00
store_tags(spellings.clone(), format!("g:{}", name));
// by language
if let Some(language) = &language {
2022-11-08 00:43:46 +01:00
store_tags(spellings, format!("l:{}:{}", language, name));
}
}
2022-11-10 15:22:45 +01:00
match cmd.query_async(self).await {
Ok(()) => {}
Err(e) => {
log::error!("redis error: {:?}", e);
}
}
}
pub async fn save_host(&mut self, host: &str) -> Result<(), RedisError> {
redis::Cmd::set(format!("h:{}", host), "1")
2022-11-10 15:22:45 +01:00
.query_async::<_, ()>(self)
.await
}
2022-11-12 01:02:44 +01:00
/// Streams every known instance host: the keys `h:<host>` with the `h:`
/// prefix removed.
pub async fn get_hosts(&mut self) -> Result<impl Stream<Item = String> + '_, RedisError> {
    let hosts = self.scan_prefix("h:").await?;
    Ok(hosts)
}
pub async fn get_languages(&mut self) -> Result<Vec<String>, RedisError> {
redis::Cmd::hkeys("r")
2022-11-10 15:22:45 +01:00
.query_async(self)
.await
2022-11-10 02:47:09 +01:00
}
2022-11-12 01:02:44 +01:00
/// Streams the names of all global tag hashes: the keys `g:<tag>` with the
/// `g:` prefix removed.
pub async fn get_tags_global(&mut self) -> Result<impl Stream<Item = String> + '_, RedisError> {
    // `scan_prefix` already strips the prefix; the previous
    // `.map(|tag| tag)` was an identity no-op.
    self.scan_prefix("g:").await
}
/// Streams `(Some(language), tag)` for every per-language tag hash, i.e.
/// every key of the form `l:<language>:<tag>`.
///
/// The language is the text before the first `:` after the `l:` prefix.
/// The tag is everything after that `:`. Keys without that separator are
/// skipped.
pub async fn get_tags_by_language(&mut self) -> Result<impl Stream<Item = (Option<String>, String)> + '_, RedisError> {
    // SCAN's MATCH glob must cover the whole key, so a prefix lookup needs
    // the trailing `*` that `scan_prefix` appends. The former
    // `scan("l:")` only ever matched the literal key `l:`.
    let by_language = self.scan_prefix("l:")
        .await?
        .filter_map(|rest| async move {
            rest.split_once(':')
                .map(|(language, tag)| (Some(language.to_string()), tag.to_string()))
        });
    Ok(by_language)
}
/// Streams every key that starts with `prefix`, with the prefix removed.
///
/// The prefix is placed verbatim in front of `*` in a SCAN glob.
/// NOTE(review): `prefix` should therefore not contain glob
/// metacharacters (`*`, `?`, `[`).
pub async fn scan_prefix<'a>(&'a mut self, prefix: &'a str) -> Result<impl Stream<Item = String> + '_, RedisError> {
    let pattern = format!("{}*", prefix);
    let prefix_len = prefix.len();
    let keys = self.scan(&pattern).await?;
    Ok(keys.map(move |key| key[prefix_len..].to_string()))
}
pub async fn scan(&mut self, pattern: &str) -> Result<impl Stream<Item = String> + '_, RedisError> {
let mut cmd = redis::cmd("SCAN");
2022-11-10 02:47:09 +01:00
cmd.cursor_arg(0)
2022-11-12 01:02:44 +01:00
.arg("MATCH").arg(pattern);
let iter = cmd.iter_async::<String>(self)
.await?;
2022-11-12 01:02:44 +01:00
let stream = unfold(iter, |mut iter| async move {
iter.next_item().await
.map(|tag| (tag, iter))
});
Ok(stream)
}
pub async fn get_trend_tags(
&mut self,
language: &Option<String>,
2022-11-09 19:03:03 +01:00
names_in: impl Iterator<Item = String>,
) -> Result<Vec<TrendTag>, RedisError> {
let mut cmd = redis::pipe();
let prefix = match language {
Some(language) => format!("l:{}:", language),
None => "g:".to_string(),
};
2022-11-09 19:03:03 +01:00
let names: Vec<String> = names_in.map(|name| {
cmd.hgetall(format!("{}{}", prefix, name));
2022-11-09 19:03:03 +01:00
name
}).collect();
2022-11-10 15:22:45 +01:00
let hashes = cmd.query_async::<_, Vec<Vec<String>>>(self).await?;
let results = names.into_iter()
.zip(hashes)
2022-11-09 19:03:03 +01:00
.map(|(name, hash_values)| TrendTag::from_hash(name, hash_values))
.collect();
Ok(results)
}
/// Reads the members of the trend pool set for each entry of `periods`
/// (keys from `pool_key`) in one pipeline.
///
/// Returns `(period, members)` pairs in the order of `periods`.
pub async fn get_trend_pools(
    &mut self,
    language: &Option<String>,
    periods: &[u64],
) -> Result<Vec<(u64, Vec<String>)>, RedisError> {
    let mut pipe = redis::pipe();
    for &period in periods {
        pipe.smembers(pool_key(language, period));
    }
    let members = pipe.query_async::<_, Vec<Vec<String>>>(self).await?;
    let pools = periods.iter()
        .copied()
        .zip(members)
        .collect::<Vec<_>>();
    Ok(pools)
}
pub async fn update_trend_pools(
&mut self,
language: &Option<String>,
remove: impl Iterator<Item = (u64, Vec<&str>)>,
add: impl Iterator<Item = (u64, Vec<&str>)>,
pool_sizes: impl Iterator<Item = (u64, usize)>,
) -> Result<(), RedisError> {
let mut cmd = redis::pipe();
for (period, tags) in remove {
if ! tags.is_empty() {
2022-11-09 19:03:03 +01:00
let pool_key = pool_key(language, period);
cmd.srem(&pool_key, tags)
.ignore()
.expire(pool_key, period as usize * 3600)
.ignore();
}
}
for (period, tags) in add {
if ! tags.is_empty() {
2022-11-09 19:03:03 +01:00
let pool_key = pool_key(language, period);
cmd.sadd(&pool_key, tags)
.ignore()
.expire(pool_key, period as usize * 3600)
.ignore();
}
}
if let Some(language) = language {
let max_pool_size = pool_sizes.map(|(_, pool_size)| pool_size)
.max()
.unwrap_or(0);
if max_pool_size > 0 {
cmd.hset("r", language, max_pool_size)
.ignore();
} else {
cmd.hdel("r", language)
.ignore();
}
}
2022-11-10 15:22:45 +01:00
cmd.query_async(self)
.await?;
Ok(())
}
2022-11-12 01:02:44 +01:00
/// Removes the hourly counter fields `t:<hour>` for every hour in `hours`
/// from one tag hash.
///
/// The hash is `l:<lang>:<tag>` when `language` is `Some`, otherwise
/// `g:<tag>`. Nothing is sent when `hours` yields no items.
pub async fn delete_tag_hours(
    &mut self,
    language: &Option<String>,
    tag: &str,
    hours: impl Iterator<Item = u64>,
) -> Result<(), RedisError> {
    let fields: Vec<String> = hours.map(|hour| format!("t:{}", hour)).collect();
    if fields.is_empty() {
        return Ok(());
    }
    let key = if let Some(language) = language {
        format!("l:{}:{}", language, tag)
    } else {
        format!("g:{}", tag)
    };
    redis::Cmd::hdel(key, fields)
        .query_async(self)
        .await
}
/// Deletes a tag hash entirely: `l:<lang>:<tag>` when `language` is
/// `Some`, otherwise the global `g:<tag>`.
pub async fn delete_tag(
    &mut self,
    language: &Option<String>,
    tag: &str,
) -> Result<(), RedisError> {
    let key = language.as_ref()
        .map(|language| format!("l:{}:{}", language, tag))
        .unwrap_or_else(|| format!("g:{}", tag));
    redis::Cmd::del(key).query_async::<_, ()>(self).await
}
}
/// Redis key of the set holding the trend pool for `period`.
///
/// The key is `q:<period>:<lang>` for a language and `q:<period>` for the
/// global pool. `update_trend_pools` gives these sets a TTL of
/// `period * 3600` seconds, so `period` is in hours.
fn pool_key(language: &Option<String>, period: u64) -> String {
    if let Some(language) = language {
        format!("q:{}:{}", period, language)
    } else {
        format!("q:{}", period)
    }
}