diff --git a/.cargo/config.toml b/.cargo/config.toml new file mode 100644 index 0000000..a0431bc --- /dev/null +++ b/.cargo/config.toml @@ -0,0 +1,79 @@ +[target.'cfg(all())'] +rustflags = [ + # BEGIN - Embark standard lints v6 for Rust 1.55+ + # do not change or add/remove here, but one can add exceptions after this section + # for more info see: + "-Dunsafe_code", + "-Wclippy::all", + "-Wclippy::await_holding_lock", + "-Wclippy::char_lit_as_u8", + "-Wclippy::checked_conversions", + "-Wclippy::dbg_macro", + "-Wclippy::debug_assert_with_mut_call", + "-Wclippy::doc_markdown", + "-Wclippy::empty_enum", + "-Wclippy::enum_glob_use", + "-Wclippy::exit", + "-Wclippy::expl_impl_clone_on_copy", + "-Wclippy::explicit_deref_methods", + "-Wclippy::explicit_into_iter_loop", + "-Wclippy::fallible_impl_from", + "-Wclippy::filter_map_next", + "-Wclippy::flat_map_option", + "-Wclippy::float_cmp_const", + "-Wclippy::fn_params_excessive_bools", + "-Wclippy::from_iter_instead_of_collect", + "-Wclippy::if_let_mutex", + "-Wclippy::implicit_clone", + "-Wclippy::imprecise_flops", + "-Wclippy::inefficient_to_string", + "-Wclippy::invalid_upcast_comparisons", + "-Wclippy::large_digit_groups", + "-Wclippy::large_stack_arrays", + "-Wclippy::large_types_passed_by_value", + "-Wclippy::let_unit_value", + "-Wclippy::linkedlist", + "-Wclippy::lossy_float_literal", + "-Wclippy::macro_use_imports", + "-Wclippy::manual_ok_or", + "-Wclippy::map_err_ignore", + "-Wclippy::map_flatten", + "-Wclippy::map_unwrap_or", + "-Wclippy::match_on_vec_items", + "-Wclippy::match_same_arms", + "-Wclippy::match_wild_err_arm", + "-Wclippy::match_wildcard_for_single_variants", + "-Wclippy::mem_forget", + "-Wclippy::missing_enforced_import_renames", + "-Wclippy::mut_mut", + "-Wclippy::mutex_integer", + "-Wclippy::needless_borrow", + "-Wclippy::needless_continue", + "-Wclippy::needless_for_each", + "-Wclippy::option_option", + "-Wclippy::path_buf_push_overwrite", + "-Wclippy::ptr_as_ptr", + "-Wclippy::rc_mutex", + "-Wclippy::ref_option_ref", + "-Wclippy::rest_pat_in_fully_bound_structs", + "-Wclippy::same_functions_in_if_condition", + "-Wclippy::semicolon_if_nothing_returned", + "-Wclippy::single_match_else", + "-Wclippy::string_add_assign", + "-Wclippy::string_add", + "-Wclippy::string_lit_as_bytes", + "-Wclippy::string_to_string", + # "-Wclippy::todo", // todo!("handle these in subsequent PRs") + "-Wclippy::trait_duplication_in_bounds", + "-Wclippy::unimplemented", + "-Wclippy::unnested_or_patterns", + "-Wclippy::unused_self", + "-Wclippy::useless_transmute", + "-Wclippy::verbose_file_reads", + "-Wclippy::zero_sized_map_values", + "-Wfuture_incompatible", + "-Wnonstandard_style", + "-Wrust_2018_idioms", + "-Wunexpected_cfgs", + # END - Embark standard lints v6 for Rust 1.55+ +] diff --git a/benches/req_rep.rs b/benches/req_rep.rs index d759e1d..6451d2b 100644 --- a/benches/req_rep.rs +++ b/benches/req_rep.rs @@ -40,7 +40,7 @@ fn criterion_benchmark(c: &mut Criterion) { bench(&mut group, "TCP", "tcp://localhost:0", &mut rt); bench(&mut group, "IPC", "ipc://req_rep.sock", &mut rt); - fn bench(group: &mut BenchGroup, bench_name: &str, endpoint: &str, rt: &mut Runtime) { + fn bench(group: &mut BenchGroup<'_>, bench_name: &str, endpoint: &str, rt: &mut Runtime) { #[allow(unused, clippy::redundant_locals)] let rt = rt; @@ -61,7 +61,7 @@ fn criterion_benchmark(c: &mut Criterion) { async_std::task::block_on(iter_fn(&mut req, &mut rep)); #[cfg(feature = "async-dispatcher-runtime")] async_dispatcher::block_on(iter_fn(&mut req, &mut rep)); - }) + }); }); } diff --git a/examples/async_helpers/mod.rs b/examples/async_helpers/mod.rs index c404041..8719dbd 100644 --- a/examples/async_helpers/mod.rs +++ b/examples/async_helpers/mod.rs @@ -15,12 +15,12 @@ pub use async_std::{main, test}; #[allow(unused)] #[cfg(feature = "tokio-runtime")] pub async fn sleep(duration: std::time::Duration) { - tokio::time::sleep(duration).await + tokio::time::sleep(duration).await; } #[allow(unused)] #[cfg(feature = "async-std-runtime")] pub async fn sleep(duration: std::time::Duration) { - async_std::task::sleep(duration).await + async_std::task::sleep(duration).await; } #[allow(unused_imports)] diff --git a/examples/message_client.rs b/examples/message_client.rs index da3f0cd..86081f2 100644 --- a/examples/message_client.rs +++ b/examples/message_client.rs @@ -14,10 +14,10 @@ async fn main() -> Result<(), Box> { socket.send("Hello".into()).await?; let repl = socket.recv().await?; - dbg!(repl); + println!("{:?}", repl); socket.send("Hello".into()).await?; let repl = socket.recv().await?; - dbg!(repl); + println!("{:?}", repl); Ok(()) } diff --git a/examples/message_server.rs b/examples/message_server.rs index a66d963..d58c091 100644 --- a/examples/message_server.rs +++ b/examples/message_server.rs @@ -14,7 +14,7 @@ async fn main() -> Result<(), Box> { loop { let mut repl: String = socket.recv().await?.try_into()?; - dbg!(&repl); + println!("Received: {}", repl); repl.push_str(" Reply"); socket.send(repl.into()).await?; } diff --git a/examples/socket_client.rs b/examples/socket_client.rs index 22152b5..638a120 100644 --- a/examples/socket_client.rs +++ b/examples/socket_client.rs @@ -15,7 +15,7 @@ async fn main() -> Result<(), Box> { for _ in 0..10u64 { socket.send("Hello".into()).await?; let repl = socket.recv().await?; - dbg!(repl); + println!("Received: {:?}", repl); } Ok(()) } diff --git a/examples/socket_client_with_options.rs b/examples/socket_client_with_options.rs index 1679218..0d4c971 100644 --- a/examples/socket_client_with_options.rs +++ b/examples/socket_client_with_options.rs @@ -20,7 +20,7 @@ async fn main() -> Result<(), Box> { for _ in 0..10u64 { socket.send("Hello".into()).await?; let repl = socket.recv().await?; - dbg!(repl); + println!("Received: {:?}", repl); } Ok(()) } diff --git a/examples/socket_server.rs b/examples/socket_server.rs index cae80ee..41863b1 100644 --- a/examples/socket_server.rs +++ b/examples/socket_server.rs @@ -11,7 +11,7 @@ async fn main() -> Result<(), Box> { loop { let mut repl: String = socket.recv().await?.try_into()?; - dbg!(&repl); + println!("Received: {:?}", repl); repl.push_str(" Reply"); socket.send(repl.into()).await?; } diff --git a/examples/stock_server.rs b/examples/stock_server.rs index 01f5a6f..4bc7bd9 100644 --- a/examples/stock_server.rs +++ b/examples/stock_server.rs @@ -18,7 +18,7 @@ async fn main() -> Result<(), Box> { let price: u32 = rng.gen_range(1..100); let mut m: ZmqMessage = ZmqMessage::from(*stock); m.push_back(price.to_ne_bytes().to_vec().into()); - dbg!(m.clone()); + println!("Sending: {:?}", m); socket.send(m).await?; } async_helpers::sleep(Duration::from_secs(1)).await; diff --git a/examples/weather_client.rs b/examples/weather_client.rs index 6f125f8..0c9a652 100644 --- a/examples/weather_client.rs +++ b/examples/weather_client.rs @@ -17,7 +17,8 @@ async fn main() -> Result<(), Box> { for i in 0..10 { println!("Message {}", i); let repl = socket.recv().await?; - dbg!(repl); + + println!("Received: {:?}", repl); } Ok(()) } diff --git a/src/async_rt/task/mod.rs b/src/async_rt/task/mod.rs index 7a7b770..b2e1350 100644 --- a/src/async_rt/task/mod.rs +++ b/src/async_rt/task/mod.rs @@ -25,7 +25,7 @@ where /// /// Note that some async runtimes (like async-std), may not bubble up panics /// but instead abort the entire application. In these runtimes, you won't ever -/// get the opportunity to see the JoinError, because you're already dead. +/// get the opportunity to see the `JoinError`, because you're already dead. #[derive(Debug)] pub enum JoinError { Cancelled, diff --git a/src/codec/zmq_codec.rs b/src/codec/zmq_codec.rs index d4f5e1f..e280969 100644 --- a/src/codec/zmq_codec.rs +++ b/src/codec/zmq_codec.rs @@ -120,27 +120,25 @@ impl Decoder for ZmqCodec { } } -impl ZmqCodec { - fn _encode_frame(&mut self, frame: &Bytes, dst: &mut BytesMut, more: bool) { - let mut flags: u8 = 0; - if more { - flags |= 0b0000_0001; - } - let len = frame.len(); - if len > 255 { - flags |= 0b0000_0010; - dst.reserve(len + 9); - } else { - dst.reserve(len + 2); - } - dst.put_u8(flags); - if len > 255 { - dst.put_u64(len as u64); - } else { - dst.put_u8(len as u8); - } - dst.extend_from_slice(frame.as_ref()); +fn encode_frame(frame: &Bytes, dst: &mut BytesMut, more: bool) { + let mut flags: u8 = 0; + if more { + flags |= 0b0000_0001; } + let len = frame.len(); + if len > 255 { + flags |= 0b0000_0010; + dst.reserve(len + 9); + } else { + dst.reserve(len + 2); + } + dst.put_u8(flags); + if len > 255 { + dst.put_u64(len as u64); + } else { + dst.put_u8(len as u8); + } + dst.extend_from_slice(frame.as_ref()); } impl Encoder for ZmqCodec { @@ -154,7 +152,7 @@ impl Encoder for ZmqCodec { Message::Message(message) => { let last_element = message.len() - 1; for (idx, part) in message.iter().enumerate() { - self._encode_frame(part, dst, idx != last_element); + encode_frame(part, dst, idx != last_element); } } } @@ -179,7 +177,8 @@ pub(crate) mod tests { .decode(&mut bytes) .expect("decode success") .expect("single message"); - dbg!(&message); + + eprintln!("{:?}", &message); match message { Message::Message(m) => { assert_eq!(6, m.into_vecdeque().len()); @@ -202,7 +201,7 @@ pub(crate) mod tests { .decode(&mut bytes) .expect("decode success") .expect("single message"); - dbg!(&message); + eprintln!("{:?}", &message); assert_eq!(bytes.len(), 0); match message { Message::Message(m) => { diff --git a/src/endpoint/host.rs b/src/endpoint/host.rs index ddfb594..e4aac98 100644 --- a/src/endpoint/host.rs +++ b/src/endpoint/host.rs @@ -19,7 +19,7 @@ pub enum Host { } impl fmt::Display for Host { - fn fmt(&self, f: &mut fmt::Formatter) -> std::result::Result<(), std::fmt::Error> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> std::result::Result<(), std::fmt::Error> { match self { Host::Ipv4(addr) => write!(f, "{}", addr), Host::Ipv6(addr) => write!(f, "{}", addr), diff --git a/src/endpoint/mod.rs b/src/endpoint/mod.rs index e7f0352..77d8b7e 100644 --- a/src/endpoint/mod.rs +++ b/src/endpoint/mod.rs @@ -76,7 +76,7 @@ impl FromStr for Endpoint { let port = caps.get(2).unwrap().as_str(); let port: Port = port .parse() - .map_err(|_| EndpointError::Syntax("Port must be a u16 but was out of range"))?; + .map_err(|_e| EndpointError::Syntax("Port must be a u16 but was out of range"))?; let host: Host = host.parse()?; Ok((host, port)) @@ -98,7 +98,7 @@ impl FromStr for Endpoint { } impl fmt::Display for Endpoint { - fn fmt(&self, f: &mut fmt::Formatter) -> std::result::Result<(), std::fmt::Error> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> std::result::Result<(), std::fmt::Error> { match self { Endpoint::Tcp(host, port) => { if let Host::Ipv6(_) = host { diff --git a/src/endpoint/transport.rs b/src/endpoint/transport.rs index c88b49b..50d4d35 100644 --- a/src/endpoint/transport.rs +++ b/src/endpoint/transport.rs @@ -44,7 +44,7 @@ impl TryFrom<&str> for Transport { } impl fmt::Display for Transport { - fn fmt(&self, f: &mut fmt::Formatter) -> std::result::Result<(), std::fmt::Error> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> std::result::Result<(), std::fmt::Error> { f.write_str(self.as_str()) } } diff --git a/src/message.rs b/src/message.rs index 3e7c546..c50da6e 100644 --- a/src/message.rs +++ b/src/message.rs @@ -8,7 +8,7 @@ use std::fmt; pub struct ZmqEmptyMessageError; impl fmt::Display for ZmqEmptyMessageError { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "Unable to construct an empty ZmqMessage") } } diff --git a/src/pub.rs b/src/pub.rs index e7dc90d..0db5d30 100644 --- a/src/pub.rs +++ b/src/pub.rs @@ -34,17 +34,23 @@ pub(crate) struct PubSocketBackend { impl PubSocketBackend { fn message_received(&self, peer_id: &PeerIdentity, message: Message) { - let message = match message { - Message::Message(m) => m, + let data = match message { + Message::Message(m) => { + if m.len() != 1 { + log::warn!("Received message with unexpected length: {}", m.len()); + return; + } + m.into_vec().pop().unwrap_or_default() + } _ => return, }; - assert_eq!(message.len(), 1); - let data: Vec = message.into_vec().pop().unwrap().to_vec(); + if data.is_empty() { return; } - match data[0] { - 1 => { + + match data.first() { + Some(1) => { // Subscribe self.subscribers .get_mut(peer_id) @@ -52,7 +58,7 @@ impl PubSocketBackend { .subscriptions .push(Vec::from(&data[1..])); } - 0 => { + Some(0) => { // Unsubscribe let mut del_index = None; let sub = Vec::from(&data[1..]); @@ -77,7 +83,10 @@ impl PubSocketBackend { .remove(index); } } - _ => (), + _ => log::warn!( + "Received message with unexpected first byte: {:?}", + data.first() + ), } } } @@ -127,7 +136,7 @@ impl MultiPeerBackend for PubSocketBackend { match message { Some(Ok(m)) => backend.message_received(&peer_id, m), Some(Err(e)) => { - dbg!(e); + log::debug!("Error receiving message: {:?}", e); backend.peer_disconnected(&peer_id); break; } @@ -179,17 +188,18 @@ impl SocketSend for PubSocket { if e.kind() == ErrorKind::BrokenPipe { dead_peers.push(subscriber.key().clone()); } else { - dbg!(e); + log::error!("Error receiving message: {:?}", e); } } Err(ZmqError::BufferFull(_)) => { // ignore silently. https://rfc.zeromq.org/spec/29/ says: // For processing outgoing messages: // SHALL silently drop the message if the queue for a subscriber is full. + log::debug!("Queue for subscriber is full",); } Err(e) => { - dbg!(e); - todo!() + log::error!("Error receiving message: {:?}", e); + return Err(e); } } break; diff --git a/src/sub.rs b/src/sub.rs index 82e4835..9e4818b 100644 --- a/src/sub.rs +++ b/src/sub.rs @@ -120,7 +120,7 @@ pub struct SubSocket { impl Drop for SubSocket { fn drop(&mut self) { - self.backend.shutdown() + self.backend.shutdown(); } } diff --git a/src/transport/mod.rs b/src/transport/mod.rs index 4252387..15782e0 100644 --- a/src/transport/mod.rs +++ b/src/transport/mod.rs @@ -54,7 +54,7 @@ pub struct AcceptStopHandle(pub(crate) TaskHandle<()>); /// `Ok`, it will receive a tuple containing the framed raw socket, along with /// the endpoint of the remote connection accepted. /// -/// Returns a ZmqResult, which when Ok is a tuple of the resolved bound +/// Returns a `ZmqResult`, which when Ok is a tuple of the resolved bound /// endpoint, as well as a channel to stop the async accept task /// /// # Panics diff --git a/src/util.rs b/src/util.rs index 1726068..ca4b37e 100644 --- a/src/util.rs +++ b/src/util.rs @@ -102,7 +102,7 @@ pub(crate) struct Peer { /// Given the result of the greetings exchange, determines the version of the /// ZMTP protocol that should be used for communication with the peer according -/// to https://rfc.zeromq.org/spec/23/#version-negotiation. +/// to [ZeroMQ RFC 23](https://rfc.zeromq.org/spec/23/#version-negotiation). fn negotiate_version(greeting: Message) -> ZmqResult { let my_version = ZmqGreeting::default().version; diff --git a/tests/helpers.rs b/tests/helpers.rs index e102a1e..ccb606a 100644 --- a/tests/helpers.rs +++ b/tests/helpers.rs @@ -52,7 +52,7 @@ pub async fn run_req_client( assert_eq!( format!("Req - {}, Rep - {}", i, i), String::from_utf8(repl.get(0).unwrap().to_vec()).unwrap() - ) + ); } req_socket.close().await; Ok(()) diff --git a/tests/pub_sub_compliant.rs b/tests/pub_sub_compliant.rs index 21a5916..075d8ef 100644 --- a/tests/pub_sub_compliant.rs +++ b/tests/pub_sub_compliant.rs @@ -60,7 +60,7 @@ async fn run_our_subs(our_subs: Vec, num_to_recv: u32) { }) }); for h in join_handles { - h.await.expect("Subscriber task panicked!") + h.await.expect("Subscriber task panicked!"); } println!("Finished sub task"); } @@ -128,7 +128,7 @@ mod test { // https://github.com/zeromq/libzmq/issues/3387 // So we will delete it ourselves. if let Some(path) = e.strip_prefix("ipc://") { - std::fs::remove_file(path).expect("Failed to remove ipc file") + std::fs::remove_file(path).expect("Failed to remove ipc file"); } } } diff --git a/tests/req_rep_compliant.rs b/tests/req_rep_compliant.rs index 5bdc934..5d309d4 100644 --- a/tests/req_rep_compliant.rs +++ b/tests/req_rep_compliant.rs @@ -11,7 +11,7 @@ mod test { use zeromq::prelude::*; use zeromq::ZmqMessage; - /// Returns (socket, bound_endpoint, monitor) + // Returns (socket, bound_endpoint, monitor) fn setup_their_rep(bind_endpoint: &str) -> (zmq2::Socket, String, zmq2::Socket) { let ctx = zmq2::Context::new(); let their_rep = ctx.socket(zmq2::REP).expect("Couldn't make rep socket");