Skip to content

Commit

Permalink
* fix zeromq#205 "Segmentation at sock.close() at loop.timer".
Browse files Browse the repository at this point in the history
  part 2: exception at loop.start()
  part 3: socket is not removed from items_
  via separate cb for close()
* testcases for it
  • Loading branch information
Pavel Orekhov authored and Pavel Orekhov committed Jan 29, 2018
1 parent bd94d1f commit ef37737
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 15 deletions.
62 changes: 62 additions & 0 deletions src/tests/test_loop.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
#include <thread>
#include <exception>

#define private public

#include "zmqpp/context.hpp"
#include "zmqpp/message.hpp"
#include "zmqpp/loop.hpp"
Expand Down Expand Up @@ -72,6 +74,66 @@ BOOST_AUTO_TEST_CASE(socket_removed_in_timer)
BOOST_CHECK(socket_called == false);
}

BOOST_AUTO_TEST_CASE(socket_closed_in_timer)
{
zmqpp::context context;

zmqpp::socket output(context, zmqpp::socket_type::pair);
output.bind("inproc://test");
zmqpp::socket input(context, zmqpp::socket_type::pair);
input.connect("inproc://test");

zmqpp::loop loop;

bool socket_called = false;

loop.add(output, [&socket_called]() -> bool { socket_called = true; return false; });
loop.add(std::chrono::milliseconds(0), 1, [&loop, &output]() -> bool {
loop.remove(output);
output.close();
loop.add(std::chrono::milliseconds(10), 1, []() -> bool { return false; });
return true;
});

input.send("PING");

BOOST_CHECK_NO_THROW(loop.start());
BOOST_CHECK(loop.items_.size() == 0);
BOOST_CHECK(socket_called == false);
}

BOOST_AUTO_TEST_CASE(socket_closed_after_remove_at_timer)
{
zmqpp::context context;

zmqpp::socket output(context, zmqpp::socket_type::pair);
output.bind("inproc://test");
zmqpp::socket input(context, zmqpp::socket_type::pair);
input.connect("inproc://test");

zmqpp::loop loop;

bool socket_called = false;

loop.add(output, [&socket_called]() -> bool { socket_called = true; return false; },
zmqpp::poller::poll_in, [&output]()
{
output.close();
return true;
});
loop.add(std::chrono::milliseconds(0), 1, [&loop, &output]() -> bool {
loop.remove(output);
loop.add(std::chrono::milliseconds(10), 1, []() -> bool { return false; });
return true;
});

input.send("PING");

BOOST_CHECK_NO_THROW(loop.start());
BOOST_CHECK(loop.items_.size() == 0);
BOOST_CHECK(socket_called == false);
}

BOOST_AUTO_TEST_CASE(simple_pull_push)
{
zmqpp::context context;
Expand Down
30 changes: 19 additions & 11 deletions src/zmqpp/loop.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,10 @@ namespace zmqpp
when += delay;
}

void loop::add(socket& socket, Callable callable, short const event /* = POLL_IN */)
void loop::add(socket& socket, Callable callable, short const event /* = POLL_IN */, Callable after_remove_cb /* = Callable(nullptr)*/)
{
zmq_pollitem_t item{static_cast<void *> (socket), 0, event, 0};
add(item, callable);
add(item, callable, after_remove_cb);
}

void loop::add(raw_socket_t const descriptor, Callable callable, short const event /* = POLL_IN */)
Expand All @@ -59,11 +59,11 @@ namespace zmqpp
add(item, callable);
}

void loop::add(const zmq_pollitem_t& item, Callable callable)
void loop::add(const zmq_pollitem_t& item, Callable callable, Callable after_remove_cb)
{
poller_.add(item);
rebuild_poller_ = true;
items_.push_back(std::make_pair(item, callable));
items_.push_back(std::make_tuple(item, callable, after_remove_cb));
}

loop::timer_id_t loop::add(std::chrono::milliseconds delay, size_t times, Callable callable)
Expand Down Expand Up @@ -110,16 +110,24 @@ namespace zmqpp
sockRemoveLater_.push_back(&socket);
return;
}
items_.erase(std::remove_if(items_.begin(), items_.end(), [&socket](const PollItemCallablePair & pair) -> bool

std::vector<PollItemCallableTuple> cb_after_remove;

items_.erase(std::remove_if(items_.begin(), items_.end(),
[&socket, &cb_after_remove](const PollItemCallableTuple & tuple) -> bool
{
const zmq_pollitem_t &item = pair.first;
const zmq_pollitem_t &item = std::get<0>(tuple);
if (nullptr != item.socket && item.socket == static_cast<void *> (socket))
{
if(std::get<2>(tuple))
cb_after_remove.push_back(tuple);
return true;
}
return false;
}), items_.end());
poller_.remove(socket);
for (const PollItemCallableTuple& item : cb_after_remove)
std::get<2>(item)();
}

void loop::remove(raw_socket_t const descriptor)
Expand All @@ -130,9 +138,9 @@ namespace zmqpp
fdRemoveLater_.push_back(descriptor);
return;
}
items_.erase(std::remove_if(items_.begin(), items_.end(), [descriptor](const PollItemCallablePair & pair) -> bool
items_.erase(std::remove_if(items_.begin(), items_.end(), [descriptor](const PollItemCallableTuple & tuple) -> bool
{
const zmq_pollitem_t &item = pair.first;
const zmq_pollitem_t &item = std::get<0>(tuple);
if (nullptr == item.socket && item.fd == descriptor)
{
return true;
Expand Down Expand Up @@ -194,12 +202,12 @@ namespace zmqpp

bool loop::start_handle_poller()
{
for (const PollItemCallablePair &pair : items_)
for (const PollItemCallableTuple &tuple : items_)
{
const zmq_pollitem_t &pollitem = pair.first;
const zmq_pollitem_t &pollitem = std::get<0>(tuple);

if (poller_.has_input(pollitem) || poller_.has_error(pollitem) || poller_.has_output(pollitem))
if(!pair.second())
if(!std::get<1>(tuple)())
return false;
}
return true;
Expand Down
10 changes: 6 additions & 4 deletions src/zmqpp/loop.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,10 @@ namespace zmqpp
* \param socket the socket to monitor.
* \param callable the function that will be called by the loop when a registered event occurs on socket.
* \param event the event flags to monitor on the socket.
* \param after_remove_cb will be called by loop after remove() completion.
* See tests/test_loop.cpp: socket_closed_in_timer and socket_closed_after_remove_at_timer
*/
void add(socket_t& socket, Callable callable, short const event = poller::poll_in);
void add(socket_t& socket, Callable callable, short const event = poller::poll_in, Callable after_remove_cb = Callable(nullptr));

/*!
* Add a standard socket to the loop, providing a handler that will be called when the monitored events occur.
Expand Down Expand Up @@ -125,18 +127,18 @@ namespace zmqpp
void update();
};

typedef std::pair<zmq_pollitem_t, Callable> PollItemCallablePair;
typedef std::tuple<zmq_pollitem_t, Callable, Callable> PollItemCallableTuple;
typedef std::pair<std::unique_ptr<timer_t>, Callable> TimerItemCallablePair;
static bool TimerItemCallablePairComp(const TimerItemCallablePair &lhs, const TimerItemCallablePair &rhs);

std::vector<PollItemCallablePair> items_;
std::vector<PollItemCallableTuple> items_;
std::list<TimerItemCallablePair> timers_;
std::vector<const socket_t *> sockRemoveLater_;
std::vector<raw_socket_t> fdRemoveLater_;
std::vector<timer_id_t> timerRemoveLater_;


void add(const zmq_pollitem_t &item, Callable callable);
void add(const zmq_pollitem_t &item, Callable callable, Callable after_remove_cb = Callable(nullptr));
void add(std::unique_ptr<timer_t>, Callable callable);

bool start_handle_timers();
Expand Down

0 comments on commit ef37737

Please sign in to comment.