Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

More clippy #204

Merged
merged 14 commits into from
Dec 13, 2024
79 changes: 79 additions & 0 deletions .cargo/config.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
[target.'cfg(all())']
rustflags = [
# BEGIN - Embark standard lints v6 for Rust 1.55+
# do not change or add/remove here, but one can add exceptions after this section
# for more info see: <https://github.com/EmbarkStudios/rust-ecosystem/issues/59>
"-Dunsafe_code",
"-Wclippy::all",
"-Wclippy::await_holding_lock",
"-Wclippy::char_lit_as_u8",
"-Wclippy::checked_conversions",
"-Wclippy::dbg_macro",
"-Wclippy::debug_assert_with_mut_call",
"-Wclippy::doc_markdown",
"-Wclippy::empty_enum",
"-Wclippy::enum_glob_use",
"-Wclippy::exit",
"-Wclippy::expl_impl_clone_on_copy",
"-Wclippy::explicit_deref_methods",
"-Wclippy::explicit_into_iter_loop",
"-Wclippy::fallible_impl_from",
"-Wclippy::filter_map_next",
"-Wclippy::flat_map_option",
"-Wclippy::float_cmp_const",
"-Wclippy::fn_params_excessive_bools",
"-Wclippy::from_iter_instead_of_collect",
"-Wclippy::if_let_mutex",
"-Wclippy::implicit_clone",
"-Wclippy::imprecise_flops",
"-Wclippy::inefficient_to_string",
"-Wclippy::invalid_upcast_comparisons",
"-Wclippy::large_digit_groups",
"-Wclippy::large_stack_arrays",
"-Wclippy::large_types_passed_by_value",
"-Wclippy::let_unit_value",
"-Wclippy::linkedlist",
"-Wclippy::lossy_float_literal",
"-Wclippy::macro_use_imports",
"-Wclippy::manual_ok_or",
"-Wclippy::map_err_ignore",
"-Wclippy::map_flatten",
"-Wclippy::map_unwrap_or",
"-Wclippy::match_on_vec_items",
"-Wclippy::match_same_arms",
"-Wclippy::match_wild_err_arm",
"-Wclippy::match_wildcard_for_single_variants",
"-Wclippy::mem_forget",
"-Wclippy::missing_enforced_import_renames",
"-Wclippy::mut_mut",
"-Wclippy::mutex_integer",
"-Wclippy::needless_borrow",
"-Wclippy::needless_continue",
"-Wclippy::needless_for_each",
"-Wclippy::option_option",
"-Wclippy::path_buf_push_overwrite",
"-Wclippy::ptr_as_ptr",
"-Wclippy::rc_mutex",
"-Wclippy::ref_option_ref",
"-Wclippy::rest_pat_in_fully_bound_structs",
"-Wclippy::same_functions_in_if_condition",
"-Wclippy::semicolon_if_nothing_returned",
"-Wclippy::single_match_else",
"-Wclippy::string_add_assign",
"-Wclippy::string_add",
"-Wclippy::string_lit_as_bytes",
"-Wclippy::string_to_string",
# "-Wclippy::todo", // todo!("handle these in subsequent PRs")
"-Wclippy::trait_duplication_in_bounds",
"-Wclippy::unimplemented",
"-Wclippy::unnested_or_patterns",
"-Wclippy::unused_self",
"-Wclippy::useless_transmute",
"-Wclippy::verbose_file_reads",
"-Wclippy::zero_sized_map_values",
"-Wfuture_incompatible",
"-Wnonstandard_style",
"-Wrust_2018_idioms",
"-Wunexpected_cfgs",
# END - Embark standard lints v6 for Rust 1.55+
]
4 changes: 2 additions & 2 deletions benches/req_rep.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ fn criterion_benchmark(c: &mut Criterion) {
bench(&mut group, "TCP", "tcp://localhost:0", &mut rt);
bench(&mut group, "IPC", "ipc://req_rep.sock", &mut rt);

fn bench(group: &mut BenchGroup, bench_name: &str, endpoint: &str, rt: &mut Runtime) {
fn bench(group: &mut BenchGroup<'_>, bench_name: &str, endpoint: &str, rt: &mut Runtime) {
#[allow(unused, clippy::redundant_locals)]
let rt = rt;

Expand All @@ -61,7 +61,7 @@ fn criterion_benchmark(c: &mut Criterion) {
async_std::task::block_on(iter_fn(&mut req, &mut rep));
#[cfg(feature = "async-dispatcher-runtime")]
async_dispatcher::block_on(iter_fn(&mut req, &mut rep));
})
});
});
}

Expand Down
4 changes: 2 additions & 2 deletions examples/async_helpers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@ pub use async_std::{main, test};
#[allow(unused)]
#[cfg(feature = "tokio-runtime")]
pub async fn sleep(duration: std::time::Duration) {
tokio::time::sleep(duration).await
tokio::time::sleep(duration).await;
}
#[allow(unused)]
#[cfg(feature = "async-std-runtime")]
pub async fn sleep(duration: std::time::Duration) {
async_std::task::sleep(duration).await
async_std::task::sleep(duration).await;
}

#[allow(unused_imports)]
Expand Down
4 changes: 2 additions & 2 deletions examples/message_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@ async fn main() -> Result<(), Box<dyn Error>> {

socket.send("Hello".into()).await?;
let repl = socket.recv().await?;
dbg!(repl);
println!("{:?}", repl);

socket.send("Hello".into()).await?;
let repl = socket.recv().await?;
dbg!(repl);
println!("{:?}", repl);
Ok(())
}
2 changes: 1 addition & 1 deletion examples/message_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ async fn main() -> Result<(), Box<dyn Error>> {

loop {
let mut repl: String = socket.recv().await?.try_into()?;
dbg!(&repl);
println!("Received: {}", repl);
repl.push_str(" Reply");
socket.send(repl.into()).await?;
}
Expand Down
2 changes: 1 addition & 1 deletion examples/socket_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
for _ in 0..10u64 {
socket.send("Hello".into()).await?;
let repl = socket.recv().await?;
dbg!(repl);
println!("Received: {:?}", repl);
}
Ok(())
}
2 changes: 1 addition & 1 deletion examples/socket_client_with_options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
for _ in 0..10u64 {
socket.send("Hello".into()).await?;
let repl = socket.recv().await?;
dbg!(repl);
println!("Received: {:?}", repl);
}
Ok(())
}
2 changes: 1 addition & 1 deletion examples/socket_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {

loop {
let mut repl: String = socket.recv().await?.try_into()?;
dbg!(&repl);
println!("Received: {:?}", repl);
repl.push_str(" Reply");
socket.send(repl.into()).await?;
}
Expand Down
2 changes: 1 addition & 1 deletion examples/stock_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let price: u32 = rng.gen_range(1..100);
let mut m: ZmqMessage = ZmqMessage::from(*stock);
m.push_back(price.to_ne_bytes().to_vec().into());
dbg!(m.clone());
println!("Sending: {:?}", m);
socket.send(m).await?;
}
async_helpers::sleep(Duration::from_secs(1)).await;
Expand Down
3 changes: 2 additions & 1 deletion examples/weather_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ async fn main() -> Result<(), Box<dyn Error>> {
for i in 0..10 {
println!("Message {}", i);
let repl = socket.recv().await?;
dbg!(repl);

println!("Received: {:?}", repl);
}
Ok(())
}
2 changes: 1 addition & 1 deletion src/async_rt/task/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ where
///
/// Note that some async runtimes (like async-std), may not bubble up panics
/// but instead abort the entire application. In these runtimes, you won't ever
/// get the opportunity to see the JoinError, because you're already dead.
/// get the opportunity to see the `JoinError`, because you're already dead.
#[derive(Debug)]
pub enum JoinError {
Cancelled,
Expand Down
45 changes: 22 additions & 23 deletions src/codec/zmq_codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,27 +120,25 @@ impl Decoder for ZmqCodec {
}
}

impl ZmqCodec {
fn _encode_frame(&mut self, frame: &Bytes, dst: &mut BytesMut, more: bool) {
let mut flags: u8 = 0;
if more {
flags |= 0b0000_0001;
}
let len = frame.len();
if len > 255 {
flags |= 0b0000_0010;
dst.reserve(len + 9);
} else {
dst.reserve(len + 2);
}
dst.put_u8(flags);
if len > 255 {
dst.put_u64(len as u64);
} else {
dst.put_u8(len as u8);
}
dst.extend_from_slice(frame.as_ref());
fn encode_frame(frame: &Bytes, dst: &mut BytesMut, more: bool) {
let mut flags: u8 = 0;
if more {
flags |= 0b0000_0001;
}
let len = frame.len();
if len > 255 {
flags |= 0b0000_0010;
dst.reserve(len + 9);
} else {
dst.reserve(len + 2);
}
dst.put_u8(flags);
if len > 255 {
dst.put_u64(len as u64);
} else {
dst.put_u8(len as u8);
}
dst.extend_from_slice(frame.as_ref());
}

impl Encoder for ZmqCodec {
Expand All @@ -154,7 +152,7 @@ impl Encoder for ZmqCodec {
Message::Message(message) => {
let last_element = message.len() - 1;
for (idx, part) in message.iter().enumerate() {
self._encode_frame(part, dst, idx != last_element);
encode_frame(part, dst, idx != last_element);
}
}
}
Expand All @@ -179,7 +177,8 @@ pub(crate) mod tests {
.decode(&mut bytes)
.expect("decode success")
.expect("single message");
dbg!(&message);

eprintln!("{:?}", &message);
match message {
Message::Message(m) => {
assert_eq!(6, m.into_vecdeque().len());
Expand All @@ -202,7 +201,7 @@ pub(crate) mod tests {
.decode(&mut bytes)
.expect("decode success")
.expect("single message");
dbg!(&message);
eprintln!("{:?}", &message);
assert_eq!(bytes.len(), 0);
match message {
Message::Message(m) => {
Expand Down
2 changes: 1 addition & 1 deletion src/endpoint/host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ pub enum Host {
}

impl fmt::Display for Host {
fn fmt(&self, f: &mut fmt::Formatter) -> std::result::Result<(), std::fmt::Error> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> std::result::Result<(), std::fmt::Error> {
match self {
Host::Ipv4(addr) => write!(f, "{}", addr),
Host::Ipv6(addr) => write!(f, "{}", addr),
Expand Down
4 changes: 2 additions & 2 deletions src/endpoint/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ impl FromStr for Endpoint {
let port = caps.get(2).unwrap().as_str();
let port: Port = port
.parse()
.map_err(|_| EndpointError::Syntax("Port must be a u16 but was out of range"))?;
.map_err(|_e| EndpointError::Syntax("Port must be a u16 but was out of range"))?;

let host: Host = host.parse()?;
Ok((host, port))
Expand All @@ -98,7 +98,7 @@ impl FromStr for Endpoint {
}

impl fmt::Display for Endpoint {
fn fmt(&self, f: &mut fmt::Formatter) -> std::result::Result<(), std::fmt::Error> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> std::result::Result<(), std::fmt::Error> {
match self {
Endpoint::Tcp(host, port) => {
if let Host::Ipv6(_) = host {
Expand Down
2 changes: 1 addition & 1 deletion src/endpoint/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ impl TryFrom<&str> for Transport {
}

impl fmt::Display for Transport {
fn fmt(&self, f: &mut fmt::Formatter) -> std::result::Result<(), std::fmt::Error> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> std::result::Result<(), std::fmt::Error> {
f.write_str(self.as_str())
}
}
2 changes: 1 addition & 1 deletion src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::fmt;
pub struct ZmqEmptyMessageError;

impl fmt::Display for ZmqEmptyMessageError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "Unable to construct an empty ZmqMessage")
}
}
Expand Down
34 changes: 22 additions & 12 deletions src/pub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,25 +34,31 @@ pub(crate) struct PubSocketBackend {

impl PubSocketBackend {
fn message_received(&self, peer_id: &PeerIdentity, message: Message) {
let message = match message {
Message::Message(m) => m,
let data = match message {
Message::Message(m) => {
if m.len() != 1 {
log::warn!("Received message with unexpected length: {}", m.len());
return;
}
m.into_vec().pop().unwrap_or_default()
}
_ => return,
};
assert_eq!(message.len(), 1);
let data: Vec<u8> = message.into_vec().pop().unwrap().to_vec();

if data.is_empty() {
return;
}
match data[0] {
1 => {

match data.first() {
Some(1) => {
// Subscribe
self.subscribers
.get_mut(peer_id)
.unwrap()
.subscriptions
.push(Vec::from(&data[1..]));
}
0 => {
Some(0) => {
// Unsubscribe
let mut del_index = None;
let sub = Vec::from(&data[1..]);
Expand All @@ -77,7 +83,10 @@ impl PubSocketBackend {
.remove(index);
}
}
_ => (),
_ => log::warn!(
"Received message with unexpected first byte: {:?}",
data.first()
),
}
}
}
Expand Down Expand Up @@ -127,7 +136,7 @@ impl MultiPeerBackend for PubSocketBackend {
match message {
Some(Ok(m)) => backend.message_received(&peer_id, m),
Some(Err(e)) => {
dbg!(e);
log::debug!("Error receiving message: {:?}", e);
backend.peer_disconnected(&peer_id);
break;
}
Expand Down Expand Up @@ -179,17 +188,18 @@ impl SocketSend for PubSocket {
if e.kind() == ErrorKind::BrokenPipe {
dead_peers.push(subscriber.key().clone());
} else {
dbg!(e);
log::error!("Error receiving message: {:?}", e);
}
}
Err(ZmqError::BufferFull(_)) => {
// ignore silently. https://rfc.zeromq.org/spec/29/ says:
// For processing outgoing messages:
// SHALL silently drop the message if the queue for a subscriber is full.
log::debug!("Queue for subscriber is full",);
}
Err(e) => {
dbg!(e);
todo!()
log::error!("Error receiving message: {:?}", e);
return Err(e);
}
}
break;
Expand Down
2 changes: 1 addition & 1 deletion src/sub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ pub struct SubSocket {

impl Drop for SubSocket {
fn drop(&mut self) {
self.backend.shutdown()
self.backend.shutdown();
}
}

Expand Down
Loading
Loading