Skip to content

Commit

Permalink
Merge pull request #117 from zeromq/alex/connect_forever
Browse files Browse the repository at this point in the history
Implement connect_forever in util. Related to #73
  • Loading branch information
Alexei-Kornienko authored Jan 8, 2021
2 parents 3c65602 + 9cb5179 commit 636eb05
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 9 deletions.
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ pub trait Socket: Sized + Send {
let backend = self.backend();
let endpoint = endpoint.try_into()?;

let result = match transport::connect(endpoint).await {
let result = match util::connect_forever(endpoint).await {
Ok((socket, endpoint)) => match util::peer_connected(socket, backend).await {
Ok(peer_id) => Ok((endpoint, peer_id)),
Err(e) => Err(e),
Expand Down
8 changes: 4 additions & 4 deletions src/transport/ipc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,18 @@ use crate::task_handle::TaskHandle;
use crate::ZmqResult;

use futures::{select, FutureExt};
use std::path::{Path, PathBuf};
use std::path::Path;

pub(crate) async fn connect(path: PathBuf) -> ZmqResult<(FramedIo, Endpoint)> {
let raw_socket = UnixStream::connect(&path).await?;
pub(crate) async fn connect(path: &Path) -> ZmqResult<(FramedIo, Endpoint)> {
let raw_socket = UnixStream::connect(path).await?;
let peer_addr = raw_socket.peer_addr()?;
let peer_addr = peer_addr.as_pathname().map(|a| a.to_owned());

Ok((make_framed(raw_socket), Endpoint::Ipc(peer_addr)))
}

pub(crate) async fn begin_accept<T>(
path: PathBuf,
path: &Path,
cback: impl Fn(ZmqResult<(FramedIo, Endpoint)>) -> T + Send + 'static,
) -> ZmqResult<(Endpoint, AcceptStopHandle)>
where
Expand Down
6 changes: 3 additions & 3 deletions src/transport/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@ macro_rules! do_if_enabled {
///
/// # Panics
/// Panics if the requested endpoint uses a transport type that isn't enabled
pub(crate) async fn connect(endpoint: Endpoint) -> ZmqResult<(FramedIo, Endpoint)> {
pub(crate) async fn connect(endpoint: &Endpoint) -> ZmqResult<(FramedIo, Endpoint)> {
match endpoint {
Endpoint::Tcp(_host, _port) => {
do_if_enabled!("tcp-transport", tcp::connect(_host, _port).await)
do_if_enabled!("tcp-transport", tcp::connect(_host, *_port).await)
}
Endpoint::Ipc(_path) => do_if_enabled!(
"ipc-transport",
Expand Down Expand Up @@ -71,7 +71,7 @@ where
Endpoint::Ipc(_path) => do_if_enabled!(
"ipc-transport",
if let Some(path) = _path {
ipc::begin_accept(path, _cback).await
ipc::begin_accept(&path, _cback).await
} else {
Err(crate::error::ZmqError::Socket(
"Cannot begin accepting peers at an unnamed ipc socket",
Expand Down
2 changes: 1 addition & 1 deletion src/transport/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use crate::ZmqResult;

use futures::{select, FutureExt};

pub(crate) async fn connect(host: Host, port: Port) -> ZmqResult<(FramedIo, Endpoint)> {
pub(crate) async fn connect(host: &Host, port: Port) -> ZmqResult<(FramedIo, Endpoint)> {
let raw_socket = TcpStream::connect((host.to_string().as_str(), port)).await?;
let peer_addr = raw_socket.peer_addr()?;

Expand Down
23 changes: 23 additions & 0 deletions src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ use bytes::Bytes;
use futures::stream::StreamExt;
use futures::SinkExt;
use futures_codec::FramedRead;
use num_traits::Pow;
use rand::Rng;
use std::convert::{TryFrom, TryInto};
use std::sync::Arc;
use uuid::Uuid;
Expand Down Expand Up @@ -178,6 +180,27 @@ pub(crate) async fn peer_connected(
Ok(peer_id)
}

pub(crate) async fn connect_forever(endpoint: Endpoint) -> ZmqResult<(FramedIo, Endpoint)> {
let mut try_num: u64 = 0;
loop {
match transport::connect(&endpoint).await {
Ok(res) => return Ok(res),
Err(ZmqError::Network(e)) if e.kind() == std::io::ErrorKind::ConnectionRefused => {
if try_num < 5 {
try_num += 1;
}
let delay = {
let mut rng = rand::thread_rng();
std::f64::consts::E.pow(try_num as f64 / 3.0) + rng.gen_range(0.0f64, 0.1f64)
};
async_rt::task::sleep(std::time::Duration::from_secs_f64(delay)).await;
continue;
}
Err(e) => return Err(e),
}
}
}

#[cfg(test)]
pub(crate) mod tests {
use super::*;
Expand Down

0 comments on commit 636eb05

Please sign in to comment.