From afa1b3311386462b54063cf2cbddb3bac26a1255 Mon Sep 17 00:00:00 2001 From: Kyle Kelley Date: Thu, 12 Dec 2024 16:37:57 -0800 Subject: [PATCH 1/2] migrate to using plain `futures` crate --- Cargo.toml | 5 +---- examples/task_worker.rs | 2 +- src/backend.rs | 4 ++-- src/codec/framed.rs | 2 +- src/codec/mod.rs | 5 ++--- src/dealer.rs | 4 ++-- src/error.rs | 2 +- src/fair_queue.rs | 6 +++--- src/lib.rs | 4 ++-- src/pub.rs | 4 ++-- src/pull.rs | 4 ++-- src/push.rs | 2 +- src/rep.rs | 2 +- src/req.rs | 2 +- src/router.rs | 4 ++-- src/sub.rs | 4 ++-- src/task_handle.rs | 2 +- src/transport/ipc.rs | 4 ++-- src/transport/mod.rs | 4 ++-- src/transport/tcp.rs | 4 ++-- src/util.rs | 2 +- tests/pub_sub.rs | 6 +++--- tests/req_rep.rs | 2 +- tests/req_router_dealer_rep.rs | 2 +- 24 files changed, 39 insertions(+), 43 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index f94083b..7e2d920 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,10 +21,7 @@ tcp-transport = [] [dependencies] async-dispatcher = { version = "0.1", optional = true } thiserror = "1" -futures-channel = { version = "0.3", features = ["sink"] } -futures-io = "0.3" -futures-task = "0.3" -futures-util = { version = "0.3", features = ["sink"] } +futures = "0.3" async-trait = "0.1" parking_lot = "0.12" rand = "0.8" diff --git a/examples/task_worker.rs b/examples/task_worker.rs index 7466263..3d7df10 100644 --- a/examples/task_worker.rs +++ b/examples/task_worker.rs @@ -1,6 +1,6 @@ mod async_helpers; -use futures_util::{select, FutureExt}; +use futures::{select, FutureExt}; use std::io::Write; use std::{error::Error, time::Duration}; use zeromq::{Socket, SocketRecv, SocketSend}; diff --git a/src/backend.rs b/src/backend.rs index de2c009..c1fef3e 100644 --- a/src/backend.rs +++ b/src/backend.rs @@ -8,8 +8,8 @@ use crate::{ use async_trait::async_trait; use crossbeam_queue::SegQueue; use dashmap::DashMap; -use futures_channel::mpsc; -use futures_util::SinkExt; +use futures::channel::mpsc; +use futures::SinkExt; use parking_lot::Mutex; use std::sync::Arc; diff --git a/src/codec/framed.rs b/src/codec/framed.rs index 942852a..fc87bdf 100644 --- a/src/codec/framed.rs +++ b/src/codec/framed.rs @@ -1,7 +1,7 @@ use crate::codec::ZmqCodec; use asynchronous_codec::{FramedRead, FramedWrite}; -use futures_io::{AsyncRead, AsyncWrite}; +use futures::{AsyncRead, AsyncWrite}; // Enables us to have multiple bounds on the dyn trait in `InnerFramed` pub trait FrameableRead: AsyncRead + Unpin + Send + Sync {} diff --git a/src/codec/mod.rs b/src/codec/mod.rs index 43e31bb..8863ac1 100644 --- a/src/codec/mod.rs +++ b/src/codec/mod.rs @@ -16,9 +16,8 @@ pub(crate) use zmq_codec::ZmqCodec; use crate::message::ZmqMessage; use crate::{ZmqError, ZmqResult}; - -use futures_task::noop_waker; -use futures_util::Sink; +use futures::task::noop_waker; +use futures::Sink; use std::pin::Pin; use std::task::{Context, Poll}; diff --git a/src/dealer.rs b/src/dealer.rs index 356c3fe..04cfe74 100644 --- a/src/dealer.rs +++ b/src/dealer.rs @@ -9,8 +9,8 @@ use crate::{ }; use async_trait::async_trait; -use futures_channel::mpsc; -use futures_util::StreamExt; +use futures::channel::mpsc; +use futures::StreamExt; use std::collections::hash_map::RandomState; use std::collections::HashMap; diff --git a/src/error.rs b/src/error.rs index 0edfcf8..5c2d47b 100644 --- a/src/error.rs +++ b/src/error.rs @@ -4,7 +4,7 @@ use crate::endpoint::EndpointError; use crate::task_handle::TaskError; use crate::ZmqMessage; -use futures_channel::mpsc; +use futures::channel::mpsc; use thiserror::Error; pub type ZmqResult = Result; diff --git a/src/fair_queue.rs b/src/fair_queue.rs index cb619a6..b1d2650 100644 --- a/src/fair_queue.rs +++ b/src/fair_queue.rs @@ -1,5 +1,5 @@ -use futures_task::{waker_ref, ArcWake}; -use futures_util::Stream; +use futures::task::{waker_ref, ArcWake}; +use futures::Stream; use parking_lot::Mutex; use std::cmp::Ordering; @@ -164,7 +164,7 @@ impl FairQueue { mod test { use crate::async_rt; use crate::fair_queue::FairQueue; - use futures_util::{stream, StreamExt}; + use futures::{stream, StreamExt}; #[async_rt::test] async fn test_fair_queue_ready() { diff --git a/src/lib.rs b/src/lib.rs index 1d0676d..600fc77 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -43,8 +43,8 @@ use util::PeerIdentity; use async_trait::async_trait; use asynchronous_codec::FramedWrite; -use futures_channel::mpsc; -use futures_util::{select, FutureExt}; +use futures::channel::mpsc; +use futures::{select, FutureExt}; use parking_lot::Mutex; use std::collections::HashMap; diff --git a/src/pub.rs b/src/pub.rs index d4c24d0..e7dc90d 100644 --- a/src/pub.rs +++ b/src/pub.rs @@ -11,8 +11,8 @@ use crate::{ use async_trait::async_trait; use dashmap::DashMap; -use futures_channel::{mpsc, oneshot}; -use futures_util::{select, FutureExt, StreamExt}; +use futures::channel::{mpsc, oneshot}; +use futures::{select, FutureExt, StreamExt}; use parking_lot::Mutex; use std::collections::HashMap; diff --git a/src/pull.rs b/src/pull.rs index d92297e..0144869 100644 --- a/src/pull.rs +++ b/src/pull.rs @@ -9,8 +9,8 @@ use crate::{ }; use async_trait::async_trait; -use futures_channel::mpsc; -use futures_util::StreamExt; +use futures::channel::mpsc; +use futures::StreamExt; use std::collections::hash_map::RandomState; use std::collections::HashMap; diff --git a/src/push.rs b/src/push.rs index 79f8603..9eabe5e 100644 --- a/src/push.rs +++ b/src/push.rs @@ -7,7 +7,7 @@ use crate::{ }; use async_trait::async_trait; -use futures_channel::mpsc; +use futures::channel::mpsc; use std::collections::hash_map::RandomState; use std::collections::HashMap; diff --git a/src/rep.rs b/src/rep.rs index 8867df1..2243f69 100644 --- a/src/rep.rs +++ b/src/rep.rs @@ -8,7 +8,7 @@ use crate::{SocketType, ZmqResult}; use async_trait::async_trait; use dashmap::DashMap; -use futures_util::{SinkExt, StreamExt}; +use futures::{SinkExt, StreamExt}; use parking_lot::Mutex; use std::collections::HashMap; diff --git a/src/req.rs b/src/req.rs index df7f4cc..714f3de 100644 --- a/src/req.rs +++ b/src/req.rs @@ -10,7 +10,7 @@ use async_trait::async_trait; use bytes::Bytes; use crossbeam_queue::SegQueue; use dashmap::DashMap; -use futures_util::{SinkExt, StreamExt}; +use futures::{SinkExt, StreamExt}; use std::collections::HashMap; use std::sync::Arc; diff --git a/src/router.rs b/src/router.rs index b64c954..4ffb4b0 100644 --- a/src/router.rs +++ b/src/router.rs @@ -10,8 +10,8 @@ use crate::{MultiPeerBackend, SocketEvent, SocketOptions, SocketRecv, SocketSend use crate::{Socket, SocketBackend}; use async_trait::async_trait; -use futures_channel::mpsc; -use futures_util::{SinkExt, StreamExt}; +use futures::channel::mpsc; +use futures::{SinkExt, StreamExt}; use std::collections::HashMap; use std::convert::TryInto; diff --git a/src/sub.rs b/src/sub.rs index 85d14e3..82e4835 100644 --- a/src/sub.rs +++ b/src/sub.rs @@ -15,8 +15,8 @@ use async_trait::async_trait; use bytes::{BufMut, BytesMut}; use crossbeam_queue::SegQueue; use dashmap::DashMap; -use futures_channel::mpsc; -use futures_util::{SinkExt, StreamExt}; +use futures::channel::mpsc; +use futures::{SinkExt, StreamExt}; use parking_lot::Mutex; use std::collections::{HashMap, HashSet}; diff --git a/src/task_handle.rs b/src/task_handle.rs index d75487e..b703154 100644 --- a/src/task_handle.rs +++ b/src/task_handle.rs @@ -1,7 +1,7 @@ use crate::async_rt; use crate::error::{ZmqError, ZmqResult}; -use futures_channel::oneshot; +use futures::channel::oneshot; use thiserror::Error; #[derive(Error, Debug)] diff --git a/src/transport/ipc.rs b/src/transport/ipc.rs index 80fab73..3c9698b 100644 --- a/src/transport/ipc.rs +++ b/src/transport/ipc.rs @@ -12,8 +12,8 @@ use crate::endpoint::Endpoint; use crate::task_handle::TaskHandle; use crate::ZmqResult; -use futures_channel::oneshot; -use futures_util::{select, FutureExt}; +use futures::channel::oneshot; +use futures::{select, FutureExt}; use std::path::Path; diff --git a/src/transport/mod.rs b/src/transport/mod.rs index d9ca275..4252387 100644 --- a/src/transport/mod.rs +++ b/src/transport/mod.rs @@ -104,9 +104,9 @@ where #[cfg(any(feature = "async-std-runtime", feature = "async-dispatcher-runtime"))] fn make_framed(stream: T) -> FramedIo where - T: futures_io::AsyncRead + futures_io::AsyncWrite + Send + Sync + 'static, + T: futures::AsyncRead + futures::AsyncWrite + Send + Sync + 'static, { - use futures_util::AsyncReadExt; + use futures::AsyncReadExt; let (read, write) = stream.split(); FramedIo::new(Box::new(read), Box::new(write)) } diff --git a/src/transport/tcp.rs b/src/transport/tcp.rs index 8df04be..7d74f9e 100644 --- a/src/transport/tcp.rs +++ b/src/transport/tcp.rs @@ -12,7 +12,7 @@ use crate::endpoint::{Endpoint, Host, Port}; use crate::task_handle::TaskHandle; use crate::ZmqResult; -use futures_util::{select, FutureExt}; +use futures::{select, FutureExt}; pub(crate) async fn connect(host: &Host, port: Port) -> ZmqResult<(FramedIo, Endpoint)> { let raw_socket = TcpStream::connect((host.to_string().as_str(), port)).await?; @@ -35,7 +35,7 @@ where { let listener = TcpListener::bind((host.to_string().as_str(), port)).await?; let resolved_addr = listener.local_addr()?; - let (stop_channel, stop_callback) = futures_channel::oneshot::channel::<()>(); + let (stop_channel, stop_callback) = futures::channel::oneshot::channel::<()>(); let task_handle = async_rt::task::spawn(async move { let mut stop_callback = stop_callback.fuse(); loop { diff --git a/src/util.rs b/src/util.rs index 836fe8b..1726068 100644 --- a/src/util.rs +++ b/src/util.rs @@ -3,7 +3,7 @@ use crate::*; use asynchronous_codec::FramedRead; use bytes::Bytes; -use futures_util::{SinkExt, StreamExt}; +use futures::{SinkExt, StreamExt}; use rand::Rng; use std::convert::{TryFrom, TryInto}; diff --git a/tests/pub_sub.rs b/tests/pub_sub.rs index 0fa32e2..e01067b 100644 --- a/tests/pub_sub.rs +++ b/tests/pub_sub.rs @@ -5,8 +5,8 @@ mod test { use zeromq::ZmqMessage; use zeromq::__async_rt as async_rt; - use futures_channel::{mpsc, oneshot}; - use futures_util::{SinkExt, StreamExt}; + use futures::channel::{mpsc, oneshot}; + use futures::{SinkExt, StreamExt}; use std::time::Duration; #[async_rt::test] @@ -102,6 +102,6 @@ mod test { "ipc://asdf.sock", "ipc://anothersocket-asdf", ]; - futures_util::future::join_all(addrs.into_iter().map(helper)).await; + futures::future::join_all(addrs.into_iter().map(helper)).await; } } diff --git a/tests/req_rep.rs b/tests/req_rep.rs index 50f5240..a0f7560 100644 --- a/tests/req_rep.rs +++ b/tests/req_rep.rs @@ -3,7 +3,7 @@ mod helpers; use zeromq::__async_rt as async_rt; use zeromq::prelude::*; -use futures_util::StreamExt; +use futures::StreamExt; use std::error::Error; use std::time::Duration; diff --git a/tests/req_router_dealer_rep.rs b/tests/req_router_dealer_rep.rs index dffcbd5..8aaaf06 100644 --- a/tests/req_router_dealer_rep.rs +++ b/tests/req_router_dealer_rep.rs @@ -7,7 +7,7 @@ mod test { use zeromq::__async_rt as async_rt; use zeromq::prelude::*; - use futures_util::StreamExt; + use futures::StreamExt; use std::error::Error; use std::time::Duration; From 38a536742c3b4359079c4792ef39e3bb76fa3266 Mon Sep 17 00:00:00 2001 From: Kyle Kelley Date: Thu, 12 Dec 2024 16:52:38 -0800 Subject: [PATCH 2/2] address clippy lints --- src/fair_queue.rs | 7 +++---- src/lib.rs | 7 ++++--- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/src/fair_queue.rs b/src/fair_queue.rs index b1d2650..a2d7b88 100644 --- a/src/fair_queue.rs +++ b/src/fair_queue.rs @@ -24,10 +24,9 @@ impl QueueInner { priority: self.counter.fetch_add(1, atomic::Ordering::Relaxed), key: k, }); - match &self.waker { - Some(w) => w.wake_by_ref(), - None => (), - }; + if let Some(w) = &self.waker { + w.wake_by_ref(); + } } pub fn remove(&mut self, k: &K) { diff --git a/src/lib.rs b/src/lib.rs index 600fc77..fa6d830 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -185,8 +185,8 @@ impl SocketOptions { pub trait MultiPeerBackend: SocketBackend { /// This should not be public.. /// Find a better way of doing this - async fn peer_connected(self: Arc, peer_id: &PeerIdentity, io: FramedIo); + fn peer_disconnected(&self, peer_id: &PeerIdentity); } @@ -211,6 +211,7 @@ pub trait SocketSend { /// in [proxy] function as a capture parameter pub trait CaptureSocket: SocketSend {} +#[allow(clippy::empty_line_after_outer_attr)] #[async_trait] pub trait Socket: Sized + Send { fn new() -> Self { @@ -333,11 +334,11 @@ pub trait Socket: Sized + Send { /// # Errors /// May give a `ZmqError::NoSuchConnection` if `endpoint` isn't connected. /// May also give any other zmq errors encountered when attempting to - /// disconnect. + /// disconnect // TODO: async fn disconnect(&mut self, endpoint: impl TryIntoEndpoint + 'async_trait) -> // ZmqResult<()>; - /// Disconnects all connecttions, blocking until finished. + /// Disconnects all connections, blocking until finished. // TODO: async fn disconnect_all(&mut self) -> ZmqResult<()>; /// Closes the socket, blocking until all associated binds are closed.