Skip to content

Commit

Permalink
Merge pull request #204 from zeromq/more-clippy
Browse files Browse the repository at this point in the history
More clippy
  • Loading branch information
rgbkrk authored Dec 13, 2024
2 parents d06811a + a4ef3de commit 07c65dc
Show file tree
Hide file tree
Showing 23 changed files with 149 additions and 60 deletions.
79 changes: 79 additions & 0 deletions .cargo/config.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
[target.'cfg(all())']
rustflags = [
# BEGIN - Embark standard lints v6 for Rust 1.55+
# do not change or add/remove here, but one can add exceptions after this section
# for more info see: <https://github.com/EmbarkStudios/rust-ecosystem/issues/59>
"-Dunsafe_code",
"-Wclippy::all",
"-Wclippy::await_holding_lock",
"-Wclippy::char_lit_as_u8",
"-Wclippy::checked_conversions",
"-Wclippy::dbg_macro",
"-Wclippy::debug_assert_with_mut_call",
"-Wclippy::doc_markdown",
"-Wclippy::empty_enum",
"-Wclippy::enum_glob_use",
"-Wclippy::exit",
"-Wclippy::expl_impl_clone_on_copy",
"-Wclippy::explicit_deref_methods",
"-Wclippy::explicit_into_iter_loop",
"-Wclippy::fallible_impl_from",
"-Wclippy::filter_map_next",
"-Wclippy::flat_map_option",
"-Wclippy::float_cmp_const",
"-Wclippy::fn_params_excessive_bools",
"-Wclippy::from_iter_instead_of_collect",
"-Wclippy::if_let_mutex",
"-Wclippy::implicit_clone",
"-Wclippy::imprecise_flops",
"-Wclippy::inefficient_to_string",
"-Wclippy::invalid_upcast_comparisons",
"-Wclippy::large_digit_groups",
"-Wclippy::large_stack_arrays",
"-Wclippy::large_types_passed_by_value",
"-Wclippy::let_unit_value",
"-Wclippy::linkedlist",
"-Wclippy::lossy_float_literal",
"-Wclippy::macro_use_imports",
"-Wclippy::manual_ok_or",
"-Wclippy::map_err_ignore",
"-Wclippy::map_flatten",
"-Wclippy::map_unwrap_or",
"-Wclippy::match_on_vec_items",
"-Wclippy::match_same_arms",
"-Wclippy::match_wild_err_arm",
"-Wclippy::match_wildcard_for_single_variants",
"-Wclippy::mem_forget",
"-Wclippy::missing_enforced_import_renames",
"-Wclippy::mut_mut",
"-Wclippy::mutex_integer",
"-Wclippy::needless_borrow",
"-Wclippy::needless_continue",
"-Wclippy::needless_for_each",
"-Wclippy::option_option",
"-Wclippy::path_buf_push_overwrite",
"-Wclippy::ptr_as_ptr",
"-Wclippy::rc_mutex",
"-Wclippy::ref_option_ref",
"-Wclippy::rest_pat_in_fully_bound_structs",
"-Wclippy::same_functions_in_if_condition",
"-Wclippy::semicolon_if_nothing_returned",
"-Wclippy::single_match_else",
"-Wclippy::string_add_assign",
"-Wclippy::string_add",
"-Wclippy::string_lit_as_bytes",
"-Wclippy::string_to_string",
# "-Wclippy::todo", // todo!("handle these in subsequent PRs")
"-Wclippy::trait_duplication_in_bounds",
"-Wclippy::unimplemented",
"-Wclippy::unnested_or_patterns",
"-Wclippy::unused_self",
"-Wclippy::useless_transmute",
"-Wclippy::verbose_file_reads",
"-Wclippy::zero_sized_map_values",
"-Wfuture_incompatible",
"-Wnonstandard_style",
"-Wrust_2018_idioms",
"-Wunexpected_cfgs",
# END - Embark standard lints v6 for Rust 1.55+
]
4 changes: 2 additions & 2 deletions benches/req_rep.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ fn criterion_benchmark(c: &mut Criterion) {
bench(&mut group, "TCP", "tcp://localhost:0", &mut rt);
bench(&mut group, "IPC", "ipc://req_rep.sock", &mut rt);

fn bench(group: &mut BenchGroup, bench_name: &str, endpoint: &str, rt: &mut Runtime) {
fn bench(group: &mut BenchGroup<'_>, bench_name: &str, endpoint: &str, rt: &mut Runtime) {
#[allow(unused, clippy::redundant_locals)]
let rt = rt;

Expand All @@ -61,7 +61,7 @@ fn criterion_benchmark(c: &mut Criterion) {
async_std::task::block_on(iter_fn(&mut req, &mut rep));
#[cfg(feature = "async-dispatcher-runtime")]
async_dispatcher::block_on(iter_fn(&mut req, &mut rep));
})
});
});
}

Expand Down
4 changes: 2 additions & 2 deletions examples/async_helpers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@ pub use async_std::{main, test};
#[allow(unused)]
#[cfg(feature = "tokio-runtime")]
pub async fn sleep(duration: std::time::Duration) {
tokio::time::sleep(duration).await
tokio::time::sleep(duration).await;
}
#[allow(unused)]
#[cfg(feature = "async-std-runtime")]
pub async fn sleep(duration: std::time::Duration) {
async_std::task::sleep(duration).await
async_std::task::sleep(duration).await;
}

#[allow(unused_imports)]
Expand Down
4 changes: 2 additions & 2 deletions examples/message_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@ async fn main() -> Result<(), Box<dyn Error>> {

socket.send("Hello".into()).await?;
let repl = socket.recv().await?;
dbg!(repl);
println!("{:?}", repl);

socket.send("Hello".into()).await?;
let repl = socket.recv().await?;
dbg!(repl);
println!("{:?}", repl);
Ok(())
}
2 changes: 1 addition & 1 deletion examples/message_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ async fn main() -> Result<(), Box<dyn Error>> {

loop {
let mut repl: String = socket.recv().await?.try_into()?;
dbg!(&repl);
println!("Received: {}", repl);
repl.push_str(" Reply");
socket.send(repl.into()).await?;
}
Expand Down
2 changes: 1 addition & 1 deletion examples/socket_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
for _ in 0..10u64 {
socket.send("Hello".into()).await?;
let repl = socket.recv().await?;
dbg!(repl);
println!("Received: {:?}", repl);
}
Ok(())
}
2 changes: 1 addition & 1 deletion examples/socket_client_with_options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
for _ in 0..10u64 {
socket.send("Hello".into()).await?;
let repl = socket.recv().await?;
dbg!(repl);
println!("Received: {:?}", repl);
}
Ok(())
}
2 changes: 1 addition & 1 deletion examples/socket_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {

loop {
let mut repl: String = socket.recv().await?.try_into()?;
dbg!(&repl);
println!("Received: {:?}", repl);
repl.push_str(" Reply");
socket.send(repl.into()).await?;
}
Expand Down
2 changes: 1 addition & 1 deletion examples/stock_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let price: u32 = rng.gen_range(1..100);
let mut m: ZmqMessage = ZmqMessage::from(*stock);
m.push_back(price.to_ne_bytes().to_vec().into());
dbg!(m.clone());
println!("Sending: {:?}", m);
socket.send(m).await?;
}
async_helpers::sleep(Duration::from_secs(1)).await;
Expand Down
3 changes: 2 additions & 1 deletion examples/weather_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ async fn main() -> Result<(), Box<dyn Error>> {
for i in 0..10 {
println!("Message {}", i);
let repl = socket.recv().await?;
dbg!(repl);

println!("Received: {:?}", repl);
}
Ok(())
}
2 changes: 1 addition & 1 deletion src/async_rt/task/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ where
///
/// Note that some async runtimes (like async-std), may not bubble up panics
/// but instead abort the entire application. In these runtimes, you won't ever
/// get the opportunity to see the JoinError, because you're already dead.
/// get the opportunity to see the `JoinError`, because you're already dead.
#[derive(Debug)]
pub enum JoinError {
Cancelled,
Expand Down
45 changes: 22 additions & 23 deletions src/codec/zmq_codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,27 +120,25 @@ impl Decoder for ZmqCodec {
}
}

impl ZmqCodec {
fn _encode_frame(&mut self, frame: &Bytes, dst: &mut BytesMut, more: bool) {
let mut flags: u8 = 0;
if more {
flags |= 0b0000_0001;
}
let len = frame.len();
if len > 255 {
flags |= 0b0000_0010;
dst.reserve(len + 9);
} else {
dst.reserve(len + 2);
}
dst.put_u8(flags);
if len > 255 {
dst.put_u64(len as u64);
} else {
dst.put_u8(len as u8);
}
dst.extend_from_slice(frame.as_ref());
fn encode_frame(frame: &Bytes, dst: &mut BytesMut, more: bool) {
let mut flags: u8 = 0;
if more {
flags |= 0b0000_0001;
}
let len = frame.len();
if len > 255 {
flags |= 0b0000_0010;
dst.reserve(len + 9);
} else {
dst.reserve(len + 2);
}
dst.put_u8(flags);
if len > 255 {
dst.put_u64(len as u64);
} else {
dst.put_u8(len as u8);
}
dst.extend_from_slice(frame.as_ref());
}

impl Encoder for ZmqCodec {
Expand All @@ -154,7 +152,7 @@ impl Encoder for ZmqCodec {
Message::Message(message) => {
let last_element = message.len() - 1;
for (idx, part) in message.iter().enumerate() {
self._encode_frame(part, dst, idx != last_element);
encode_frame(part, dst, idx != last_element);
}
}
}
Expand All @@ -179,7 +177,8 @@ pub(crate) mod tests {
.decode(&mut bytes)
.expect("decode success")
.expect("single message");
dbg!(&message);

eprintln!("{:?}", &message);
match message {
Message::Message(m) => {
assert_eq!(6, m.into_vecdeque().len());
Expand All @@ -202,7 +201,7 @@ pub(crate) mod tests {
.decode(&mut bytes)
.expect("decode success")
.expect("single message");
dbg!(&message);
eprintln!("{:?}", &message);
assert_eq!(bytes.len(), 0);
match message {
Message::Message(m) => {
Expand Down
2 changes: 1 addition & 1 deletion src/endpoint/host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ pub enum Host {
}

impl fmt::Display for Host {
fn fmt(&self, f: &mut fmt::Formatter) -> std::result::Result<(), std::fmt::Error> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> std::result::Result<(), std::fmt::Error> {
match self {
Host::Ipv4(addr) => write!(f, "{}", addr),
Host::Ipv6(addr) => write!(f, "{}", addr),
Expand Down
4 changes: 2 additions & 2 deletions src/endpoint/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ impl FromStr for Endpoint {
let port = caps.get(2).unwrap().as_str();
let port: Port = port
.parse()
.map_err(|_| EndpointError::Syntax("Port must be a u16 but was out of range"))?;
.map_err(|_e| EndpointError::Syntax("Port must be a u16 but was out of range"))?;

let host: Host = host.parse()?;
Ok((host, port))
Expand All @@ -98,7 +98,7 @@ impl FromStr for Endpoint {
}

impl fmt::Display for Endpoint {
fn fmt(&self, f: &mut fmt::Formatter) -> std::result::Result<(), std::fmt::Error> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> std::result::Result<(), std::fmt::Error> {
match self {
Endpoint::Tcp(host, port) => {
if let Host::Ipv6(_) = host {
Expand Down
2 changes: 1 addition & 1 deletion src/endpoint/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ impl TryFrom<&str> for Transport {
}

impl fmt::Display for Transport {
fn fmt(&self, f: &mut fmt::Formatter) -> std::result::Result<(), std::fmt::Error> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> std::result::Result<(), std::fmt::Error> {
f.write_str(self.as_str())
}
}
2 changes: 1 addition & 1 deletion src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::fmt;
pub struct ZmqEmptyMessageError;

impl fmt::Display for ZmqEmptyMessageError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "Unable to construct an empty ZmqMessage")
}
}
Expand Down
34 changes: 22 additions & 12 deletions src/pub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,25 +34,31 @@ pub(crate) struct PubSocketBackend {

impl PubSocketBackend {
fn message_received(&self, peer_id: &PeerIdentity, message: Message) {
let message = match message {
Message::Message(m) => m,
let data = match message {
Message::Message(m) => {
if m.len() != 1 {
log::warn!("Received message with unexpected length: {}", m.len());
return;
}
m.into_vec().pop().unwrap_or_default()
}
_ => return,
};
assert_eq!(message.len(), 1);
let data: Vec<u8> = message.into_vec().pop().unwrap().to_vec();

if data.is_empty() {
return;
}
match data[0] {
1 => {

match data.first() {
Some(1) => {
// Subscribe
self.subscribers
.get_mut(peer_id)
.unwrap()
.subscriptions
.push(Vec::from(&data[1..]));
}
0 => {
Some(0) => {
// Unsubscribe
let mut del_index = None;
let sub = Vec::from(&data[1..]);
Expand All @@ -77,7 +83,10 @@ impl PubSocketBackend {
.remove(index);
}
}
_ => (),
_ => log::warn!(
"Received message with unexpected first byte: {:?}",
data.first()
),
}
}
}
Expand Down Expand Up @@ -127,7 +136,7 @@ impl MultiPeerBackend for PubSocketBackend {
match message {
Some(Ok(m)) => backend.message_received(&peer_id, m),
Some(Err(e)) => {
dbg!(e);
log::debug!("Error receiving message: {:?}", e);
backend.peer_disconnected(&peer_id);
break;
}
Expand Down Expand Up @@ -179,17 +188,18 @@ impl SocketSend for PubSocket {
if e.kind() == ErrorKind::BrokenPipe {
dead_peers.push(subscriber.key().clone());
} else {
dbg!(e);
log::error!("Error receiving message: {:?}", e);
}
}
Err(ZmqError::BufferFull(_)) => {
// ignore silently. https://rfc.zeromq.org/spec/29/ says:
// For processing outgoing messages:
// SHALL silently drop the message if the queue for a subscriber is full.
log::debug!("Queue for subscriber is full",);
}
Err(e) => {
dbg!(e);
todo!()
log::error!("Error receiving message: {:?}", e);
return Err(e);
}
}
break;
Expand Down
2 changes: 1 addition & 1 deletion src/sub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ pub struct SubSocket {

impl Drop for SubSocket {
fn drop(&mut self) {
self.backend.shutdown()
self.backend.shutdown();
}
}

Expand Down
Loading

0 comments on commit 07c65dc

Please sign in to comment.