Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

handle errors from the fair queue and the case when the queue is empty #206

Merged
merged 7 commits into from
Dec 30, 2024
17 changes: 14 additions & 3 deletions src/dealer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::transport::AcceptStopHandle;
use crate::util::PeerIdentity;
use crate::{
CaptureSocket, Endpoint, MultiPeerBackend, Socket, SocketBackend, SocketEvent, SocketOptions,
SocketRecv, SocketSend, SocketType, ZmqMessage, ZmqResult,
SocketRecv, SocketSend, SocketType, ZmqError, ZmqMessage, ZmqResult,
};

use async_trait::async_trait;
Expand Down Expand Up @@ -66,8 +66,19 @@ impl SocketRecv for DealerSocket {
Some((_peer_id, Ok(Message::Message(message)))) => {
return Ok(message);
}
Some((_peer_id, _)) => todo!(),
None => todo!(),
Some((_peer_id, Ok(_))) => {
// Ignore non-message frames
continue;
}
Some((_peer_id, Err(e))) => {
// Handle potential errors from the fair queue
return Err(e.into());
}
None => {
// The fair queue is empty, which shouldn't happen in normal operation
// We could either wait for more messages or return an error
return Err(ZmqError::NoMessage);
}
};
}
}
Expand Down
8 changes: 4 additions & 4 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -357,8 +357,8 @@ pub async fn proxy<Frontend: SocketSend + SocketRecv, Backend: SocketSend + Sock
}
backend.send(message).await?;
}
Err(_) => {
todo!()
Err(e) => {
return Err(e);
}
}
},
Expand All @@ -370,8 +370,8 @@ pub async fn proxy<Frontend: SocketSend + SocketRecv, Backend: SocketSend + Sock
}
frontend.send(message).await?;
}
Err(_) => {
todo!()
Err(e) => {
return Err(e);
}
}
}
Expand Down
16 changes: 12 additions & 4 deletions src/pull.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::transport::AcceptStopHandle;
use crate::util::PeerIdentity;
use crate::{
Endpoint, MultiPeerBackend, Socket, SocketEvent, SocketOptions, SocketRecv, SocketType,
ZmqMessage, ZmqResult,
ZmqError, ZmqMessage, ZmqResult,
};

use async_trait::async_trait;
Expand Down Expand Up @@ -60,11 +60,19 @@ impl SocketRecv for PullSocket {
Some((_peer_id, Ok(Message::Message(message)))) => {
return Ok(message);
}
Some((_peer_id, Ok(msg))) => todo!("Unimplemented message: {:?}", msg),
Some((peer_id, Err(_))) => {
Some((_peer_id, Ok(_msg))) => {
// Ignore non-message frames (Command, Greeting) as PULL sockets are designed to only receive actual messages, not internal protocol frames.
continue;
}
Some((peer_id, Err(e))) => {
self.backend.peer_disconnected(&peer_id);
// Handle potential errors from the fair queue
return Err(e.into());
}
None => {
// The fair queue is empty, which shouldn't happen in normal operation
return Err(ZmqError::NoMessage);
}
None => todo!(),
};
}
}
Expand Down
14 changes: 11 additions & 3 deletions src/rep.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,9 @@ impl SocketRecv for RepSocket {
match self.fair_queue.next().await {
Some((peer_id, Ok(message))) => match message {
Message::Message(mut m) => {
assert!(m.len() > 1);
if m.len() < 2 {
return Err(ZmqError::Other("Invalid message format"));
}
let mut at = 1;
for (index, frame) in m.iter().enumerate() {
if frame.is_empty() {
Expand All @@ -163,9 +165,15 @@ impl SocketRecv for RepSocket {
self.current_request = Some(peer_id);
return Ok(data);
}
_ => todo!(),
Message::Greeting(_) | Message::Command(_) => {
// Ignore non-message frames. REP sockets should only process actual messages.
continue;
}
},
Some((_peer_id, _)) => todo!(),
Some((peer_id, Err(e))) => {
self.backend.peer_disconnected(&peer_id);
return Err(e.into());
}
None => return Err(ZmqError::NoMessage),
};
}
Expand Down
20 changes: 15 additions & 5 deletions src/req.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,14 +78,24 @@ impl SocketRecv for ReqSocket {
match self.current_request.take() {
Some(peer_id) => {
if let Some(mut peer) = self.backend.peers.get_mut(&peer_id) {
let message = peer.recv_queue.next().await;
match message {
match peer.recv_queue.next().await {
Some(Ok(Message::Message(mut m))) => {
assert!(m.len() > 1);
assert!(m.pop_front().unwrap().is_empty()); // Ensure that we have delimeter as first part
if m.len() < 2 {
return Err(ZmqError::Other(
"Invalid message format: too few frames",
));
}
if !m.pop_front().unwrap().is_empty() {
return Err(ZmqError::Other(
"Invalid message format: missing delimiter",
));
}
Ok(m)
}
Some(Ok(_)) => todo!(),
Some(Ok(_)) => {
// Non-message frames should be ignored by the caller
Err(ZmqError::Other("Received non-message frame"))
}
Some(Err(error)) => Err(error.into()),
None => Err(ZmqError::NoMessage),
}
Expand Down
17 changes: 14 additions & 3 deletions src/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,22 @@ impl SocketRecv for RouterSocket {
message.push_front(peer_id.into());
return Ok(message);
}
Some((_peer_id, Ok(msg))) => todo!("Unimplemented message: {:?}", msg),
Some((peer_id, Err(_))) => {
Some((_peer_id, Ok(_msg))) => {
// todo: Log or handle other message types if needed
// We could take an approach of using `tracing` and have that be an optional feature
// tracing::warn!("Received unimplemented message type: {:?}", msg);
continue;
}
Some((peer_id, Err(_e))) => {
self.backend.peer_disconnected(&peer_id);
// We could take an approach of using `tracing` and have that be an optional feature
// tracing::error!("Error receiving message from peer {}: {:?}", peer_id, e);
continue;
}
None => {
// The fair queue is empty, which shouldn't happen in normal operation
return Err(ZmqError::NoMessage);
}
None => todo!(),
};
}
}
Expand Down
17 changes: 13 additions & 4 deletions src/sub.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::backend::Peer;
use crate::codec::{FramedIo, Message, ZmqFramedRead};
use crate::endpoint::Endpoint;
use crate::error::ZmqResult;
use crate::error::{ZmqError, ZmqResult};
use crate::fair_queue::FairQueue;
use crate::fair_queue::QueueInner;
use crate::message::ZmqMessage;
Expand Down Expand Up @@ -191,11 +191,20 @@ impl SocketRecv for SubSocket {
Some((_peer_id, Ok(Message::Message(message)))) => {
return Ok(message);
}
Some((_peer_id, Ok(msg))) => todo!("Unimplemented message: {:?}", msg),
Some((peer_id, Err(_))) => {
Some((_peer_id, Ok(_msg))) => {
// Ignore non-message frames. SUB sockets are designed to only receive actual messages,
// not internal protocol frames like commands or greetings.
continue;
}
Some((peer_id, Err(e))) => {
self.backend.peer_disconnected(&peer_id);
// Handle potential errors from the fair queue
return Err(e.into());
}
None => {
// The fair queue is empty, which shouldn't happen in normal operation
return Err(ZmqError::NoMessage);
}
None => todo!(),
}
}
}
Expand Down
Loading