use std::{collections::{HashMap, HashSet}, time::Duration, ops::Deref}; use chrono::{DateTime, FixedOffset}; use futures::{Stream, StreamExt}; use eventsource_stream::Eventsource; #[derive(Debug, serde::Serialize, serde::Deserialize)] pub struct Account { pub username: String, pub display_name: String, pub url: String, pub bot: bool, pub avatar: String, pub header: String, } impl Account { pub fn host(&self) -> Option { reqwest::Url::parse(&self.url) .ok() .and_then(|url| url.domain() .map(|s| s.to_lowercase()) ) } } #[derive(Debug, serde::Serialize, serde::Deserialize)] pub struct Tag { pub name: String, } #[derive(Debug, serde::Serialize, serde::Deserialize)] pub struct Application { pub name: String, pub website: Option, } #[derive(Debug, serde::Serialize, serde::Deserialize)] pub struct Mention { pub username: Option, pub url: String, pub acct: Option, } impl Mention { pub fn user_host(&self) -> Option { reqwest::Url::parse(&self.url) .ok() .and_then(|url| url.domain() .map(|host| host.to_lowercase()) ) } } #[derive(Debug, serde::Serialize, serde::Deserialize)] pub struct MediaAttachment { #[serde(rename = "type")] pub media_type: String, pub remote_url: Option, } #[derive(Debug, serde::Serialize, serde::Deserialize)] pub struct Post { pub created_at: String, pub uri: String, #[serde(default = "String::new")] pub content: String, pub account: Account, #[serde(default)] pub tags: Vec, pub application: Option, pub sensitive: Option, #[serde(default)] pub mentions: Vec, pub language: Option, #[serde(default)] pub media_attachments: Vec, #[serde(default)] pub reblog: Option>, } impl Post { pub fn uri_host(&self) -> Option { reqwest::Url::parse(&self.uri) .ok() .and_then(|uri| uri.domain() .map(|host| host.to_owned()) ) } pub fn user_id(&self) -> Option { let username = self.account.username.to_lowercase(); let host = self.uri_host()?; Some(format!("{}@{}", username, host)) } pub fn timestamp(&self) -> Option> { DateTime::parse_from_rfc3339(&self.created_at) .ok() } /// clip "en-us" to "en" pub fn lang(&self) -> Option { let language = match &self.language { Some(language) => language, None => return None, }; if language.len() < 2 { None } else if language.len() == 2 { Some(language.to_lowercase()) } else { Some(language[..2].to_lowercase()) } } pub fn tags_set(&self) -> HashMap> { let mut result: HashMap> = HashMap::with_capacity(self.tags.len()); for tag in &self.tags { let name = tag.name.to_lowercase(); if name.contains(char::is_whitespace) { continue; } match result.entry(name) { std::collections::hash_map::Entry::Vacant(entry) => { let mut r = HashSet::new(); r.insert(tag.name.clone()); entry.insert(r); } std::collections::hash_map::Entry::Occupied(mut entry) => { entry.get_mut().insert(tag.name.clone()); } } } result } } #[derive(Debug)] enum EncodedPost { Value(serde_json::Value), Bytes(Vec), Stolen, } /// Wraps a `Post` along with a serializable form that is most close /// to the original incoming data #[derive(Debug)] pub struct EncodablePost { post: Post, encoded: EncodedPost, } impl Deref for EncodablePost { type Target = Post; fn deref(&self) -> &Self::Target { &self.post } } impl EncodablePost { pub fn from_value(value: serde_json::Value) -> Result { let post = serde_json::from_value(value.clone())?; Ok(EncodablePost { post, encoded: EncodedPost::Value(value), }) } pub fn from_bytes(bytes: Vec) -> Result { let post = serde_json::from_slice(&bytes)?; Ok(EncodablePost { post, encoded: EncodedPost::Bytes(bytes), }) } pub fn encode(&mut self) -> Result, serde_json::Error> { use std::mem::replace; let encoded = replace(&mut self.encoded, EncodedPost::Stolen); match encoded { EncodedPost::Value(value) => serde_json::to_vec(&value), EncodedPost::Bytes(bytes) => Ok(bytes), EncodedPost::Stolen => panic!("EncodedPost::Stolen"), } } } #[derive(Debug)] pub struct Feed { pub posts: Vec, } impl Feed { /// Analyze time intervals between posts to estimate when to fetch /// next pub fn mean_post_interval(&self) -> Option { let mut timestamps = self.posts.iter() .filter_map(|post| post.timestamp()) .collect::>(); timestamps.sort(); if timestamps.len() > 2 { Some( ((*timestamps.last().unwrap() - timestamps[0]) / (timestamps.len() as i32 - 1) ).to_std().unwrap() ) } else { None } } pub async fn fetch(client: &reqwest::Client, url: &str) -> Result { let body = client.get(url) .send() .await? .bytes() .await?; let posts = tokio::task::spawn_blocking(move || { let values: Vec = serde_json::from_slice(&body)?; let posts: Vec = values.into_iter() .filter_map(|value| EncodablePost::from_value(value).ok()) .collect(); Ok::<_, serde_json::Error>(posts) }).await.expect("join blocking") .unwrap_or_else(|e| { tracing::error!("{}", e); vec![] }); tracing::trace!("{} {} posts", url, posts.len()); Ok(Feed { posts }) } pub async fn stream(client: &reqwest::Client, url: &str) -> Result, String> { let res = client.get(url) .timeout(Duration::MAX) .send() .await .map_err(|e| format!("{}", e))?; if res.status() != 200 { return Err(format!("HTTP {}", res.status())); } let ct = res.headers().get("content-type") .and_then(|c| c.to_str().ok()); if ct.map_or(true, |ct| ct != "text/event-stream") { return Err(format!("Invalid Content-Type: {:?}", ct)); } let src = res.bytes_stream().eventsource() .filter_map(|result| async { let result = result.ok()?; if result.event == "update" { Some(result) } else { None } }) .filter_map(|event| async move { EncodablePost::from_bytes(event.data.into_bytes()).ok() }); Ok(src) } }