add working streaming

This commit is contained in:
Astro 2022-11-11 21:52:52 +01:00
parent c473f1e03d
commit 663db47973
5 changed files with 99 additions and 13 deletions

1
Cargo.lock generated
View File

@ -272,6 +272,7 @@ dependencies = [
"redis",
"reqwest",
"serde",
"serde_json",
"serde_yaml",
"systemd",
"tokio",

View File

@ -8,6 +8,7 @@ edition = "2021"
futures = "0.3"
tokio = { version = "1", features = ["full"] }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
serde_yaml = "0.9"
chrono = "0.4"
bb8 = "0.8"

View File

@ -1,6 +1,6 @@
use std::{collections::{HashMap, HashSet}, time::Duration};
use chrono::{DateTime, FixedOffset};
use futures::Stream;
use futures::{Stream, StreamExt};
use eventsource_stream::Eventsource;
#[derive(Debug, serde::Deserialize)]
@ -151,11 +151,40 @@ impl Feed {
Ok(Feed { posts })
}
/// Requests `url` with `client` and exposes the response body as a stream of
/// server-sent events, yielding one `Post` per decodable `update` event.
///
/// Events that fail to parse, events whose name is not `"update"`, and events
/// whose data is not valid JSON for a `Post` are dropped; only the JSON
/// decoding failure is logged.
///
/// # Errors
///
/// Returns a `String` describing the failure when the request cannot be sent,
/// when the response status is not 200, or when the `Content-Type` header is
/// missing, unreadable as a string, or not exactly `text/event-stream`.
pub async fn stream(client: &reqwest::Client, url: &str) -> Result<impl Stream, reqwest::Error> {
pub async fn stream(client: &reqwest::Client, url: &str) -> Result<impl Stream<Item = Post>, String> {
let res = client.get(url)
// The per-request timeout is set to the largest representable duration —
// presumably so the long-lived stream is never cut off; TODO confirm.
.timeout(Duration::MAX)
.send()
.await?;
let src = res.bytes_stream().eventsource();
.await
.map_err(|e| format!("{}", e))?;
// Any status other than 200 is reported to the caller as an error string.
if res.status() != 200 {
return Err(format!("HTTP {}", res.status()));
}
// `ct` is `None` when the header is absent or `to_str` rejects its value.
let ct = res.headers().get("content-type")
.and_then(|c| c.to_str().ok());
// NOTE(review): this is an exact string comparison, so a header carrying
// parameters (e.g. `text/event-stream; charset=utf-8`) is rejected as well —
// confirm that this strictness is intended.
if ct.map_or(true, |ct| ct != "text/event-stream") {
return Err(format!("Invalid Content-Type: {:?}", ct));
}
// First pass: silently discard events that failed to parse and keep only the
// ones named "update".
let src = res.bytes_stream().eventsource()
.filter_map(|result| async {
let result = result.ok()?;
if result.event == "update" {
Some(result)
} else {
None
}
})
// Second pass: decode the event data as JSON; undecodable payloads are logged
// and skipped instead of ending the stream.
.filter_map(|event| async move {
match serde_json::from_str(&event.data) {
Ok(post) =>
Some(post),
Err(e) => {
log::error!("Error decoding stream data: {}", e);
None
}
}
});
Ok(src)
}
}

View File

@ -63,8 +63,10 @@ async fn main() {
let _ = timeout(duration, async {
let message = message_rx.recv().await.unwrap();
match message {
Message::Fetched { host, mean_interval, new_post_ratio } => {
Message::WorkerDone => {
workers_active -= 1;
}
Message::Fetched { host, mean_interval, new_post_ratio } => {
scheduler.reenqueue(host, new_post_ratio, mean_interval);
}
Message::IntroduceHosts { hosts } => {

View File

@ -1,9 +1,11 @@
use std::collections::HashSet;
use std::future::Future;
use std::time::Duration;
use cave::{
feed::{Feed, Post},
store::Store,
};
use futures::{StreamExt, future};
use crate::trend_setter::UpdateSet;
pub struct RobotsTxt {
@ -47,6 +49,7 @@ impl RobotsTxt {
#[derive(Debug)]
pub enum Message {
WorkerDone,
Fetched {
host: String,
new_post_ratio: Option<f64>,
@ -62,23 +65,38 @@ pub async fn run(
client: reqwest::Client,
host: String,
) {
// Fetch /robots.txt
let robots_txt = RobotsTxt::fetch(&client, &host).await;
let (new_post_ratio, mut mean_interval) =
// let (_, stream) = future::join(
// Fetch posts and open stream
let ((new_post_ratio, mut mean_interval), stream_result) = future::join(
fetch_timeline(
message_tx.clone(), store.clone(),
trend_setter_tx.clone(),
client.clone(), &robots_txt, &host
// ),
// open_stream(&host),
).await;
&client, &robots_txt, &host
),
open_stream(
message_tx.clone(), store.clone(),
trend_setter_tx.clone(),
&client, &robots_txt, host.clone()
),
).await;
// Next worker can start
message_tx.send(Message::WorkerDone).unwrap();
// Process stream
if let Ok(stream) = stream_result {
log::info!("Processing stream for {}", host);
stream.await;
log::warn!("Ended stream for {}", host);
}
// Ready for reenqueue
if let Some(mean_interval) = &mut mean_interval {
if let Some(delay) = robots_txt.delay() {
*mean_interval = (*mean_interval).max(delay);
}
}
message_tx.send(Message::Fetched {
host: host.to_string(),
new_post_ratio,
@ -90,7 +108,7 @@ async fn fetch_timeline(
message_tx: tokio::sync::mpsc::UnboundedSender<Message>,
mut store: Store,
trend_setter_tx: crate::trend_setter::Tx,
client: reqwest::Client,
client: &reqwest::Client,
robots_txt: &RobotsTxt,
host: &str,
) -> (Option<f64>, Option<Duration>) {
@ -172,3 +190,38 @@ async fn process_posts(
(new_post_ratio, introduce_hosts)
}
/// Connects to the public streaming endpoint of `host` and returns a future
/// that handles every post arriving on that stream.
///
/// The returned future does nothing until it is awaited. For each incoming
/// post it logs the post URI, passes the post to `process_posts`, and sends
/// the resulting hosts on `message_tx` as `Message::IntroduceHosts`. A failed
/// send is ignored.
///
/// # Errors
///
/// Returns `Err(())` when `robots_txt` does not allow the streaming URL or
/// when `Feed::stream` fails. The reason is logged in both cases.
async fn open_stream(
    message_tx: tokio::sync::mpsc::UnboundedSender<Message>,
    store: Store,
    trend_setter_tx: crate::trend_setter::Tx,
    client: &reqwest::Client,
    robots_txt: &RobotsTxt,
    host: String,
) -> Result<impl Future<Output = ()>, ()> {
    let endpoint = format!("https://{}/api/v1/streaming/public", host);

    // Respect robots.txt before opening a long-lived connection.
    if !robots_txt.allowed(&endpoint) {
        log::warn!("Streaming of {} forbidden by robots.txt", host);
        return Err(());
    }

    let posts = match Feed::stream(client, &endpoint).await {
        Ok(posts) => posts,
        Err(e) => {
            log::error!("Stream error for {}: {}", host, e);
            return Err(());
        }
    };

    // Every post gets its own clones of the shared handles, because the
    // per-post future has to own everything it touches.
    Ok(posts.for_each(move |post| {
        let tx = message_tx.clone();
        let mut post_store = store.clone();
        let trends = trend_setter_tx.clone();
        let origin = host.clone();
        async move {
            log::info!("Stream {} new post: {}", origin, &post.uri);
            let (_, introduce_hosts) =
                process_posts(&mut post_store, &trends, &origin, [post].into_iter()).await;
            let hosts = introduce_hosts.into_iter().collect();
            // Best effort: the receiver may already be gone.
            let _ = tx.send(Message::IntroduceHosts { hosts });
        }
    }))
}