2022-11-10 15:22:45 +01:00
|
|
|
use std::pin::Pin;
|
|
|
|
|
|
|
|
use bb8::ManageConnection;
|
2022-11-12 01:02:44 +01:00
|
|
|
use futures::{Future, Stream, stream::unfold, StreamExt};
|
2022-11-14 03:14:12 +01:00
|
|
|
use redis::{Value, RedisError, aio::ConnectionLike, FromRedisValue};
|
2022-12-02 22:02:37 +01:00
|
|
|
use crate::{
|
|
|
|
feed::{EncodablePost, Post},
|
|
|
|
trend_tag::TrendTag,
|
|
|
|
PERIOD_COMPARE_WINDOW,
|
|
|
|
current_hour,
|
|
|
|
PERIODS,
|
|
|
|
};
|
2022-11-07 22:07:07 +01:00
|
|
|
|
|
|
|
/// Lifetime, in seconds, given to the `p:<uri>` marker key of a seen post
/// (passed to Redis `EXPIRE`).
const POST_EXPIRE: usize = 24 * 60 * 60;

/// Retention window for tag data, in hours.
///
/// Compared against hour numbers when filtering posts and multiplied by
/// 3600 wherever it is handed to Redis `EXPIRE`.
const TAG_EXPIRE: u64 = 24 * 30;

/// Lifetime, in seconds, given to a `h:<host>` key (passed to Redis `EXPIRE`).
const HOST_EXPIRE: usize = 30 * 24 * 60 * 60;

/// Size of a trend pool. Not referenced in this file; exported for other modules.
pub const TREND_POOL_SIZE: usize = 20;

/// Upper bound on the number of image URLs kept in one `i:<tag>` set.
pub const IMAGES_PER_TAG: usize = 8;
|
|
|
|
|
2022-11-09 18:11:02 +01:00
|
|
|
/// Error type used by this module's fallible operations; an alias for [`RedisError`].
pub type Error = RedisError;
|
|
|
|
|
2022-11-10 15:22:45 +01:00
|
|
|
/// Local wrapper type so that `bb8::ManageConnection` can be implemented
/// for Redis connections.
struct RedisPool {
    /// Redis URL used to open a client whenever a new connection is created.
    redis_url: String,
}
|
|
|
|
|
|
|
|
impl ManageConnection for RedisPool {
|
|
|
|
type Connection = redis::aio::ConnectionManager;
|
|
|
|
type Error = Error;
|
|
|
|
|
|
|
|
fn connect<'life0, 'async_trait>(
|
|
|
|
&'life0 self
|
|
|
|
) -> Pin<Box<dyn Future<Output = Result<Self::Connection, Self::Error>> + Send + 'async_trait>>
|
|
|
|
where
|
|
|
|
'life0: 'async_trait,
|
|
|
|
Self: 'async_trait
|
|
|
|
{
|
|
|
|
Box::pin(async {
|
|
|
|
let client = redis::Client::open(&self.redis_url[..])
|
|
|
|
.expect("redis::Client");
|
|
|
|
let manager = redis::aio::ConnectionManager::new(client)
|
|
|
|
.await
|
|
|
|
.expect("redis::Client");
|
|
|
|
Ok(manager)
|
|
|
|
})
|
|
|
|
}
|
|
|
|
|
|
|
|
fn is_valid<'life0, 'life1, 'async_trait>(
|
|
|
|
&'life0 self,
|
|
|
|
conn: &'life1 mut Self::Connection
|
|
|
|
) -> Pin<Box<dyn Future<Output = Result<(), Self::Error>> + Send + 'async_trait>>
|
|
|
|
where
|
|
|
|
'life0: 'async_trait,
|
|
|
|
'life1: 'async_trait,
|
|
|
|
Self: 'async_trait
|
|
|
|
{
|
|
|
|
Box::pin(async {
|
|
|
|
redis::cmd("PING")
|
|
|
|
.query_async::<_, ()>(conn)
|
|
|
|
.await
|
|
|
|
})
|
|
|
|
}
|
|
|
|
|
|
|
|
fn has_broken(&self, _conn: &mut Self::Connection) -> bool {
|
|
|
|
false
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2022-11-07 22:07:07 +01:00
|
|
|
#[derive(Clone)]
|
|
|
|
pub struct Store {
|
2022-11-10 15:22:45 +01:00
|
|
|
pool: bb8::Pool<RedisPool>,
|
|
|
|
}
|
|
|
|
|
|
|
|
impl ConnectionLike for Store {
|
|
|
|
fn req_packed_command<'a>(
|
|
|
|
&'a mut self,
|
|
|
|
cmd: &'a redis::Cmd
|
|
|
|
) -> redis::RedisFuture<'a, Value> {
|
|
|
|
Box::pin(async move {
|
|
|
|
let mut conn = self.pool.get().await.unwrap();
|
|
|
|
conn.req_packed_command(cmd).await
|
|
|
|
})
|
|
|
|
}
|
|
|
|
|
|
|
|
fn req_packed_commands<'a>(
|
|
|
|
&'a mut self,
|
|
|
|
cmd: &'a redis::Pipeline,
|
|
|
|
offset: usize,
|
|
|
|
count: usize
|
|
|
|
) -> redis::RedisFuture<'a, Vec<Value>> {
|
|
|
|
Box::pin(async move {
|
|
|
|
let mut conn = self.pool.get().await.unwrap();
|
|
|
|
conn.req_packed_commands(cmd, offset, count).await
|
|
|
|
})
|
|
|
|
}
|
|
|
|
|
|
|
|
fn get_db(&self) -> i64 {
|
|
|
|
// wrong ;-)
|
|
|
|
0
|
|
|
|
}
|
2022-11-07 22:07:07 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
impl Store {
|
2022-11-12 02:47:21 +01:00
|
|
|
pub async fn new(pool_max_size: u32, redis_url: String) -> Self {
|
2022-11-07 22:07:07 +01:00
|
|
|
crate::systemd::status("Starting redis client");
|
2022-11-10 15:22:45 +01:00
|
|
|
let pool = bb8::Pool::builder()
|
2022-11-12 02:47:21 +01:00
|
|
|
.max_size(pool_max_size)
|
2022-11-10 15:22:45 +01:00
|
|
|
.build(RedisPool { redis_url })
|
|
|
|
.await
|
|
|
|
.unwrap();
|
|
|
|
Self { pool }
|
2022-11-07 22:07:07 +01:00
|
|
|
}
|
|
|
|
|
2022-12-02 22:02:37 +01:00
|
|
|
pub async fn save_post(&mut self, mut post: EncodablePost) -> Result<bool, RedisError> {
|
2022-11-07 22:07:07 +01:00
|
|
|
let post_key = format!("p:{}", post.uri);
|
|
|
|
let check = redis::pipe()
|
|
|
|
.getset(&post_key, "1")
|
|
|
|
.expire(post_key, POST_EXPIRE)
|
|
|
|
.ignore()
|
2022-11-10 15:22:45 +01:00
|
|
|
.query_async::<_, Value>(self)
|
2022-11-09 18:11:02 +01:00
|
|
|
.await?;
|
|
|
|
if check != Value::Bulk(vec![Value::Nil]) {
|
2022-11-07 22:07:07 +01:00
|
|
|
// post is not new
|
2022-11-09 18:11:02 +01:00
|
|
|
return Ok(false);
|
2022-11-07 22:07:07 +01:00
|
|
|
}
|
2022-11-15 02:41:16 +01:00
|
|
|
|
2022-12-01 01:39:38 +01:00
|
|
|
tracing::info!("New post ({}{} tags): {}",
|
2022-11-07 22:07:07 +01:00
|
|
|
if post.account.bot { "bot, " } else { "" },
|
|
|
|
post.tags.len(), post.uri);
|
|
|
|
|
2022-12-02 22:02:37 +01:00
|
|
|
match post.encode() {
|
2023-10-01 23:19:00 +02:00
|
|
|
Ok(encoded_post) => {
|
|
|
|
let mut data = Vec::<u8>::with_capacity(
|
|
|
|
post.event_type.len() + 1 + encoded_post.len()
|
|
|
|
);
|
|
|
|
data.extend(post.event_type.as_bytes());
|
|
|
|
data.extend(&[0]);
|
|
|
|
data.extend(&encoded_post);
|
|
|
|
redis::Cmd::publish("firehose", data)
|
2022-12-02 22:02:37 +01:00
|
|
|
.query_async::<_, Value>(self)
|
|
|
|
.await?;
|
|
|
|
}
|
|
|
|
Err(e) =>
|
2022-12-01 01:39:38 +01:00
|
|
|
tracing::error!("cannot encode post: {:?}", e),
|
2022-12-02 22:02:37 +01:00
|
|
|
}
|
2022-11-07 22:07:07 +01:00
|
|
|
|
|
|
|
// post was new
|
2022-11-09 18:11:02 +01:00
|
|
|
Ok(true)
|
2022-11-07 22:07:07 +01:00
|
|
|
}
|
|
|
|
|
2023-02-24 19:39:30 +01:00
|
|
|
pub async fn save_post_tags(&mut self, post: &Post, tagged_unsafe: bool) {
|
2022-11-07 22:07:07 +01:00
|
|
|
if post.account.bot || post.tags.is_empty() {
|
|
|
|
// irrelevant
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
|
|
|
|
let host = match post.uri_host() {
|
|
|
|
Some(host) => host,
|
|
|
|
None => {
|
2022-12-01 01:39:38 +01:00
|
|
|
tracing::warn!("no uri_host");
|
2022-11-07 22:07:07 +01:00
|
|
|
return;
|
|
|
|
},
|
|
|
|
};
|
|
|
|
|
|
|
|
let timestamp = match post.timestamp() {
|
|
|
|
Some(timestamp) => timestamp,
|
|
|
|
None => {
|
2022-12-01 01:39:38 +01:00
|
|
|
tracing::warn!("no timestamp");
|
2022-11-07 22:07:07 +01:00
|
|
|
return;
|
|
|
|
}
|
|
|
|
};
|
2022-11-14 03:14:12 +01:00
|
|
|
let hour = timestamp.naive_utc().timestamp() as u64 / 3600;
|
|
|
|
let until = current_hour();
|
2022-11-07 22:07:07 +01:00
|
|
|
if hour > until {
|
2022-12-01 01:39:38 +01:00
|
|
|
tracing::warn!("future post from {}", timestamp);
|
2022-11-07 22:07:07 +01:00
|
|
|
return;
|
|
|
|
}
|
|
|
|
let from = until - TAG_EXPIRE;
|
|
|
|
if hour < from {
|
2022-12-01 01:39:38 +01:00
|
|
|
tracing::warn!("ancient post from {}", timestamp);
|
2022-11-07 22:07:07 +01:00
|
|
|
return;
|
|
|
|
}
|
|
|
|
|
2022-11-14 01:13:49 +01:00
|
|
|
let user_id = post.user_id();
|
2022-11-07 22:07:07 +01:00
|
|
|
// clip "en-us" to "en"
|
2022-11-08 00:43:46 +01:00
|
|
|
let language = post.lang();
|
2022-11-07 22:07:07 +01:00
|
|
|
|
|
|
|
let mut cmd = redis::pipe();
|
2022-11-25 02:43:28 +01:00
|
|
|
let store_tags = |cmd: &mut redis::Pipeline, spellings, tag_key, user_key| {
|
2022-11-07 22:07:07 +01:00
|
|
|
// by spelling
|
2022-11-08 00:43:46 +01:00
|
|
|
for spelling in spellings {
|
|
|
|
cmd.hincr(
|
|
|
|
&tag_key,
|
|
|
|
format!("s:{}", spelling),
|
|
|
|
1
|
|
|
|
).ignore();
|
|
|
|
}
|
2022-11-07 22:07:07 +01:00
|
|
|
// by instance
|
|
|
|
cmd.hincr(
|
|
|
|
tag_key,
|
|
|
|
format!("h:{}", host),
|
|
|
|
1
|
|
|
|
).ignore();
|
2022-11-14 01:13:49 +01:00
|
|
|
if let Some(user_id) = &user_id {
|
|
|
|
// users by tag/hour
|
|
|
|
cmd.sadd(&user_key, &user_id).ignore()
|
2023-01-16 04:16:40 +01:00
|
|
|
.expire(&user_key, TAG_EXPIRE as usize * 3600)
|
2022-11-25 02:43:28 +01:00
|
|
|
.ignore();
|
2022-11-14 01:13:49 +01:00
|
|
|
}
|
2022-11-07 22:07:07 +01:00
|
|
|
};
|
2023-06-11 02:57:48 +02:00
|
|
|
let tags = post.tags_set();
|
|
|
|
let images = if !tagged_unsafe && tags.len() < 3 {
|
2022-11-25 02:43:28 +01:00
|
|
|
post.media_attachments.iter()
|
|
|
|
.filter(|a| a.media_type == "image")
|
|
|
|
.filter_map(|a| a.remote_url.as_ref())
|
2022-11-25 03:06:39 +01:00
|
|
|
.filter(|url| !url.contains(char::is_whitespace))
|
2022-11-25 02:43:28 +01:00
|
|
|
.take(2)
|
|
|
|
.collect::<Vec<&String>>()
|
|
|
|
} else {
|
2023-06-11 02:57:48 +02:00
|
|
|
tracing::warn!("unsafe: {:?}/{:?}", post.sensitive, tags.keys());
|
2022-11-25 02:43:28 +01:00
|
|
|
// ignore disturbing porn images from sensitive posts
|
|
|
|
vec![]
|
|
|
|
};
|
|
|
|
let mut image_keys = vec![];
|
2023-06-11 02:57:48 +02:00
|
|
|
for (name, spellings) in tags.into_iter() {
|
2022-11-07 22:07:07 +01:00
|
|
|
// global
|
2022-11-25 02:43:28 +01:00
|
|
|
store_tags(&mut cmd,
|
2022-11-14 01:13:49 +01:00
|
|
|
spellings.clone(),
|
|
|
|
format!("g:{}", name),
|
|
|
|
format!("u::{}:{}", hour, name),
|
|
|
|
);
|
2022-11-07 22:07:07 +01:00
|
|
|
// by language
|
|
|
|
if let Some(language) = &language {
|
2022-11-25 02:43:28 +01:00
|
|
|
store_tags(&mut cmd,
|
2022-11-14 01:13:49 +01:00
|
|
|
spellings,
|
|
|
|
format!("l:{}:{}", language, name),
|
|
|
|
format!("u:{}:{}:{}", language, hour, name),
|
|
|
|
);
|
2022-11-07 22:07:07 +01:00
|
|
|
}
|
2022-11-25 02:43:28 +01:00
|
|
|
|
|
|
|
for image in &images {
|
|
|
|
let image_key = format!("i:{}", name);
|
|
|
|
cmd.sadd(&image_key, image)
|
|
|
|
.ignore()
|
2023-01-16 04:16:40 +01:00
|
|
|
.expire(&image_key, TAG_EXPIRE as usize * 3600)
|
2022-11-25 02:43:28 +01:00
|
|
|
.ignore()
|
|
|
|
.scard(&image_key);
|
|
|
|
image_keys.push(image_key);
|
|
|
|
}
|
2022-11-07 22:07:07 +01:00
|
|
|
}
|
|
|
|
|
2022-11-25 02:43:28 +01:00
|
|
|
match cmd.query_async::<_, Vec<usize>>(self).await {
|
|
|
|
Ok(image_key_sizes) => {
|
|
|
|
assert_eq!(image_keys.len(), image_key_sizes.len());
|
|
|
|
let mut cmd = redis::pipe();
|
|
|
|
for (image_key, size) in image_keys.into_iter().zip(image_key_sizes.into_iter()) {
|
|
|
|
let excess = size.saturating_sub(IMAGES_PER_TAG);
|
|
|
|
if excess > 0 {
|
|
|
|
cmd.spop(image_key).arg(excess)
|
|
|
|
.ignore();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
let _ = cmd.query_async::<_, ()>(self).await;
|
|
|
|
}
|
2022-11-07 22:07:07 +01:00
|
|
|
Err(e) => {
|
2022-12-01 01:39:38 +01:00
|
|
|
tracing::error!("redis error: {:?}", e);
|
2022-11-07 22:07:07 +01:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2022-11-09 18:11:02 +01:00
|
|
|
pub async fn save_host(&mut self, host: &str) -> Result<(), RedisError> {
|
2022-12-26 02:49:10 +01:00
|
|
|
let key = format!("h:{}", host);
|
|
|
|
redis::pipe()
|
|
|
|
.set(&key, "1")
|
|
|
|
.ignore()
|
|
|
|
.expire(&key, HOST_EXPIRE)
|
|
|
|
.ignore()
|
2022-11-10 15:22:45 +01:00
|
|
|
.query_async::<_, ()>(self)
|
2022-11-07 22:07:07 +01:00
|
|
|
.await
|
|
|
|
}
|
|
|
|
|
2022-12-26 02:49:43 +01:00
|
|
|
pub async fn remove_host(&mut self, host: &str) -> Result<(), RedisError> {
|
|
|
|
redis::Cmd::del(format!("h:{}", host))
|
|
|
|
.query_async::<_, ()>(self)
|
|
|
|
.await
|
|
|
|
}
|
|
|
|
|
2022-11-12 01:02:44 +01:00
|
|
|
/// Streams the names of all stored hosts, i.e. every `h:*` key with the
/// prefix removed.
///
/// # Errors
/// Returns the `RedisError` raised when the scan is started.
pub async fn get_hosts(&mut self) -> Result<impl Stream<Item = String> + '_, RedisError> {
    let hosts = self.scan_prefix("h:").await?;
    Ok(hosts)
}
|
|
|
|
|
2022-11-11 17:11:13 +01:00
|
|
|
pub async fn get_languages(&mut self) -> Result<Vec<String>, RedisError> {
|
|
|
|
redis::Cmd::hkeys("r")
|
2022-11-10 15:22:45 +01:00
|
|
|
.query_async(self)
|
2022-11-11 17:11:13 +01:00
|
|
|
.await
|
2022-11-10 02:47:09 +01:00
|
|
|
}
|
|
|
|
|
2022-11-29 01:51:22 +01:00
|
|
|
pub async fn get_tag_images(&mut self, tag: &str) -> Result<Vec<String>, RedisError> {
|
|
|
|
redis::Cmd::smembers(format!("i:{}", tag))
|
|
|
|
.query_async(self)
|
|
|
|
.await
|
|
|
|
}
|
|
|
|
|
2022-11-12 01:02:44 +01:00
|
|
|
/// Streams the names of all globally tracked tags, i.e. every `g:*` key
/// with the prefix removed.
///
/// # Errors
/// Returns the `RedisError` raised when the scan is started.
pub async fn get_tags_global(&mut self) -> Result<impl Stream<Item = String> + '_, RedisError> {
    // The former `.map(|tag| tag)` was an identity mapping and is dropped.
    self.scan_prefix("g:")
        .await
}
|
|
|
|
|
|
|
|
/// Streams `(Some(language), tag)` pairs for every `l:<language>:<tag>` key.
///
/// Keys that do not have the expected shape are skipped.
///
/// # Errors
/// Returns the `RedisError` raised when the scan is started.
pub async fn get_tags_by_language(&mut self) -> Result<impl Stream<Item = (Option<String>, String)> + '_, RedisError> {
    // `scan` passes the pattern to SCAN MATCH verbatim, so the wildcard is
    // required: a bare "l:" only matches a key literally named "l:".
    let by_language = self.scan("l:*")
        .await?
        .filter_map(|key| async move {
            key.strip_prefix("l:")
                // the first ':' separates the language from the tag name
                .and_then(|rest| rest.split_once(':'))
                .map(|(language, tag)| (Some(language.to_string()), tag.to_string()))
        });
    Ok(by_language)
}
|
|
|
|
|
|
|
|
/// Streams all keys matching `<prefix>*`, with the first `prefix.len()`
/// bytes cut off each key.
///
/// # Errors
/// Returns the `RedisError` raised when the scan is started.
pub async fn scan_prefix<'a>(&'a mut self, prefix: &'a str) -> Result<impl Stream<Item = String> + '_, RedisError> {
    let skip = prefix.len();
    let matching = self.scan(&format!("{}*", prefix)).await?;
    let stripped = matching.map(move |key| key[skip..].to_string());
    Ok(stripped)
}
|
|
|
|
|
|
|
|
pub async fn scan(&mut self, pattern: &str) -> Result<impl Stream<Item = String> + '_, RedisError> {
|
2022-11-07 22:07:07 +01:00
|
|
|
let mut cmd = redis::cmd("SCAN");
|
2022-11-10 02:47:09 +01:00
|
|
|
cmd.cursor_arg(0)
|
2022-11-18 17:44:01 +01:00
|
|
|
.arg("MATCH").arg(pattern)
|
|
|
|
.arg("COUNT").arg(10000);
|
2022-11-12 01:02:44 +01:00
|
|
|
let iter = cmd.iter_async::<String>(self)
|
2022-11-09 18:11:02 +01:00
|
|
|
.await?;
|
2022-11-12 01:02:44 +01:00
|
|
|
|
|
|
|
let stream = unfold(iter, |mut iter| async move {
|
|
|
|
iter.next_item().await
|
|
|
|
.map(|tag| (tag, iter))
|
|
|
|
});
|
|
|
|
Ok(stream)
|
2022-11-09 18:11:02 +01:00
|
|
|
}
|
|
|
|
|
2022-11-14 03:14:12 +01:00
|
|
|
pub async fn clean_trend_tag(&self, language: &Option<String>, tag: &TrendTag) -> Result<(), RedisError> {
|
|
|
|
if ! tag.other.iter().any(|(name, _)| &name[..2] == "t:") {
|
|
|
|
return Ok(());
|
|
|
|
}
|
|
|
|
|
|
|
|
let mut cmd = redis::pipe();
|
|
|
|
for (name, _) in &tag.other {
|
2022-11-14 20:55:48 +01:00
|
|
|
if &name[..2] == "t:" {
|
|
|
|
cmd.hdel(tag_key(language, &tag.name), name)
|
|
|
|
.ignore();
|
|
|
|
}
|
2022-11-14 03:14:12 +01:00
|
|
|
}
|
|
|
|
cmd.query_async(&mut self.clone()).await
|
|
|
|
}
|
|
|
|
|
2022-11-09 18:11:02 +01:00
|
|
|
pub async fn get_trend_tags(
|
2022-11-14 03:14:12 +01:00
|
|
|
&self,
|
2022-11-09 18:11:02 +01:00
|
|
|
language: &Option<String>,
|
2022-11-14 03:14:12 +01:00
|
|
|
names: impl Iterator<Item = String>,
|
2022-11-09 18:11:02 +01:00
|
|
|
) -> Result<Vec<TrendTag>, RedisError> {
|
2022-11-14 03:14:12 +01:00
|
|
|
let until = current_hour();
|
|
|
|
let from = until - PERIODS.last().unwrap() * (1 + PERIOD_COMPARE_WINDOW);
|
|
|
|
|
2022-11-09 18:11:02 +01:00
|
|
|
let mut cmd = redis::pipe();
|
2022-11-14 03:14:12 +01:00
|
|
|
let names = names.map(|name| {
|
|
|
|
cmd.hgetall(tag_key(language, &name));
|
|
|
|
for hour in from..=until {
|
|
|
|
cmd.scard(format!("u:{}:{}:{}", language.as_ref().map_or("", |l| &l), hour, name));
|
|
|
|
}
|
2022-11-09 19:03:03 +01:00
|
|
|
name
|
2022-11-14 03:14:12 +01:00
|
|
|
}).collect::<Vec<String>>();
|
|
|
|
let mut values = cmd.query_async::<_, Vec<Value>>(&mut self.clone()).await?
|
|
|
|
.into_iter();
|
|
|
|
|
|
|
|
let mut results = Vec::with_capacity(names.len());
|
|
|
|
for name in names.into_iter() {
|
|
|
|
let hash_values = if let Some(Value::Bulk(hash_values)) = values.next() {
|
|
|
|
hash_values
|
|
|
|
} else {
|
|
|
|
panic!("hash_values");
|
|
|
|
};
|
|
|
|
let hash_values = hash_values.iter()
|
|
|
|
.map(|value| String::from_redis_value(value).expect("hash_values value"))
|
|
|
|
.collect();
|
|
|
|
let hour_users = (from..=until).map(|hour| {
|
|
|
|
let users = usize::from_redis_value(&values.next().unwrap()).expect("hour_users");
|
|
|
|
(hour, users)
|
|
|
|
}).collect();
|
|
|
|
results.push(TrendTag::from_hash(name.to_string(), hash_values, hour_users));
|
|
|
|
}
|
2022-11-09 18:11:02 +01:00
|
|
|
Ok(results)
|
|
|
|
}
|
|
|
|
|
|
|
|
pub async fn get_trend_pools(
|
|
|
|
&mut self,
|
|
|
|
language: &Option<String>,
|
|
|
|
periods: &[u64],
|
|
|
|
) -> Result<Vec<(u64, Vec<String>)>, RedisError> {
|
|
|
|
let mut cmd = redis::pipe();
|
|
|
|
for period in periods {
|
|
|
|
cmd.smembers(pool_key(language, *period));
|
|
|
|
}
|
2022-11-10 15:22:45 +01:00
|
|
|
let sets: Vec<Vec<String>> = cmd.query_async(self)
|
2022-11-09 18:11:02 +01:00
|
|
|
.await?;
|
|
|
|
let results = periods.iter().cloned()
|
|
|
|
.zip(sets.into_iter())
|
|
|
|
.collect();
|
|
|
|
Ok(results)
|
|
|
|
}
|
|
|
|
|
|
|
|
pub async fn update_trend_pools(
|
|
|
|
&mut self,
|
2022-11-11 17:11:13 +01:00
|
|
|
language: &Option<String>,
|
|
|
|
remove: impl Iterator<Item = (u64, Vec<&str>)>,
|
|
|
|
add: impl Iterator<Item = (u64, Vec<&str>)>,
|
|
|
|
pool_sizes: impl Iterator<Item = (u64, usize)>,
|
2022-11-09 18:11:02 +01:00
|
|
|
) -> Result<(), RedisError> {
|
|
|
|
let mut cmd = redis::pipe();
|
2022-11-11 17:11:13 +01:00
|
|
|
for (period, tags) in remove {
|
2022-11-09 18:11:02 +01:00
|
|
|
if ! tags.is_empty() {
|
2022-11-09 19:03:03 +01:00
|
|
|
let pool_key = pool_key(language, period);
|
|
|
|
cmd.srem(&pool_key, tags)
|
|
|
|
.ignore()
|
|
|
|
.expire(pool_key, period as usize * 3600)
|
2022-11-09 18:11:02 +01:00
|
|
|
.ignore();
|
|
|
|
}
|
|
|
|
}
|
2022-11-11 17:11:13 +01:00
|
|
|
for (period, tags) in add {
|
2022-11-09 18:11:02 +01:00
|
|
|
if ! tags.is_empty() {
|
2022-11-09 19:03:03 +01:00
|
|
|
let pool_key = pool_key(language, period);
|
|
|
|
cmd.sadd(&pool_key, tags)
|
|
|
|
.ignore()
|
|
|
|
.expire(pool_key, period as usize * 3600)
|
2022-11-09 18:11:02 +01:00
|
|
|
.ignore();
|
|
|
|
}
|
|
|
|
}
|
2022-11-11 17:11:13 +01:00
|
|
|
if let Some(language) = language {
|
|
|
|
let max_pool_size = pool_sizes.map(|(_, pool_size)| pool_size)
|
|
|
|
.max()
|
|
|
|
.unwrap_or(0);
|
|
|
|
if max_pool_size > 0 {
|
|
|
|
cmd.hset("r", language, max_pool_size)
|
|
|
|
.ignore();
|
|
|
|
} else {
|
|
|
|
cmd.hdel("r", language)
|
|
|
|
.ignore();
|
|
|
|
}
|
|
|
|
}
|
2022-11-10 15:22:45 +01:00
|
|
|
cmd.query_async(self)
|
2022-11-09 18:11:02 +01:00
|
|
|
.await?;
|
|
|
|
Ok(())
|
|
|
|
}
|
2022-11-12 01:02:44 +01:00
|
|
|
|
|
|
|
pub async fn delete_tag(
|
|
|
|
&mut self,
|
|
|
|
language: &Option<String>,
|
|
|
|
tag: &str,
|
|
|
|
) -> Result<(), RedisError> {
|
|
|
|
let key = match language {
|
|
|
|
Some(language) => format!("l:{}:{}", language, tag),
|
|
|
|
None => format!("g:{}", tag),
|
|
|
|
};
|
|
|
|
redis::Cmd::del(key)
|
|
|
|
.query_async(self)
|
|
|
|
.await
|
|
|
|
}
|
2022-11-09 18:11:02 +01:00
|
|
|
}
|
|
|
|
|
2022-11-14 03:14:12 +01:00
|
|
|
/// Builds the Redis key of a tag's hash: `l:<language>:<name>` when a
/// language is given, `g:<name>` otherwise.
fn tag_key(language: &Option<String>, name: &str) -> String {
    if let Some(language) = language {
        format!("l:{}:{}", language, name)
    } else {
        format!("g:{}", name)
    }
}
|
|
|
|
|
2022-11-09 18:11:02 +01:00
|
|
|
/// Builds the Redis key of a trend pool set: `q:<period>:<language>` when a
/// language is given, `q:<period>` otherwise.
fn pool_key(language: &Option<String>, period: u64) -> String {
    language.as_ref().map_or_else(
        || format!("q:{}", period),
        |language| format!("q:{}:{}", period, language),
    )
}
|