2022-11-09 18:11:02 +01:00
|
|
|
use redis::{Value, RedisError};
|
|
|
|
use crate::{feed::Post, trend_tag::TrendTag};
|
2022-11-07 22:07:07 +01:00
|
|
|
|
|
|
|
/// TTL, in seconds, applied to a post's `p:<uri>` seen-marker (one day).
const POST_EXPIRE: usize = 24 * 60 * 60;

/// Maximum age, in hours, of a post whose tags are still counted (30 days).
/// Posts dated further back than this are rejected as "ancient".
const TAG_EXPIRE: i64 = 24 * 30;

/// Trend pool size; not referenced in this part of the file —
/// presumably the number of tags kept per pool (confirm against callers).
pub const TREND_POOL_SIZE: usize = 30;
|
|
|
|
|
|
|
|
pub type Error = RedisError;
|
|
|
|
|
2022-11-07 22:07:07 +01:00
|
|
|
#[derive(Clone)]
|
|
|
|
pub struct Store {
|
|
|
|
man: redis::aio::ConnectionManager,
|
|
|
|
}
|
|
|
|
|
|
|
|
impl Store {
|
|
|
|
pub async fn new(redis_url: &str) -> Self {
|
|
|
|
crate::systemd::status("Starting redis client");
|
|
|
|
let client = redis::Client::open(redis_url)
|
|
|
|
.expect("redis::Client");
|
|
|
|
let man = redis::aio::ConnectionManager::new(client).await
|
|
|
|
.expect("redis::aio::ConnectionManager");
|
|
|
|
Self { man }
|
|
|
|
}
|
|
|
|
|
2022-11-09 18:11:02 +01:00
|
|
|
pub async fn save_post(&mut self, post: Post) -> Result<bool, RedisError> {
|
2022-11-07 22:07:07 +01:00
|
|
|
let post_key = format!("p:{}", post.uri);
|
|
|
|
let check = redis::pipe()
|
|
|
|
.getset(&post_key, "1")
|
|
|
|
.expire(post_key, POST_EXPIRE)
|
|
|
|
.ignore()
|
2022-11-09 18:11:02 +01:00
|
|
|
.query_async::<_, Value>(&mut self.man)
|
|
|
|
.await?;
|
|
|
|
if check != Value::Bulk(vec![Value::Nil]) {
|
2022-11-07 22:07:07 +01:00
|
|
|
// post is not new
|
2022-11-09 18:11:02 +01:00
|
|
|
return Ok(false);
|
2022-11-07 22:07:07 +01:00
|
|
|
}
|
|
|
|
log::info!("New post ({}{} tags): {}",
|
|
|
|
if post.account.bot { "bot, " } else { "" },
|
|
|
|
post.tags.len(), post.uri);
|
|
|
|
|
|
|
|
self.save_post_tags(post).await;
|
|
|
|
|
|
|
|
// post was new
|
2022-11-09 18:11:02 +01:00
|
|
|
Ok(true)
|
2022-11-07 22:07:07 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
async fn save_post_tags(&mut self, post: Post) {
|
|
|
|
if post.account.bot || post.tags.is_empty() {
|
|
|
|
// irrelevant
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
|
|
|
|
let host = match post.uri_host() {
|
|
|
|
Some(host) => host,
|
|
|
|
None => {
|
|
|
|
log::warn!("no uri_host");
|
|
|
|
return;
|
|
|
|
},
|
|
|
|
};
|
|
|
|
|
|
|
|
let timestamp = match post.timestamp() {
|
|
|
|
Some(timestamp) => timestamp,
|
|
|
|
None => {
|
|
|
|
log::warn!("no timestamp");
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
};
|
|
|
|
let hour = timestamp.naive_utc().timestamp() / 3600;
|
|
|
|
let until = chrono::offset::Utc::now().naive_utc().timestamp() / 3600;
|
|
|
|
if hour > until {
|
|
|
|
log::warn!("future post from {}", timestamp);
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
let from = until - TAG_EXPIRE;
|
|
|
|
if hour < from {
|
|
|
|
log::warn!("ancient post from {}", timestamp);
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
|
|
|
|
// clip "en-us" to "en"
|
2022-11-08 00:43:46 +01:00
|
|
|
let language = post.lang();
|
2022-11-07 22:07:07 +01:00
|
|
|
|
|
|
|
let mut cmd = redis::pipe();
|
|
|
|
if let Some(language) = &language {
|
2022-11-10 02:47:09 +01:00
|
|
|
cmd.hincr("r", language, 1)
|
2022-11-07 22:07:07 +01:00
|
|
|
.ignore();
|
|
|
|
}
|
2022-11-08 00:43:46 +01:00
|
|
|
let mut store_tags = |spellings, tag_key| {
|
2022-11-07 22:07:07 +01:00
|
|
|
// by hour
|
|
|
|
cmd.hincr(
|
|
|
|
&tag_key,
|
|
|
|
format!("t:{}", hour),
|
|
|
|
1
|
|
|
|
).ignore();
|
|
|
|
// by spelling
|
2022-11-08 00:43:46 +01:00
|
|
|
for spelling in spellings {
|
|
|
|
cmd.hincr(
|
|
|
|
&tag_key,
|
|
|
|
format!("s:{}", spelling),
|
|
|
|
1
|
|
|
|
).ignore();
|
|
|
|
}
|
2022-11-07 22:07:07 +01:00
|
|
|
// by instance
|
|
|
|
cmd.hincr(
|
|
|
|
tag_key,
|
|
|
|
format!("h:{}", host),
|
|
|
|
1
|
|
|
|
).ignore();
|
|
|
|
};
|
2022-11-08 00:43:46 +01:00
|
|
|
for (name, spellings) in post.tags_set() {
|
|
|
|
log::debug!("tag {}", name);
|
2022-11-07 22:07:07 +01:00
|
|
|
// global
|
2022-11-08 00:43:46 +01:00
|
|
|
store_tags(spellings.clone(), format!("g:{}", name));
|
2022-11-07 22:07:07 +01:00
|
|
|
// by language
|
|
|
|
if let Some(language) = &language {
|
2022-11-08 00:43:46 +01:00
|
|
|
store_tags(spellings, format!("l:{}:{}", language, name));
|
2022-11-07 22:07:07 +01:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
match cmd.query_async(&mut self.man).await {
|
|
|
|
Ok(()) => {}
|
|
|
|
Err(e) => {
|
|
|
|
log::error!("redis error: {:?}", e);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2022-11-09 18:11:02 +01:00
|
|
|
pub async fn save_host(&mut self, host: &str) -> Result<(), RedisError> {
|
2022-11-07 22:07:07 +01:00
|
|
|
redis::Cmd::set(format!("h:{}", host), "1")
|
|
|
|
.query_async::<_, ()>(&mut self.man)
|
|
|
|
.await
|
|
|
|
}
|
|
|
|
|
2022-11-09 18:11:02 +01:00
|
|
|
pub async fn get_hosts(&mut self) -> Result<Vec<String>, RedisError> {
|
2022-11-07 22:07:07 +01:00
|
|
|
let mut results = vec![];
|
2022-11-09 18:11:02 +01:00
|
|
|
self.scan("h:*", |key| results.push(key[2..].to_string())).await?;
|
|
|
|
Ok(results)
|
2022-11-07 22:07:07 +01:00
|
|
|
}
|
|
|
|
|
2022-11-10 02:47:09 +01:00
|
|
|
pub async fn get_languages(&mut self) -> Result<Vec<(String, usize)>, RedisError> {
|
|
|
|
let values: Vec<String> = redis::Cmd::hgetall("r")
|
|
|
|
.query_async(&mut self.man)
|
|
|
|
.await?;
|
|
|
|
let mut results = Vec::with_capacity(values.len() / 2);
|
|
|
|
let mut key = None;
|
|
|
|
for value in values {
|
|
|
|
if let Some(key) = key.take() {
|
|
|
|
if let Ok(count) = str::parse(&value) {
|
|
|
|
results.push((key, count));
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
key = Some(value);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
Ok(results)
|
|
|
|
}
|
|
|
|
|
2022-11-07 22:07:07 +01:00
|
|
|
async fn scan<F: FnMut(String)>(
|
|
|
|
&mut self,
|
|
|
|
pattern: &str,
|
|
|
|
mut f: F,
|
2022-11-09 18:11:02 +01:00
|
|
|
) -> Result<(), RedisError> {
|
2022-11-07 22:07:07 +01:00
|
|
|
let mut cmd = redis::cmd("SCAN");
|
2022-11-10 02:47:09 +01:00
|
|
|
cmd.cursor_arg(0)
|
|
|
|
.arg("MATCH").arg(pattern)
|
|
|
|
.arg("COUNT").arg(100);
|
2022-11-07 22:07:07 +01:00
|
|
|
let mut iter = cmd.iter_async::<String>(&mut self.man)
|
2022-11-09 18:11:02 +01:00
|
|
|
.await?;
|
2022-11-07 22:07:07 +01:00
|
|
|
while let Some(key) = iter.next_item().await {
|
|
|
|
f(key);
|
|
|
|
}
|
2022-11-09 18:11:02 +01:00
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
|
|
|
pub async fn get_trend_tags(
|
|
|
|
&mut self,
|
|
|
|
language: &Option<String>,
|
2022-11-09 19:03:03 +01:00
|
|
|
names_in: impl Iterator<Item = String>,
|
2022-11-09 18:11:02 +01:00
|
|
|
) -> Result<Vec<TrendTag>, RedisError> {
|
|
|
|
let mut cmd = redis::pipe();
|
|
|
|
let prefix = match language {
|
|
|
|
Some(language) => format!("l:{}:", language),
|
|
|
|
None => "g:".to_string(),
|
|
|
|
};
|
2022-11-09 19:03:03 +01:00
|
|
|
let names: Vec<String> = names_in.map(|name| {
|
2022-11-09 18:11:02 +01:00
|
|
|
cmd.hgetall(format!("{}{}", prefix, name));
|
2022-11-09 19:03:03 +01:00
|
|
|
name
|
|
|
|
}).collect();
|
2022-11-09 18:11:02 +01:00
|
|
|
let hashes = cmd.query_async::<_, Vec<Vec<String>>>(&mut self.man).await?;
|
|
|
|
let results = names.into_iter()
|
|
|
|
.zip(hashes)
|
2022-11-09 19:03:03 +01:00
|
|
|
.map(|(name, hash_values)| TrendTag::from_hash(name, hash_values))
|
2022-11-09 18:11:02 +01:00
|
|
|
.collect();
|
|
|
|
Ok(results)
|
|
|
|
}
|
|
|
|
|
|
|
|
pub async fn get_trend_pools(
|
|
|
|
&mut self,
|
|
|
|
language: &Option<String>,
|
|
|
|
periods: &[u64],
|
|
|
|
) -> Result<Vec<(u64, Vec<String>)>, RedisError> {
|
|
|
|
let mut cmd = redis::pipe();
|
|
|
|
for period in periods {
|
|
|
|
cmd.smembers(pool_key(language, *period));
|
|
|
|
}
|
|
|
|
let sets: Vec<Vec<String>> = cmd.query_async(&mut self.man)
|
|
|
|
.await?;
|
|
|
|
let results = periods.iter().cloned()
|
|
|
|
.zip(sets.into_iter())
|
|
|
|
.collect();
|
|
|
|
Ok(results)
|
|
|
|
}
|
|
|
|
|
|
|
|
pub async fn update_trend_pools(
|
|
|
|
&mut self,
|
|
|
|
remove: impl Iterator<Item = (&Option<String>, u64, Vec<&str>)>,
|
|
|
|
add: impl Iterator<Item = (&Option<String>, u64, Vec<&str>)>,
|
|
|
|
) -> Result<(), RedisError> {
|
|
|
|
let mut cmd = redis::pipe();
|
|
|
|
for (language, period, tags) in remove {
|
|
|
|
if ! tags.is_empty() {
|
2022-11-09 19:03:03 +01:00
|
|
|
let pool_key = pool_key(language, period);
|
|
|
|
cmd.srem(&pool_key, tags)
|
|
|
|
.ignore()
|
|
|
|
.expire(pool_key, period as usize * 3600)
|
2022-11-09 18:11:02 +01:00
|
|
|
.ignore();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
for (language, period, tags) in add {
|
|
|
|
if ! tags.is_empty() {
|
2022-11-09 19:03:03 +01:00
|
|
|
let pool_key = pool_key(language, period);
|
|
|
|
cmd.sadd(&pool_key, tags)
|
|
|
|
.ignore()
|
|
|
|
.expire(pool_key, period as usize * 3600)
|
2022-11-09 18:11:02 +01:00
|
|
|
.ignore();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
cmd.query_async(&mut self.man)
|
|
|
|
.await?;
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Builds the Redis key of the trend pool for `period`.
///
/// The key is `q:<period>:<language>` for a language-specific pool, or
/// `q:<period>` for the global pool.
fn pool_key(language: &Option<String>, period: u64) -> String {
    if let Some(lang) = language {
        format!("q:{}:{}", period, lang)
    } else {
        format!("q:{}", period)
    }
}
|