1
0
mirror of https://gitlab.com/xmpp-rs/xmpp-rs.git synced 2024-05-31 13:29:20 +02:00

WHY DOES THIS DEADLOCK?!

This commit is contained in:
xmppftw 2023-06-17 14:30:08 +02:00
parent 92bad0038d
commit 8566adb376
2 changed files with 122 additions and 7 deletions

View File

@ -17,11 +17,46 @@ pub struct JoinRoomAction {
pub history: History,
}
#[derive(Clone, Debug, Default)]
pub struct JoinRoomResult {
pub nicks: Vec<String>,
pub topic: String,
}
#[async_trait]
impl Act<()> for JoinRoomAction {
async fn act(self, agent: &mut Agent) -> Result<(), Error> {
impl Act<JoinRoomResult> for JoinRoomAction {
async fn act(self, agent: &mut Agent) -> Result<JoinRoomResult, Error> {
agent.client.send_stanza(self.clone().into()).await?;
Ok(())
let mut recv_presence = agent.hook_muc_presence(self.room.clone());
let mut recv_topic = agent.hook_muc_topic(self.room.clone());
let mut res = JoinRoomResult::default();
info!("Waiting for room response");
// TODO: Why deadlock?!
//let res = tokio::spawn(async move {
//loop {
tokio::select! {
hook_res = recv_presence.recv() => {
if let Some(nick) = hook_res {
res.nicks.push(nick);
}
// break
},
hook_res = recv_topic.recv() => {
if let Some((_opt_nick, subject, _lang)) = hook_res {
res.topic = subject.0;
}
//break
}
}
//}
//return res;
//}).await.unwrap();
Ok(res)
}
}

View File

@ -10,6 +10,7 @@ use futures::stream::StreamExt;
use reqwest::{
header::HeaderMap as ReqwestHeaderMap, Body as ReqwestBody, Client as ReqwestClient,
};
use std::collections::HashMap;
use std::convert::TryFrom;
use std::path::{Path, PathBuf};
use std::sync::{Arc, RwLock};
@ -23,7 +24,7 @@ use xmpp_parsers::{
hashes::Algo,
http_upload::{Header as HttpUploadHeader, SlotRequest, SlotResult},
iq::{Iq, IqType},
message::{Body, Message, MessageType},
message::{Body, Message, MessageType, Subject},
muc::{
user::{MucUser, Status},
Muc,
@ -210,6 +211,7 @@ impl ClientBuilder<'_> {
disco,
node,
uploads: Vec::new(),
hooks: Hooks::default(),
}
}
}
@ -221,6 +223,7 @@ pub struct Agent {
disco: DiscoInfoResult,
node: String,
uploads: Vec<(String, Jid, PathBuf)>,
hooks: Hooks,
}
impl Agent {
@ -374,7 +377,7 @@ impl Agent {
let mut events = vec![];
let from = message.from.clone().unwrap();
let langs: Vec<&str> = self.lang.iter().map(String::as_str).collect();
match message.get_best_body(langs) {
match message.get_best_body(langs.clone()) {
Some((_lang, body)) => match message.type_ {
MessageType::Groupchat => {
let event = match from.clone() {
@ -425,7 +428,31 @@ impl Agent {
}
_ => (),
},
None => (),
None => {
// Check if we have a MUC topic.
// TODO: handle other subject cases
match message.get_best_subject(langs) {
Some((lang, subject)) => {
match message.type_ {
MessageType::Groupchat => {
// from.into() here drops the resource part and allows to check for room-hook
if let Some(hook_sender) =
self.hooks.muc_topic.get(&from.clone().into())
{
// TODO: Error
hook_sender
.send((from.clone().resource(), subject.clone(), lang))
.await
.unwrap();
//hook_sender.send(message.clone()).await.unwrap();
}
}
_ => {}
}
}
_ => {}
}
}
}
for child in message.payloads {
if child.is("event", ns::PUBSUB_EVENT) {
@ -439,15 +466,28 @@ impl Agent {
async fn handle_presence(&mut self, presence: Presence) -> Vec<Event> {
let mut events = vec![];
// A MUC presence comes from the participant's FullJid, however we are
// interested in the room where this happened stored as `from` here,
// because we only handle the "self-presence" of when a room was joined.
let from: BareJid = match presence.from.clone().unwrap() {
Jid::Full(FullJid { node, domain, .. }) => BareJid { node, domain },
Jid::Bare(bare) => bare,
};
for payload in presence.payloads.into_iter() {
for payload in presence.clone().payloads.into_iter() {
let muc_user = match MucUser::try_from(payload) {
Ok(muc_user) => muc_user,
_ => continue,
};
if let Some(hook_sender) = self.hooks.muc_presence.get(&from) {
// Check that there is an associated nick with this presence
if let Some(nick) = presence.from.clone().unwrap().resource() {
// TODO: Error
hook_sender.send(nick).await.unwrap();
break;
}
}
for status in muc_user.status.into_iter() {
if status == Status::SelfPresence {
events.push(Event::RoomJoined(from.clone()));
@ -529,6 +569,35 @@ impl Agent {
.push((String::from("upload1"), to, path.to_path_buf()));
self.client.send_stanza(request.into()).await.unwrap();
}
pub fn hook_muc_topic(&mut self, room: BareJid) -> tokio::sync::mpsc::Receiver<MucTopicHook> {
let (tx, rx) = tokio::sync::mpsc::channel(100);
// TODO: what about multiple hooks on same room?
self.hooks.muc_topic.insert(room, tx);
rx
}
pub fn remove_hook_muc_topic(&mut self, room: BareJid) {
self.hooks.muc_topic.remove(&room);
}
pub fn hook_muc_presence(
&mut self,
room: BareJid,
) -> tokio::sync::mpsc::Receiver<MucPresenceHook> {
let (tx, rx) = tokio::sync::mpsc::channel(100);
// TODO: what about multiple hooks on same room?
self.hooks.muc_presence.insert(room, tx);
rx
}
pub fn remove_hook_muc_presence(&mut self, room: BareJid) {
self.hooks.muc_presence.remove(&room);
}
}
async fn handle_upload_result(
@ -578,6 +647,17 @@ async fn handle_upload_result(
return vec![];
}
// TODO: Lang type
pub type MucTopicHook = (Option<RoomNick>, Subject, String);
pub type MucPresenceHook = RoomNick;
#[derive(Clone, Debug, Default)]
pub struct Hooks {
muc_presence: HashMap<BareJid, tokio::sync::mpsc::Sender<MucPresenceHook>>,
muc_topic: HashMap<BareJid, tokio::sync::mpsc::Sender<MucTopicHook>>,
}
#[cfg(test)]
mod tests {
use super::{Agent, BareJid, ClientBuilder, ClientFeature, ClientType, Event};