From 1a53ed375070d86d25cddb3b54c16a4552e77c24 Mon Sep 17 00:00:00 2001
From: Daniel Klauer
Date: Fri, 29 May 2020 10:51:57 +0200
Subject: [PATCH] Fix leak of remaining pending operations during io_service
 destruction

Previously, ops were tracked in boost::intrusive::lists and deleted by
unlinking them from the list and calling do_complete(), which did the
"delete". However, any ops still remaining in the queues when
per_descriptor_data was destroyed were never freed, because
boost::intrusive::list destruction only unlinks the nodes and does not
free them.

We need to free the reactor_ops remaining in the queues, but without
calling the user-supplied completion handlers: invoking handlers at that
point could, for example, trigger new enqueue() calls in the middle of
io_service destruction. This matches boost::asio behaviour: "Uninvoked
handler objects that were scheduled for deferred invocation on the
io_context, or any associated strand, are destroyed."
(https://www.boost.org/doc/libs/1_74_0/doc/html/boost_asio/reference/io_context/_io_context.html)
Thus freeing needs to be separated from do_complete().

This patch refactors the op/queue memory management to fix the leak and
separates the memory management from the do_complete() action.

Ops are now tracked with std::deque + std::unique_ptr everywhere. They
no longer delete themselves in do_complete(); instead the destructors
free the memory automatically. do_complete() is still called in the same
places as before, but it now only invokes the completion handler and
leaves the freeing to the destructors.
---
 azmq/detail/reactor_op.hpp     |  8 ++--
 azmq/detail/receive_op.hpp     | 32 +++----------
 azmq/detail/send_op.hpp        | 21 ++-------
 azmq/detail/socket_service.hpp | 83 ++++++++++++++++------------------
 test/socket/main.cpp           |  9 ++++
 5 files changed, 61 insertions(+), 92 deletions(-)

diff --git a/azmq/detail/reactor_op.hpp b/azmq/detail/reactor_op.hpp
index 12af965..120e447 100644
--- a/azmq/detail/reactor_op.hpp
+++ b/azmq/detail/reactor_op.hpp
@@ -14,7 +14,6 @@
 #include
 #include
-#include
 
 namespace azmq {
 namespace detail {
@@ -22,20 +21,19 @@ class reactor_op {
 public:
     using socket_type = socket_ops::socket_type;
     using flags_type = socket_ops::flags_type;
-    boost::intrusive::list_member_hook<> member_hook_;
     boost::system::error_code ec_;
     size_t bytes_transferred_;
 
     bool do_perform(socket_type & socket) { return perform_func_(this, socket); }
 
-    static void do_complete(reactor_op * op) {
-        op->complete_func_(op, op->ec_, op->bytes_transferred_);
+    static void do_complete(reactor_op* op) {
+        op->complete_func_(op);
     }
 
     static boost::system::error_code canceled() { return boost::asio::error::operation_aborted; }
 
 protected:
     typedef bool (*perform_func_type)(reactor_op*, socket_type &);
-    typedef void (*complete_func_type)(reactor_op* op, boost::system::error_code const&, size_t);
+    typedef void (*complete_func_type)(reactor_op* op);
 
     perform_func_type perform_func_;
     complete_func_type complete_func_;
diff --git a/azmq/detail/receive_op.hpp b/azmq/detail/receive_op.hpp
index 1e56bca..9299613 100644
--- a/azmq/detail/receive_op.hpp
+++ b/azmq/detail/receive_op.hpp
@@ -65,15 +65,9 @@ class receive_buffer_op : public receive_buffer_op_base {
         , handler_(std::move(handler))
     { }
 
-    static void do_complete(reactor_op* base,
-                            const boost::system::error_code &,
-                            size_t) {
+    static void do_complete(reactor_op* base) {
         auto o = static_cast<receive_buffer_op*>(base);
-        auto h = std::move(o->handler_);
-        auto ec = o->ec_;
-        auto bt = o->bytes_transferred_;
-        delete o;
-        h(ec, bt);
+        o->handler_(o->ec_, o->bytes_transferred_);
     }
 
 private:
@@ -92,16 +86,9 @@ class receive_more_buffer_op : public receive_buffer_op_base {
         , handler_(std::move(handler))
     { }
 
-    static void do_complete(reactor_op* base,
-                            const boost::system::error_code &,
-                            size_t) {
+    static void do_complete(reactor_op* base) {
         auto o = static_cast<receive_more_buffer_op*>(base);
-        auto h = std::move(o->handler_);
-        auto ec = o->ec_;
-        auto bt = o->bytes_transferred_;
-        auto m = o->more();
-        delete o;
-        h(ec, std::make_pair(bt, m));
+        o->handler_(o->ec_, std::make_pair(o->bytes_transferred_, o->more()));
     }
 
 private:
@@ -140,16 +127,9 @@ class receive_op : public receive_op_base {
         , handler_(std::move(handler))
     { }
 
-    static void do_complete(reactor_op* base,
-                            const boost::system::error_code &,
-                            size_t) {
+    static void do_complete(reactor_op* base) {
         auto o = static_cast<receive_op*>(base);
-        auto h = std::move(o->handler_);
-        auto m = std::move(o->msg_);
-        auto ec = o->ec_;
-        auto bt = o->bytes_transferred_;
-        delete o;
-        h(ec, m, bt);
+        o->handler_(o->ec_, o->msg_, o->bytes_transferred_);
     }
 
 private:
diff --git a/azmq/detail/send_op.hpp b/azmq/detail/send_op.hpp
index aab90aa..6b0721d 100644
--- a/azmq/detail/send_op.hpp
+++ b/azmq/detail/send_op.hpp
@@ -59,16 +59,9 @@ class send_buffer_op : public send_buffer_op_base {
         , handler_(std::move(handler))
     { }
 
-    static void do_complete(reactor_op* base,
-                            const boost::system::error_code &,
-                            size_t) {
+    static void do_complete(reactor_op* base) {
         auto o = static_cast<send_buffer_op*>(base);
-        auto h = std::move(o->handler_);
-        auto ec = o->ec_;
-        auto bt = o->bytes_transferred_;
-        delete o;
-
-        h(ec, bt);
+        o->handler_(o->ec_, o->bytes_transferred_);
     }
 
 private:
@@ -110,15 +103,9 @@ class send_op : public send_op_base {
         , handler_(std::move(handler))
     { }
 
-    static void do_complete(reactor_op* base,
-                            const boost::system::error_code &,
-                            size_t) {
+    static void do_complete(reactor_op* base) {
         auto o = static_cast<send_op*>(base);
-        auto h = std::move(o->handler_);
-        auto ec = o->ec_;
-        auto bt = o->bytes_transferred_;
-        delete o;
-        h(ec, bt);
+        o->handler_(o->ec_, o->bytes_transferred_);
     }
 
 private:
diff --git a/azmq/detail/socket_service.hpp b/azmq/detail/socket_service.hpp
index 1992804..b6fd84c 100644
--- a/azmq/detail/socket_service.hpp
+++ b/azmq/detail/socket_service.hpp
@@ -27,7 +27,6 @@
 #include
 #include
 #include
-#include
 #include
 #include
 #include
@@ -44,6 +43,7 @@
 #include
 #include
 #include
+#include
 
 namespace azmq {
 namespace detail {
@@ -57,12 +57,8 @@ namespace detail {
         using flags_type = socket_ops::flags_type;
         using more_result_type = socket_ops::more_result_type;
         using context_type = context_ops::context_type;
-        using op_queue_type = boost::intrusive::list<reactor_op,
-                                  boost::intrusive::member_hook<
-                                      reactor_op,
-                                      boost::intrusive::list_member_hook<>,
-                                      &reactor_op::member_hook_
-                                  >>;
+        using reactor_op_ptr = std::unique_ptr<reactor_op>;
+        using op_queue_type = std::deque<reactor_op_ptr>;
         using exts_type = boost::container::flat_map;
 
         using allow_speculative = opt::boolean<static_cast<int>(opt::limits::lib_socket_min)>;
@@ -91,7 +87,7 @@
             exts_type exts_;
             endpoint_type endpoint_;
             bool serverish_ = false;
-            std::array<op_queue_type, max_ops> op_queue_;
+            std::array<op_queue_type, max_ops> op_queues_;
 
 #ifdef AZMQ_DETAIL_USE_IO_SERVICE
             void do_open(boost::asio::io_service & ios,
@@ -115,8 +111,8 @@
             int events_mask() const {
                 static_assert(2 == max_ops, "2 == max_ops");
-                return (!op_queue_[read_op].empty() ? ZMQ_POLLIN : 0)
-                     | (!op_queue_[write_op].empty() ? ZMQ_POLLOUT : 0);
+                return (!op_queues_[read_op].empty() ? ZMQ_POLLIN : 0)
+                     | (!op_queues_[write_op].empty() ? ZMQ_POLLOUT : 0);
             }
 
             bool perform_ops(op_queue_type & ops, boost::system::error_code& ec) {
@@ -126,10 +122,9 @@
                         break;
                     for (size_t i = 0; i != max_ops; ++i) {
-                        if ((evs & filter[i]) && op_queue_[i].front().do_perform(socket_)) {
-                            op_queue_[i].pop_front_and_dispose([&ops](reactor_op * op) {
-                                ops.push_back(*op);
-                            });
+                        if ((evs & filter[i]) && op_queues_[i].front()->do_perform(socket_)) {
+                            ops.push_back(std::move(op_queues_[i].front()));
+                            op_queues_[i].pop_front();
                         }
                     }
                 }
@@ -143,11 +138,10 @@
             void cancel_ops(boost::system::error_code const& ec, op_queue_type & ops) {
                 for (size_t i = 0; i != max_ops; ++i) {
-                    while (!op_queue_[i].empty()) {
-                        op_queue_[i].front().ec_ = ec;
-                        op_queue_[i].pop_front_and_dispose([&ops](reactor_op * op) {
-                            ops.push_back(*op);
-                        });
+                    while (!op_queues_[i].empty()) {
+                        op_queues_[i].front()->ec_ = ec;
+                        ops.push_back(std::move(op_queues_[i].front()));
+                        op_queues_[i].pop_front();
                     }
                 }
             }
@@ -477,16 +471,9 @@
             return r;
         }
 
-        using reactor_op_ptr = std::unique_ptr<reactor_op>;
         template<typename T, typename... Args>
        void enqueue(implementation_type & impl, op_type o, Args&&... args) {
-            reactor_op_ptr p{ new T(std::forward<Args>(args)...) };
-            boost::system::error_code ec = enqueue(impl, o, p);
-            if (ec) {
-                BOOST_ASSERT_MSG(p, "op ptr");
-                p->ec_ = ec;
-                reactor_op::do_complete(p.release());
-            }
+            enqueue(impl, o, std::unique_ptr<T>(new T(std::forward<Args>(args)...)));
        }
 
        boost::system::error_code cancel(implementation_type & impl,
@@ -527,8 +514,9 @@
         static void cancel_ops(implementation_type & impl) {
             op_queue_type ops;
             impl->cancel_ops(reactor_op::canceled(), ops);
-            while (!ops.empty())
-                ops.pop_front_and_dispose(reactor_op::do_complete);
+            for (const auto &op : ops) {
+                reactor_op::do_complete(op.get());
+            }
         }
 
         using weak_descriptor_ptr = std::weak_ptr;
@@ -549,8 +537,9 @@
                 if (ec)
                     impl->cancel_ops(ec, ops);
             }
-            while (!ops.empty())
-                ops.pop_front_and_dispose(reactor_op::do_complete);
+            for (const auto &op : ops) {
+                reactor_op::do_complete(op.get());
+            }
         }
 
         void check_missed_events(implementation_type & impl)
@@ -632,8 +621,9 @@
                 else
                     descriptors_.unregister_descriptor(p);
             }
-            while (!ops.empty())
-                ops.pop_front_and_dispose(reactor_op::do_complete);
+            for (const auto &op : ops) {
+                reactor_op::do_complete(op.get());
+            }
         }
 
         static void schedule(descriptor_map & descriptors, implementation_type & impl) {
@@ -658,16 +648,17 @@
         struct deferred_completion {
             weak_descriptor_ptr owner_;
-            reactor_op *op_;
+            // Using shared_ptr instead of unique_ptr here, so this completion handler is copy-constructible for asio
+            std::shared_ptr<reactor_op> op_;
 
             deferred_completion(implementation_type const& owner, reactor_op_ptr op)
                 : owner_(owner)
-                , op_(op.release())
+                , op_(std::move(op))
             { }
 
             void operator()() {
-                reactor_op::do_complete(op_);
+                reactor_op::do_complete(op_.get());
                 if (auto p = owner_.lock()) {
                     unique_lock l{ *p };
                     p->in_speculative_completion_ = false;
@@ -680,17 +671,20 @@
         descriptor_map descriptors_;
 
-        boost::system::error_code enqueue(implementation_type & impl,
-                                          op_type o, reactor_op_ptr & op) {
+        void enqueue(implementation_type& impl, op_type o, reactor_op_ptr op) {
             unique_lock l{ *impl };
             boost::system::error_code ec;
-            if (is_shutdown(impl, o, ec))
-                return ec;
+            if (is_shutdown(impl, o, ec)) {
+                BOOST_ASSERT_MSG(op, "op ptr");
+                op->ec_ = ec;
+                reactor_op::do_complete(op.get());
+                return;
+            }
 
             // we have at most one speculative completion in flight at any time
             if (impl->allow_speculative_ && !impl->in_speculative_completion_) {
                 // attempt to execute speculatively when the op_queue is empty
-                if (impl->op_queue_[o].empty()) {
+                if (impl->op_queues_[o].empty()) {
                     if (op->do_perform(impl->socket_)) {
                         impl->in_speculative_completion_ = true;
                         l.unlock();
@@ -699,11 +693,12 @@
 #else
                         get_io_context().post(deferred_completion(impl, std::move(op)));
 #endif
-                        return ec;
+                        return;
                     }
                 }
             }
-            impl->op_queue_[o].push_back(*op.release());
+
+            impl->op_queues_[o].push_back(std::move(op));
 
             if (!impl->scheduled_) {
                 impl->scheduled_ = true;
@@ -711,7 +706,7 @@
             } else {
                 check_missed_events(impl);
             }
-            return ec;
+            return;
         }
     };
diff --git a/test/socket/main.cpp b/test/socket/main.cpp
index a3bfc94..549c65f 100644
--- a/test/socket/main.cpp
+++ b/test/socket/main.cpp
@@ -693,3 +693,12 @@ TEST_CASE( "Loopback", "[socket]" ) {
     REQUIRE(s.ec == boost::system::error_code());
     REQUIRE(s.ct == ct);
 }
+
+TEST_CASE( "socket_service does not call pending completion handlers when destroyed", "[socket]" ) {
+    boost::asio::io_service ioservice;
+    azmq::socket socket(ioservice, ZMQ_ROUTER);
+    socket.bind(subj(BOOST_CURRENT_FUNCTION));
+    socket.async_receive([](boost::system::error_code const& ec, azmq::message & msg, size_t bytes_transferred) {
+        FAIL();
+    });
+}
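
Illustrative sketch (not part of the patch above): the ownership model the patch switches to can be shown in isolation as below. The names pending_op and op_queue are placeholders, not azmq identifiers; the point is that a completed op is freed by its std::unique_ptr after its handler runs, while ops still queued at destruction are freed by the deque's destructor without their handlers ever being invoked, which is the behaviour the commit message quotes from boost::asio.

    #include <deque>
    #include <functional>
    #include <iostream>
    #include <memory>

    // Placeholder op type: a completion handler plus a do_complete() that only
    // invokes the handler and never deletes the op (mirroring the patched reactor_op).
    struct pending_op {
        std::function<void()> handler_;
        explicit pending_op(std::function<void()> h) : handler_(std::move(h)) {}
        static void do_complete(pending_op* op) { op->handler_(); }
    };

    using op_queue = std::deque<std::unique_ptr<pending_op>>;

    int main() {
        op_queue queue;
        queue.push_back(std::unique_ptr<pending_op>(
            new pending_op([] { std::cout << "completed\n"; })));
        queue.push_back(std::unique_ptr<pending_op>(
            new pending_op([] { std::cout << "never printed\n"; })));

        // "Perform" the first op: move it out of the queue, invoke its handler,
        // and let the unique_ptr free it when `done` goes out of scope.
        {
            std::unique_ptr<pending_op> done = std::move(queue.front());
            queue.pop_front();
            pending_op::do_complete(done.get());
        }

        // The second op is still pending when `queue` is destroyed here: its memory
        // is released by the deque/unique_ptr destructors, but its handler is never
        // invoked.
        return 0;
    }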