Skip to content

Commit

Permalink
Fix leak of remaining pending operations during io_service destruction
Browse files Browse the repository at this point in the history
Previously, ops were tracked in boost::intrusive::lists and deleted by
unlinking from the list and calling do_complete, which did the "delete".
However, any remaining ops in the queues during destruction of
per_descriptor_data were not freed, because boost::intrusive::list
destruction only unlinks and does not free the nodes.

We need to free the reactor_ops remaining in the queue, but without calling
the user-supplied completion handlers, because invoking them could have
unwanted side effects — for example, they could trigger new enqueue() calls
in the middle of io_service destruction.
This matches boost::asio behaviour:
"Uninvoked handler objects that were scheduled for deferred invocation on
the io_context, or any associated strand, are destroyed."
(https://www.boost.org/doc/libs/1_74_0/doc/html/boost_asio/reference/io_context/_io_context.html)

Thus freeing needs to be separated from do_complete().

This patch refactors the op/queue memory management to fix the leak,
and separates the memory management from the do_complete() action.
Now, ops are tracked with std::deque + std::unique_ptr everywhere.
They no longer delete themselves in do_complete(), but instead we rely on
automatic destructor calls to free the memory. do_complete() calls are
still done in the same place as before, but now they only call the
completion handlers and leave the freeing to the destructors.
  • Loading branch information
dkl committed Sep 27, 2021
1 parent 6675a0a commit 1a53ed3
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 92 deletions.
8 changes: 3 additions & 5 deletions azmq/detail/reactor_op.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,28 +14,26 @@

#include <boost/optional.hpp>
#include <boost/asio/io_service.hpp>
#include <boost/intrusive/list.hpp>

namespace azmq {
namespace detail {
class reactor_op {
public:
using socket_type = socket_ops::socket_type;
using flags_type = socket_ops::flags_type;
boost::intrusive::list_member_hook<> member_hook_;
boost::system::error_code ec_;
size_t bytes_transferred_;

bool do_perform(socket_type & socket) { return perform_func_(this, socket); }
static void do_complete(reactor_op * op) {
op->complete_func_(op, op->ec_, op->bytes_transferred_);
static void do_complete(reactor_op* op) {
op->complete_func_(op);
}

static boost::system::error_code canceled() { return boost::asio::error::operation_aborted; }

protected:
typedef bool (*perform_func_type)(reactor_op*, socket_type &);
typedef void (*complete_func_type)(reactor_op* op, boost::system::error_code const&, size_t);
typedef void (*complete_func_type)(reactor_op* op);

perform_func_type perform_func_;
complete_func_type complete_func_;
Expand Down
32 changes: 6 additions & 26 deletions azmq/detail/receive_op.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,15 +65,9 @@ class receive_buffer_op : public receive_buffer_op_base<MutableBufferSequence> {
, handler_(std::move(handler))
{ }

static void do_complete(reactor_op* base,
const boost::system::error_code &,
size_t) {
static void do_complete(reactor_op* base) {
auto o = static_cast<receive_buffer_op*>(base);
auto h = std::move(o->handler_);
auto ec = o->ec_;
auto bt = o->bytes_transferred_;
delete o;
h(ec, bt);
o->handler_(o->ec_, o->bytes_transferred_);
}

private:
Expand All @@ -92,16 +86,9 @@ class receive_more_buffer_op : public receive_buffer_op_base<MutableBufferSequen
, handler_(std::move(handler))
{ }

static void do_complete(reactor_op* base,
const boost::system::error_code &,
size_t) {
static void do_complete(reactor_op* base) {
auto o = static_cast<receive_more_buffer_op*>(base);
auto h = std::move(o->handler_);
auto ec = o->ec_;
auto bt = o->bytes_transferred_;
auto m = o->more();
delete o;
h(ec, std::make_pair(bt, m));
o->handler_(o->ec_, std::make_pair(o->bytes_transferred_, o->more()));
}

private:
Expand Down Expand Up @@ -140,16 +127,9 @@ class receive_op : public receive_op_base {
, handler_(std::move(handler))
{ }

static void do_complete(reactor_op* base,
const boost::system::error_code &,
size_t) {
static void do_complete(reactor_op* base) {
auto o = static_cast<receive_op*>(base);
auto h = std::move(o->handler_);
auto m = std::move(o->msg_);
auto ec = o->ec_;
auto bt = o->bytes_transferred_;
delete o;
h(ec, m, bt);
o->handler_(o->ec_, o->msg_, o->bytes_transferred_);
}

private:
Expand Down
21 changes: 4 additions & 17 deletions azmq/detail/send_op.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,16 +59,9 @@ class send_buffer_op : public send_buffer_op_base<ConstBufferSequence> {
, handler_(std::move(handler))
{ }

static void do_complete(reactor_op* base,
const boost::system::error_code &,
size_t) {
static void do_complete(reactor_op* base) {
auto o = static_cast<send_buffer_op*>(base);
auto h = std::move(o->handler_);
auto ec = o->ec_;
auto bt = o->bytes_transferred_;
delete o;

h(ec, bt);
o->handler_(o->ec_, o->bytes_transferred_);
}

private:
Expand Down Expand Up @@ -110,15 +103,9 @@ class send_op : public send_op_base {
, handler_(std::move(handler))
{ }

static void do_complete(reactor_op* base,
const boost::system::error_code &,
size_t) {
static void do_complete(reactor_op* base) {
auto o = static_cast<send_op*>(base);
auto h = std::move(o->handler_);
auto ec = o->ec_;
auto bt = o->bytes_transferred_;
delete o;
h(ec, bt);
o->handler_(o->ec_, o->bytes_transferred_);
}

private:
Expand Down
83 changes: 39 additions & 44 deletions azmq/detail/socket_service.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
#include <boost/version.hpp>
#include <boost/assert.hpp>
#include <boost/optional.hpp>
#include <boost/intrusive/list.hpp>
#include <boost/system/system_error.hpp>
#include <boost/container/flat_map.hpp>
#include <boost/thread/mutex.hpp>
Expand All @@ -44,6 +43,7 @@
#include <vector>
#include <tuple>
#include <ostream>
#include <deque>

namespace azmq {
namespace detail {
Expand All @@ -57,12 +57,8 @@ namespace detail {
using flags_type = socket_ops::flags_type;
using more_result_type = socket_ops::more_result_type;
using context_type = context_ops::context_type;
using op_queue_type = boost::intrusive::list<reactor_op,
boost::intrusive::member_hook<
reactor_op,
boost::intrusive::list_member_hook<>,
&reactor_op::member_hook_
>>;
using reactor_op_ptr = std::unique_ptr<reactor_op>;
using op_queue_type = std::deque<reactor_op_ptr>;
using exts_type = boost::container::flat_map<std::type_index, socket_ext>;
using allow_speculative = opt::boolean<static_cast<int>(opt::limits::lib_socket_min)>;

Expand Down Expand Up @@ -91,7 +87,7 @@ namespace detail {
exts_type exts_;
endpoint_type endpoint_;
bool serverish_ = false;
std::array<op_queue_type, max_ops> op_queue_;
std::array<op_queue_type, max_ops> op_queues_;

#ifdef AZMQ_DETAIL_USE_IO_SERVICE
void do_open(boost::asio::io_service & ios,
Expand All @@ -115,8 +111,8 @@ namespace detail {
int events_mask() const
{
static_assert(2 == max_ops, "2 == max_ops");
return (!op_queue_[read_op].empty() ? ZMQ_POLLIN : 0)
| (!op_queue_[write_op].empty() ? ZMQ_POLLOUT : 0);
return (!op_queues_[read_op].empty() ? ZMQ_POLLIN : 0)
| (!op_queues_[write_op].empty() ? ZMQ_POLLOUT : 0);
}

bool perform_ops(op_queue_type & ops, boost::system::error_code& ec) {
Expand All @@ -126,10 +122,9 @@ namespace detail {
break;

for (size_t i = 0; i != max_ops; ++i) {
if ((evs & filter[i]) && op_queue_[i].front().do_perform(socket_)) {
op_queue_[i].pop_front_and_dispose([&ops](reactor_op * op) {
ops.push_back(*op);
});
if ((evs & filter[i]) && op_queues_[i].front()->do_perform(socket_)) {
ops.push_back(std::move(op_queues_[i].front()));
op_queues_[i].pop_front();
}
}
}
Expand All @@ -143,11 +138,10 @@ namespace detail {

void cancel_ops(boost::system::error_code const& ec, op_queue_type & ops) {
for (size_t i = 0; i != max_ops; ++i) {
while (!op_queue_[i].empty()) {
op_queue_[i].front().ec_ = ec;
op_queue_[i].pop_front_and_dispose([&ops](reactor_op * op) {
ops.push_back(*op);
});
while (!op_queues_[i].empty()) {
op_queues_[i].front()->ec_ = ec;
ops.push_back(std::move(op_queues_[i].front()));
op_queues_[i].pop_front();
}
}
}
Expand Down Expand Up @@ -477,16 +471,9 @@ namespace detail {
return r;
}

using reactor_op_ptr = std::unique_ptr<reactor_op>;
template<typename T, typename... Args>
void enqueue(implementation_type & impl, op_type o, Args&&... args) {
reactor_op_ptr p{ new T(std::forward<Args>(args)...) };
boost::system::error_code ec = enqueue(impl, o, p);
if (ec) {
BOOST_ASSERT_MSG(p, "op ptr");
p->ec_ = ec;
reactor_op::do_complete(p.release());
}
enqueue(impl, o, std::unique_ptr<T>(new T(std::forward<Args>(args)...)));
}

boost::system::error_code cancel(implementation_type & impl,
Expand Down Expand Up @@ -527,8 +514,9 @@ namespace detail {
static void cancel_ops(implementation_type & impl) {
op_queue_type ops;
impl->cancel_ops(reactor_op::canceled(), ops);
while (!ops.empty())
ops.pop_front_and_dispose(reactor_op::do_complete);
for (const auto &op : ops) {
reactor_op::do_complete(op.get());
}
}

using weak_descriptor_ptr = std::weak_ptr<per_descriptor_data>;
Expand All @@ -549,8 +537,9 @@ namespace detail {
if (ec)
impl->cancel_ops(ec, ops);
}
while (!ops.empty())
ops.pop_front_and_dispose(reactor_op::do_complete);
for (const auto &op : ops) {
reactor_op::do_complete(op.get());
}
}

void check_missed_events(implementation_type & impl)
Expand Down Expand Up @@ -632,8 +621,9 @@ namespace detail {
else
descriptors_.unregister_descriptor(p);
}
while (!ops.empty())
ops.pop_front_and_dispose(reactor_op::do_complete);
for (const auto &op : ops) {
reactor_op::do_complete(op.get());
}
}

static void schedule(descriptor_map & descriptors, implementation_type & impl) {
Expand All @@ -658,16 +648,17 @@ namespace detail {

struct deferred_completion {
weak_descriptor_ptr owner_;
reactor_op *op_;
// Using shared_ptr instead of unique_ptr here, so this completion handler is copy-constructible for asio
std::shared_ptr<reactor_op> op_;

deferred_completion(implementation_type const& owner,
reactor_op_ptr op)
: owner_(owner)
, op_(op.release())
, op_(std::move(op))
{ }

void operator()() {
reactor_op::do_complete(op_);
reactor_op::do_complete(op_.get());
if (auto p = owner_.lock()) {
unique_lock l{ *p };
p->in_speculative_completion_ = false;
Expand All @@ -680,17 +671,20 @@ namespace detail {

descriptor_map descriptors_;

boost::system::error_code enqueue(implementation_type & impl,
op_type o, reactor_op_ptr & op) {
void enqueue(implementation_type& impl, op_type o, reactor_op_ptr op) {
unique_lock l{ *impl };
boost::system::error_code ec;
if (is_shutdown(impl, o, ec))
return ec;
if (is_shutdown(impl, o, ec)) {
BOOST_ASSERT_MSG(op, "op ptr");
op->ec_ = ec;
reactor_op::do_complete(op.get());
return;
}

// we have at most one speculative completion in flight at any time
if (impl->allow_speculative_ && !impl->in_speculative_completion_) {
// attempt to execute speculatively when the op_queue is empty
if (impl->op_queue_[o].empty()) {
if (impl->op_queues_[o].empty()) {
if (op->do_perform(impl->socket_)) {
impl->in_speculative_completion_ = true;
l.unlock();
Expand All @@ -699,19 +693,20 @@ namespace detail {
#else
get_io_context().post(deferred_completion(impl, std::move(op)));
#endif
return ec;
return;
}
}
}
impl->op_queue_[o].push_back(*op.release());

impl->op_queues_[o].push_back(std::move(op));

if (!impl->scheduled_) {
impl->scheduled_ = true;
reactor_handler::schedule(descriptors_, impl);
} else {
check_missed_events(impl);
}
return ec;
return;
}
};

Expand Down
9 changes: 9 additions & 0 deletions test/socket/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -693,3 +693,12 @@ TEST_CASE( "Loopback", "[socket]" ) {
REQUIRE(s.ec == boost::system::error_code());
REQUIRE(s.ct == ct);
}

// Regression test for the pending-op leak fix: when the io_service (and with
// it the socket's per_descriptor_data) is destroyed while a receive is still
// pending, the op's memory must be freed WITHOUT invoking the user-supplied
// completion handler. The handler below calls FAIL(), so the test fails if it
// is ever invoked; note that ioservice.run() is deliberately never called, so
// the only way the handler could run is incorrectly during destruction.
// (Leak detection itself relies on an external tool, e.g. ASan/valgrind.)
TEST_CASE( "socket_service does not call pending completion handlers when destroyed", "[socket]" ) {
    boost::asio::io_service ioservice;
    azmq::socket socket(ioservice, ZMQ_ROUTER);
    socket.bind(subj(BOOST_CURRENT_FUNCTION));
    // Enqueue a receive op that can never complete normally (nothing is sent).
    socket.async_receive([](boost::system::error_code const& ec, azmq::message & msg, size_t bytes_transferred) {
        FAIL();
    });
}

0 comments on commit 1a53ed3

Please sign in to comment.