diff --git a/examples/async_helpers/mod.rs b/examples/async_helpers/mod.rs index d024276..19f8013 100644 --- a/examples/async_helpers/mod.rs +++ b/examples/async_helpers/mod.rs @@ -1,4 +1,5 @@ // Helper functions to be runtime agnostic +use futures::Future; #[cfg(feature = "tokio-runtime")] extern crate tokio; @@ -22,3 +23,23 @@ pub async fn sleep(duration: std::time::Duration) { pub async fn sleep(duration: std::time::Duration) { async_std::task::sleep(duration).await } + +#[allow(unused)] +#[cfg(feature = "tokio-runtime")] +pub fn spawn(future: T) +where + T: Future + Send + 'static, + T::Output: Send + 'static, +{ + tokio::spawn(future); +} + +#[allow(unused)] +#[cfg(feature = "async-std-runtime")] +pub fn spawn(future: T) +where + T: Future + Send + 'static, + T::Output: Send + 'static, +{ + async_std::spawn(future); +} diff --git a/examples/dealer_client.rs b/examples/dealer_client.rs new file mode 100644 index 0000000..edde433 --- /dev/null +++ b/examples/dealer_client.rs @@ -0,0 +1,24 @@ +mod async_helpers; + +use futures::StreamExt; +use std::{error::Error, time::Duration}; +use zeromq::prelude::*; + +#[async_helpers::main] +async fn main() -> Result<(), Box> { + let mut client = zeromq::DealerSocket::new(); + let mut monitor = client.monitor(); + async_helpers::spawn(async move { + while let Some(event) = monitor.next().await { + dbg!(event); + } + }); + + client.connect("tcp://127.0.0.1:5559").await?; + + loop { + let result = client.send("Test message".into()).await; + dbg!(result); + async_helpers::sleep(Duration::from_secs(1)).await; + } +} diff --git a/examples/router_server.rs b/examples/router_server.rs new file mode 100644 index 0000000..4a6a55c --- /dev/null +++ b/examples/router_server.rs @@ -0,0 +1,17 @@ +mod async_helpers; + +use std::{convert::TryFrom, error::Error}; +use zeromq::{prelude::*, util::PeerIdentity, SocketOptions}; + +#[async_helpers::main] +async fn main() -> Result<(), Box> { + let mut options = SocketOptions::default(); + options.peer_identity(PeerIdentity::try_from(Vec::from("SomeCustomId")).unwrap()); + let mut frontend = zeromq::RouterSocket::with_options(options); + frontend.bind("tcp://127.0.0.1:5559").await?; + + loop { + let message = frontend.recv().await?; + dbg!(message); + } +} diff --git a/src/backend.rs b/src/backend.rs index de2c009..1bd20f5 100644 --- a/src/backend.rs +++ b/src/backend.rs @@ -1,8 +1,9 @@ use crate::codec::{FramedIo, Message, ZmqFramedRead, ZmqFramedWrite}; use crate::fair_queue::QueueInner; -use crate::util::PeerIdentity; +use crate::util::{self, PeerIdentity}; use crate::{ - MultiPeerBackend, SocketBackend, SocketEvent, SocketOptions, SocketType, ZmqError, ZmqResult, + async_rt, Endpoint, MultiPeerBackend, SocketBackend, SocketEvent, SocketOptions, SocketType, + ZmqError, ZmqResult, }; use async_trait::async_trait; @@ -25,6 +26,7 @@ pub(crate) struct GenericSocketBackend { socket_type: SocketType, socket_options: SocketOptions, pub(crate) socket_monitor: Mutex>>, + connect_endpoints: DashMap, } impl GenericSocketBackend { @@ -40,10 +42,14 @@ impl GenericSocketBackend { socket_type, socket_options: options, socket_monitor: Mutex::new(None), + connect_endpoints: DashMap::new(), } } - pub(crate) async fn send_round_robin(&self, message: Message) -> ZmqResult { + pub(crate) async fn send_round_robin( + self: &Arc, + message: Message, + ) -> ZmqResult { // In normal scenario this will always be only 1 iteration // There can be special case when peer has disconnected and his id is still in // RR queue This happens because SegQueue don't have an api to delete @@ -73,7 +79,7 @@ impl GenericSocketBackend { Ok(next_peer_id) } Err(e) => { - self.peer_disconnected(&next_peer_id); + self.clone().peer_disconnected(&next_peer_id); Err(e.into()) } }; @@ -101,25 +107,51 @@ impl SocketBackend for GenericSocketBackend { #[async_trait] impl MultiPeerBackend for GenericSocketBackend { - async fn peer_connected(self: Arc, peer_id: &PeerIdentity, io: FramedIo) { + async fn peer_connected( + self: Arc, + peer_id: &PeerIdentity, + io: FramedIo, + endpoint: Option, + ) { let (recv_queue, send_queue) = io.into_parts(); self.peers.insert(peer_id.clone(), Peer { send_queue }); self.round_robin.push(peer_id.clone()); - match &self.fair_queue_inner { - None => {} - Some(inner) => { - inner.lock().insert(peer_id.clone(), recv_queue); - } - }; + + if let Some(queue_inner) = &self.fair_queue_inner { + queue_inner.lock().insert(peer_id.clone(), recv_queue); + } + + if let Some(e) = endpoint { + self.connect_endpoints.insert(peer_id.clone(), e); + } } - fn peer_disconnected(&self, peer_id: &PeerIdentity) { + fn peer_disconnected(self: Arc, peer_id: &PeerIdentity) { + if let Some(monitor) = self.monitor().lock().as_mut() { + let _ = monitor.try_send(SocketEvent::Disconnected(peer_id.clone())); + } + self.peers.remove(peer_id); - match &self.fair_queue_inner { - None => {} - Some(inner) => { - inner.lock().remove(peer_id); - } + if let Some(inner) = &self.fair_queue_inner { + inner.lock().remove(peer_id); + } + + let endpoint = match self.connect_endpoints.remove(peer_id) { + Some((_, e)) => e, + None => return, }; + let backend = self; + + async_rt::task::spawn(async move { + let (socket, endpoint) = util::connect_forever(endpoint) + .await + .expect("Failed to connect"); + let peer_id = util::peer_connected(socket, backend.clone(), Some(endpoint.clone())) + .await + .expect("Failed to handshake"); + if let Some(monitor) = backend.monitor().lock().as_mut() { + let _ = monitor.try_send(SocketEvent::Connected(endpoint, peer_id)); + } + }); } } diff --git a/src/lib.rs b/src/lib.rs index 1d0676d..3e9f193 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -186,8 +186,13 @@ pub trait MultiPeerBackend: SocketBackend { /// This should not be public.. /// Find a better way of doing this - async fn peer_connected(self: Arc, peer_id: &PeerIdentity, io: FramedIo); - fn peer_disconnected(&self, peer_id: &PeerIdentity); + async fn peer_connected( + self: Arc, + peer_id: &PeerIdentity, + io: FramedIo, + endpoint: Option, + ); + fn peer_disconnected(self: Arc, peer_id: &PeerIdentity); } pub trait SocketBackend: Send + Sync { @@ -235,7 +240,7 @@ pub trait Socket: Sized + Send { async move { let result = match result { Ok((socket, endpoint)) => { - match util::peer_connected(socket, cloned_backend.clone()).await { + match util::peer_connected(socket, cloned_backend.clone(), None).await { Ok(peer_id) => Ok((endpoint, peer_id)), Err(e) => Err(e), } @@ -303,10 +308,12 @@ pub trait Socket: Sized + Send { let endpoint = TryIntoEndpoint::try_into(endpoint)?; let result = match util::connect_forever(endpoint).await { - Ok((socket, endpoint)) => match util::peer_connected(socket, backend).await { - Ok(peer_id) => Ok((endpoint, peer_id)), - Err(e) => Err(e), - }, + Ok((socket, endpoint)) => { + match util::peer_connected(socket, backend, Some(endpoint.clone())).await { + Ok(peer_id) => Ok((endpoint, peer_id)), + Err(e) => Err(e), + } + } Err(e) => Err(e), }; match result { diff --git a/src/pub.rs b/src/pub.rs index d4c24d0..bcef9a6 100644 --- a/src/pub.rs +++ b/src/pub.rs @@ -102,7 +102,12 @@ impl SocketBackend for PubSocketBackend { #[async_trait] impl MultiPeerBackend for PubSocketBackend { - async fn peer_connected(self: Arc, peer_id: &PeerIdentity, io: FramedIo) { + async fn peer_connected( + self: Arc, + peer_id: &PeerIdentity, + io: FramedIo, + endpoint: Option, + ) { let (mut recv_queue, send_queue) = io.into_parts(); // TODO provide handling for recv_queue let (sender, stop_receiver) = oneshot::channel(); @@ -143,7 +148,7 @@ impl MultiPeerBackend for PubSocketBackend { }); } - fn peer_disconnected(&self, peer_id: &PeerIdentity) { + fn peer_disconnected(self: Arc, peer_id: &PeerIdentity) { log::info!("Client disconnected {:?}", peer_id); self.subscribers.remove(peer_id); } @@ -197,7 +202,7 @@ impl SocketSend for PubSocket { } } for peer in dead_peers { - self.backend.peer_disconnected(&peer); + self.backend.clone().peer_disconnected(&peer); } Ok(()) } diff --git a/src/pull.rs b/src/pull.rs index d92297e..5387eaa 100644 --- a/src/pull.rs +++ b/src/pull.rs @@ -62,7 +62,7 @@ impl SocketRecv for PullSocket { } Some((_peer_id, Ok(msg))) => todo!("Unimplemented message: {:?}", msg), Some((peer_id, Err(_))) => { - self.backend.peer_disconnected(&peer_id); + self.backend.clone().peer_disconnected(&peer_id); } None => todo!(), }; diff --git a/src/rep.rs b/src/rep.rs index 8867df1..d9ab1dd 100644 --- a/src/rep.rs +++ b/src/rep.rs @@ -75,7 +75,12 @@ impl Socket for RepSocket { #[async_trait] impl MultiPeerBackend for RepSocketBackend { - async fn peer_connected(self: Arc, peer_id: &PeerIdentity, io: FramedIo) { + async fn peer_connected( + self: Arc, + peer_id: &PeerIdentity, + io: FramedIo, + endpoint: Option, + ) { let (recv_queue, send_queue) = io.into_parts(); self.peers.insert( @@ -90,7 +95,7 @@ impl MultiPeerBackend for RepSocketBackend { .insert(peer_id.clone(), recv_queue); } - fn peer_disconnected(&self, peer_id: &PeerIdentity) { + fn peer_disconnected(self: Arc, peer_id: &PeerIdentity) { if let Some(monitor) = self.monitor().lock().as_mut() { let _ = monitor.try_send(SocketEvent::Disconnected(peer_id.clone())); } diff --git a/src/req.rs b/src/req.rs index df7f4cc..943b94a 100644 --- a/src/req.rs +++ b/src/req.rs @@ -130,7 +130,12 @@ impl Socket for ReqSocket { #[async_trait] impl MultiPeerBackend for ReqSocketBackend { - async fn peer_connected(self: Arc, peer_id: &PeerIdentity, io: FramedIo) { + async fn peer_connected( + self: Arc, + peer_id: &PeerIdentity, + io: FramedIo, + endpoint: Option, + ) { let (recv_queue, send_queue) = io.into_parts(); self.peers.insert( peer_id.clone(), @@ -143,7 +148,7 @@ impl MultiPeerBackend for ReqSocketBackend { self.round_robin.push(peer_id.clone()); } - fn peer_disconnected(&self, peer_id: &PeerIdentity) { + fn peer_disconnected(self: Arc, peer_id: &PeerIdentity) { self.peers.remove(peer_id); } } diff --git a/src/router.rs b/src/router.rs index b64c954..4456f46 100644 --- a/src/router.rs +++ b/src/router.rs @@ -70,7 +70,7 @@ impl SocketRecv for RouterSocket { } Some((_peer_id, Ok(msg))) => todo!("Unimplemented message: {:?}", msg), Some((peer_id, Err(_))) => { - self.backend.peer_disconnected(&peer_id); + self.backend.clone().peer_disconnected(&peer_id); } None => todo!(), }; diff --git a/src/sub.rs b/src/sub.rs index 85d14e3..46f33a5 100644 --- a/src/sub.rs +++ b/src/sub.rs @@ -83,7 +83,12 @@ impl SocketBackend for SubSocketBackend { #[async_trait] impl MultiPeerBackend for SubSocketBackend { - async fn peer_connected(self: Arc, peer_id: &PeerIdentity, io: FramedIo) { + async fn peer_connected( + self: Arc, + peer_id: &PeerIdentity, + io: FramedIo, + endpoint: Option, + ) { let (recv_queue, mut send_queue) = io.into_parts(); let subs_msgs: Vec = self @@ -107,7 +112,7 @@ impl MultiPeerBackend for SubSocketBackend { }; } - fn peer_disconnected(&self, peer_id: &PeerIdentity) { + fn peer_disconnected(self: Arc, peer_id: &PeerIdentity) { self.peers.remove(peer_id); } } @@ -193,7 +198,7 @@ impl SocketRecv for SubSocket { } Some((_peer_id, Ok(msg))) => todo!("Unimplemented message: {:?}", msg), Some((peer_id, Err(_))) => { - self.backend.peer_disconnected(&peer_id); + self.backend.clone().peer_disconnected(&peer_id); } None => todo!(), } diff --git a/src/util.rs b/src/util.rs index 836fe8b..f4a9c88 100644 --- a/src/util.rs +++ b/src/util.rs @@ -188,6 +188,7 @@ pub(crate) async fn ready_exchange( pub(crate) async fn peer_connected( mut raw_socket: FramedIo, backend: Arc, + endpoint: Option, ) -> ZmqResult { greet_exchange(&mut raw_socket).await?; let mut props = None; @@ -197,7 +198,7 @@ pub(crate) async fn peer_connected( props = Some(connect_ops); } let peer_id = ready_exchange(&mut raw_socket, backend.socket_type(), props).await?; - backend.peer_connected(&peer_id, raw_socket).await; + backend.peer_connected(&peer_id, raw_socket, endpoint).await; Ok(peer_id) }