From 1a53ed375070d86d25cddb3b54c16a4552e77c24 Mon Sep 17 00:00:00 2001 From: Daniel Klauer Date: Fri, 29 May 2020 10:51:57 +0200 Subject: [PATCH 1/2] Fix leak of remaining pending operations during io_service destruction Previously, ops were tracked in boost::intrusive::lists and deleted by unlinking from the list and calling do_complete, which also performed the delete. However, any remaining ops in the queues during destruction of per_descriptor_data were not freed, because boost::intrusive::list destruction only unlinks and does not free the nodes. We need to free the reactor_ops remaining in the queue, but without calling the user-supplied completion handlers, because, for example, we want to prevent new enqueue() calls from coming in during io_service destruction. This matches boost::asio behaviour: "Uninvoked handler objects that were scheduled for deferred invocation on the io_context, or any associated strand, are destroyed." (https://www.boost.org/doc/libs/1_74_0/doc/html/boost_asio/reference/io_context/_io_context.html) Thus freeing needs to be separated from do_complete(). This patch refactors the op/queue memory management to fix the leak, and separates the memory management from the do_complete() action. Now, ops are tracked with std::deque + std::unique_ptr in all op queues; the only exception is deferred_completion, which holds its op in a std::shared_ptr so that the completion handler stays copy-constructible for asio. Ops no longer delete themselves in do_complete(), but instead we rely on automatic destructor calls to free the memory. do_complete() calls are still done in the same place as before, but now they only call the completion handlers and leave the freeing to the destructors. 
--- azmq/detail/reactor_op.hpp | 8 ++-- azmq/detail/receive_op.hpp | 32 +++---------- azmq/detail/send_op.hpp | 21 ++------- azmq/detail/socket_service.hpp | 83 ++++++++++++++++------------------ test/socket/main.cpp | 9 ++++ 5 files changed, 61 insertions(+), 92 deletions(-) diff --git a/azmq/detail/reactor_op.hpp b/azmq/detail/reactor_op.hpp index 12af965..120e447 100644 --- a/azmq/detail/reactor_op.hpp +++ b/azmq/detail/reactor_op.hpp @@ -14,7 +14,6 @@ #include #include -#include namespace azmq { namespace detail { @@ -22,20 +21,19 @@ class reactor_op { public: using socket_type = socket_ops::socket_type; using flags_type = socket_ops::flags_type; - boost::intrusive::list_member_hook<> member_hook_; boost::system::error_code ec_; size_t bytes_transferred_; bool do_perform(socket_type & socket) { return perform_func_(this, socket); } - static void do_complete(reactor_op * op) { - op->complete_func_(op, op->ec_, op->bytes_transferred_); + static void do_complete(reactor_op* op) { + op->complete_func_(op); } static boost::system::error_code canceled() { return boost::asio::error::operation_aborted; } protected: typedef bool (*perform_func_type)(reactor_op*, socket_type &); - typedef void (*complete_func_type)(reactor_op* op, boost::system::error_code const&, size_t); + typedef void (*complete_func_type)(reactor_op* op); perform_func_type perform_func_; complete_func_type complete_func_; diff --git a/azmq/detail/receive_op.hpp b/azmq/detail/receive_op.hpp index 1e56bca..9299613 100644 --- a/azmq/detail/receive_op.hpp +++ b/azmq/detail/receive_op.hpp @@ -65,15 +65,9 @@ class receive_buffer_op : public receive_buffer_op_base { , handler_(std::move(handler)) { } - static void do_complete(reactor_op* base, - const boost::system::error_code &, - size_t) { + static void do_complete(reactor_op* base) { auto o = static_cast(base); - auto h = std::move(o->handler_); - auto ec = o->ec_; - auto bt = o->bytes_transferred_; - delete o; - h(ec, bt); + o->handler_(o->ec_, 
o->bytes_transferred_); } private: @@ -92,16 +86,9 @@ class receive_more_buffer_op : public receive_buffer_op_base(base); - auto h = std::move(o->handler_); - auto ec = o->ec_; - auto bt = o->bytes_transferred_; - auto m = o->more(); - delete o; - h(ec, std::make_pair(bt, m)); + o->handler_(o->ec_, std::make_pair(o->bytes_transferred_, o->more())); } private: @@ -140,16 +127,9 @@ class receive_op : public receive_op_base { , handler_(std::move(handler)) { } - static void do_complete(reactor_op* base, - const boost::system::error_code &, - size_t) { + static void do_complete(reactor_op* base) { auto o = static_cast(base); - auto h = std::move(o->handler_); - auto m = std::move(o->msg_); - auto ec = o->ec_; - auto bt = o->bytes_transferred_; - delete o; - h(ec, m, bt); + o->handler_(o->ec_, o->msg_, o->bytes_transferred_); } private: diff --git a/azmq/detail/send_op.hpp b/azmq/detail/send_op.hpp index aab90aa..6b0721d 100644 --- a/azmq/detail/send_op.hpp +++ b/azmq/detail/send_op.hpp @@ -59,16 +59,9 @@ class send_buffer_op : public send_buffer_op_base { , handler_(std::move(handler)) { } - static void do_complete(reactor_op* base, - const boost::system::error_code &, - size_t) { + static void do_complete(reactor_op* base) { auto o = static_cast(base); - auto h = std::move(o->handler_); - auto ec = o->ec_; - auto bt = o->bytes_transferred_; - delete o; - - h(ec, bt); + o->handler_(o->ec_, o->bytes_transferred_); } private: @@ -110,15 +103,9 @@ class send_op : public send_op_base { , handler_(std::move(handler)) { } - static void do_complete(reactor_op* base, - const boost::system::error_code &, - size_t) { + static void do_complete(reactor_op* base) { auto o = static_cast(base); - auto h = std::move(o->handler_); - auto ec = o->ec_; - auto bt = o->bytes_transferred_; - delete o; - h(ec, bt); + o->handler_(o->ec_, o->bytes_transferred_); } private: diff --git a/azmq/detail/socket_service.hpp b/azmq/detail/socket_service.hpp index 1992804..b6fd84c 100644 --- 
a/azmq/detail/socket_service.hpp +++ b/azmq/detail/socket_service.hpp @@ -27,7 +27,6 @@ #include #include #include -#include #include #include #include @@ -44,6 +43,7 @@ #include #include #include +#include namespace azmq { namespace detail { @@ -57,12 +57,8 @@ namespace detail { using flags_type = socket_ops::flags_type; using more_result_type = socket_ops::more_result_type; using context_type = context_ops::context_type; - using op_queue_type = boost::intrusive::list, - &reactor_op::member_hook_ - >>; + using reactor_op_ptr = std::unique_ptr; + using op_queue_type = std::deque; using exts_type = boost::container::flat_map; using allow_speculative = opt::boolean(opt::limits::lib_socket_min)>; @@ -91,7 +87,7 @@ namespace detail { exts_type exts_; endpoint_type endpoint_; bool serverish_ = false; - std::array op_queue_; + std::array op_queues_; #ifdef AZMQ_DETAIL_USE_IO_SERVICE void do_open(boost::asio::io_service & ios, @@ -115,8 +111,8 @@ namespace detail { int events_mask() const { static_assert(2 == max_ops, "2 == max_ops"); - return (!op_queue_[read_op].empty() ? ZMQ_POLLIN : 0) - | (!op_queue_[write_op].empty() ? ZMQ_POLLOUT : 0); + return (!op_queues_[read_op].empty() ? ZMQ_POLLIN : 0) + | (!op_queues_[write_op].empty() ? 
ZMQ_POLLOUT : 0); } bool perform_ops(op_queue_type & ops, boost::system::error_code& ec) { @@ -126,10 +122,9 @@ namespace detail { break; for (size_t i = 0; i != max_ops; ++i) { - if ((evs & filter[i]) && op_queue_[i].front().do_perform(socket_)) { - op_queue_[i].pop_front_and_dispose([&ops](reactor_op * op) { - ops.push_back(*op); - }); + if ((evs & filter[i]) && op_queues_[i].front()->do_perform(socket_)) { + ops.push_back(std::move(op_queues_[i].front())); + op_queues_[i].pop_front(); } } } @@ -143,11 +138,10 @@ namespace detail { void cancel_ops(boost::system::error_code const& ec, op_queue_type & ops) { for (size_t i = 0; i != max_ops; ++i) { - while (!op_queue_[i].empty()) { - op_queue_[i].front().ec_ = ec; - op_queue_[i].pop_front_and_dispose([&ops](reactor_op * op) { - ops.push_back(*op); - }); + while (!op_queues_[i].empty()) { + op_queues_[i].front()->ec_ = ec; + ops.push_back(std::move(op_queues_[i].front())); + op_queues_[i].pop_front(); } } } @@ -477,16 +471,9 @@ namespace detail { return r; } - using reactor_op_ptr = std::unique_ptr; template void enqueue(implementation_type & impl, op_type o, Args&&... args) { - reactor_op_ptr p{ new T(std::forward(args)...) 
}; - boost::system::error_code ec = enqueue(impl, o, p); - if (ec) { - BOOST_ASSERT_MSG(p, "op ptr"); - p->ec_ = ec; - reactor_op::do_complete(p.release()); - } + enqueue(impl, o, std::unique_ptr(new T(std::forward(args)...))); } boost::system::error_code cancel(implementation_type & impl, @@ -527,8 +514,9 @@ namespace detail { static void cancel_ops(implementation_type & impl) { op_queue_type ops; impl->cancel_ops(reactor_op::canceled(), ops); - while (!ops.empty()) - ops.pop_front_and_dispose(reactor_op::do_complete); + for (const auto &op : ops) { + reactor_op::do_complete(op.get()); + } } using weak_descriptor_ptr = std::weak_ptr; @@ -549,8 +537,9 @@ namespace detail { if (ec) impl->cancel_ops(ec, ops); } - while (!ops.empty()) - ops.pop_front_and_dispose(reactor_op::do_complete); + for (const auto &op : ops) { + reactor_op::do_complete(op.get()); + } } void check_missed_events(implementation_type & impl) @@ -632,8 +621,9 @@ namespace detail { else descriptors_.unregister_descriptor(p); } - while (!ops.empty()) - ops.pop_front_and_dispose(reactor_op::do_complete); + for (const auto &op : ops) { + reactor_op::do_complete(op.get()); + } } static void schedule(descriptor_map & descriptors, implementation_type & impl) { @@ -658,16 +648,17 @@ namespace detail { struct deferred_completion { weak_descriptor_ptr owner_; - reactor_op *op_; + // Using shared_ptr instead of unique_ptr here, so this completion handler is copy-constructible for asio + std::shared_ptr op_; deferred_completion(implementation_type const& owner, reactor_op_ptr op) : owner_(owner) - , op_(op.release()) + , op_(std::move(op)) { } void operator()() { - reactor_op::do_complete(op_); + reactor_op::do_complete(op_.get()); if (auto p = owner_.lock()) { unique_lock l{ *p }; p->in_speculative_completion_ = false; @@ -680,17 +671,20 @@ namespace detail { descriptor_map descriptors_; - boost::system::error_code enqueue(implementation_type & impl, - op_type o, reactor_op_ptr & op) { + void 
enqueue(implementation_type& impl, op_type o, reactor_op_ptr op) { unique_lock l{ *impl }; boost::system::error_code ec; - if (is_shutdown(impl, o, ec)) - return ec; + if (is_shutdown(impl, o, ec)) { + BOOST_ASSERT_MSG(op, "op ptr"); + op->ec_ = ec; + reactor_op::do_complete(op.get()); + return; + } // we have at most one speculative completion in flight at any time if (impl->allow_speculative_ && !impl->in_speculative_completion_) { // attempt to execute speculatively when the op_queue is empty - if (impl->op_queue_[o].empty()) { + if (impl->op_queues_[o].empty()) { if (op->do_perform(impl->socket_)) { impl->in_speculative_completion_ = true; l.unlock(); @@ -699,11 +693,12 @@ namespace detail { #else get_io_context().post(deferred_completion(impl, std::move(op))); #endif - return ec; + return; } } } - impl->op_queue_[o].push_back(*op.release()); + + impl->op_queues_[o].push_back(std::move(op)); if (!impl->scheduled_) { impl->scheduled_ = true; @@ -711,7 +706,7 @@ namespace detail { } else { check_missed_events(impl); } - return ec; + return; } }; diff --git a/test/socket/main.cpp b/test/socket/main.cpp index a3bfc94..549c65f 100644 --- a/test/socket/main.cpp +++ b/test/socket/main.cpp @@ -693,3 +693,12 @@ TEST_CASE( "Loopback", "[socket]" ) { REQUIRE(s.ec == boost::system::error_code()); REQUIRE(s.ct == ct); } + +TEST_CASE( "socket_service does not call pending completion handlers when destroyed", "[socket]" ) { + boost::asio::io_service ioservice; + azmq::socket socket(ioservice, ZMQ_ROUTER); + socket.bind(subj(BOOST_CURRENT_FUNCTION)); + socket.async_receive([](boost::system::error_code const& ec, azmq::message & msg, size_t bytes_transferred) { + FAIL(); + }); +} From 15a4f45ee92f57ed1938b885bcba876b97ba5967 Mon Sep 17 00:00:00 2001 From: Daniel Klauer Date: Wed, 28 Oct 2020 15:31:21 +0100 Subject: [PATCH 2/2] Refactor do_perform & do_complete into virtual methods Since the previous patch, do_complete() no longer deletes the op object, so now we can turn it 
into a proper method and simplify the code. --- azmq/detail/reactor_op.hpp | 23 +++---------- azmq/detail/receive_op.hpp | 60 +++++++++++++--------------------- azmq/detail/send_op.hpp | 52 ++++++++++++----------------- azmq/detail/socket_service.hpp | 29 ++++++++-------- 4 files changed, 62 insertions(+), 102 deletions(-) diff --git a/azmq/detail/reactor_op.hpp b/azmq/detail/reactor_op.hpp index 120e447..1e5f632 100644 --- a/azmq/detail/reactor_op.hpp +++ b/azmq/detail/reactor_op.hpp @@ -22,35 +22,20 @@ class reactor_op { using socket_type = socket_ops::socket_type; using flags_type = socket_ops::flags_type; boost::system::error_code ec_; - size_t bytes_transferred_; + size_t bytes_transferred_ = 0; - bool do_perform(socket_type & socket) { return perform_func_(this, socket); } - static void do_complete(reactor_op* op) { - op->complete_func_(op); - } + virtual ~reactor_op() = default; + virtual bool do_perform(socket_type& socket) = 0; + virtual void do_complete() = 0; static boost::system::error_code canceled() { return boost::asio::error::operation_aborted; } protected: - typedef bool (*perform_func_type)(reactor_op*, socket_type &); - typedef void (*complete_func_type)(reactor_op* op); - - perform_func_type perform_func_; - complete_func_type complete_func_; - bool try_again() const { return ec_.value() == boost::system::errc::resource_unavailable_try_again; } bool is_canceled() const { return ec_ == canceled(); } - - reactor_op(perform_func_type perform_func, - complete_func_type complete_func) - : bytes_transferred_(0) - , perform_func_(perform_func) - , complete_func_(complete_func) - { } - }; } // namespace detail diff --git a/azmq/detail/receive_op.hpp b/azmq/detail/receive_op.hpp index 9299613..b65c33c 100644 --- a/azmq/detail/receive_op.hpp +++ b/azmq/detail/receive_op.hpp @@ -25,21 +25,16 @@ namespace detail { template class receive_buffer_op_base : public reactor_op { public: - receive_buffer_op_base(MutableBufferSequence const& buffers, - flags_type 
flags, - complete_func_type complete_func) - : reactor_op(&receive_buffer_op_base::do_perform, complete_func) - , buffers_(buffers) + receive_buffer_op_base(MutableBufferSequence const& buffers, flags_type flags) + : buffers_(buffers) , flags_(flags) { } - static bool do_perform(reactor_op* base, socket_type & socket) { - auto o = static_cast(base); - o->ec_ = boost::system::error_code(); - - o->bytes_transferred_ += socket_ops::receive(o->buffers_, socket, o->flags_ | ZMQ_DONTWAIT, o->ec_); - if (o->ec_) - return !o->try_again(); + virtual bool do_perform(socket_type& socket) override { + ec_ = boost::system::error_code(); + bytes_transferred_ += socket_ops::receive(buffers_, socket, flags_ | ZMQ_DONTWAIT, ec_); + if (ec_) + return !try_again(); return true; } @@ -60,14 +55,12 @@ class receive_buffer_op : public receive_buffer_op_base { receive_buffer_op(MutableBufferSequence const& buffers, Handler handler, socket_ops::flags_type flags) - : receive_buffer_op_base(buffers, flags, - &receive_buffer_op::do_complete) + : receive_buffer_op_base(buffers, flags) , handler_(std::move(handler)) { } - static void do_complete(reactor_op* base) { - auto o = static_cast(base); - o->handler_(o->ec_, o->bytes_transferred_); + virtual void do_complete() override { + handler_(this->ec_, this->bytes_transferred_); } private: @@ -81,14 +74,12 @@ class receive_more_buffer_op : public receive_buffer_op_base(buffers, flags, - &receive_more_buffer_op::do_complete) + : receive_buffer_op_base(buffers, flags) , handler_(std::move(handler)) { } - static void do_complete(reactor_op* base) { - auto o = static_cast(base); - o->handler_(o->ec_, std::make_pair(o->bytes_transferred_, o->more())); + virtual void do_complete() override { + handler_(this->ec_, std::make_pair(this->bytes_transferred_, this->more())); } private: @@ -97,19 +88,15 @@ class receive_more_buffer_op : public receive_buffer_op_base(base); - o->ec_ = boost::system::error_code(); - - o->bytes_transferred_ = 
socket_ops::receive(o->msg_, socket, o->flags_ | ZMQ_DONTWAIT, o->ec_); - if (o->ec_) - return !o->try_again(); + virtual bool do_perform(socket_type& socket) override { + ec_ = boost::system::error_code(); + bytes_transferred_ = socket_ops::receive(msg_, socket, flags_ | ZMQ_DONTWAIT, ec_); + if (ec_) + return !try_again(); return true; } @@ -123,13 +110,12 @@ class receive_op : public receive_op_base { public: receive_op(Handler handler, socket_ops::flags_type flags) - : receive_op_base(flags, &receive_op::do_complete) + : receive_op_base(flags) , handler_(std::move(handler)) { } - static void do_complete(reactor_op* base) { - auto o = static_cast(base); - o->handler_(o->ec_, o->msg_, o->bytes_transferred_); + virtual void do_complete() override { + handler_(ec_, msg_, bytes_transferred_); } private: diff --git a/azmq/detail/send_op.hpp b/azmq/detail/send_op.hpp index 6b0721d..4823b55 100644 --- a/azmq/detail/send_op.hpp +++ b/azmq/detail/send_op.hpp @@ -24,20 +24,16 @@ namespace detail { template class send_buffer_op_base : public reactor_op { public: - send_buffer_op_base(ConstBufferSequence const& buffers, - flags_type flags, - complete_func_type complete_func) - : reactor_op(&send_buffer_op_base::do_perform, complete_func) - , buffers_(buffers) + send_buffer_op_base(ConstBufferSequence const& buffers, flags_type flags) + : buffers_(buffers) , flags_(flags) { } - static bool do_perform(reactor_op* base, socket_type & socket) { - auto o = static_cast(base); - o->ec_ = boost::system::error_code(); - o->bytes_transferred_ += socket_ops::send(o->buffers_, socket, o->flags_ | ZMQ_DONTWAIT, o->ec_); - if (o->ec_) { - return !o->try_again(); + virtual bool do_perform(socket_type& socket) override { + ec_ = boost::system::error_code(); + bytes_transferred_ += socket_ops::send(buffers_, socket, flags_ | ZMQ_DONTWAIT, ec_); + if (ec_) { + return !try_again(); } return true; } @@ -54,14 +50,12 @@ class send_buffer_op : public send_buffer_op_base { 
send_buffer_op(ConstBufferSequence const& buffers, Handler handler, reactor_op::flags_type flags) - : send_buffer_op_base(buffers, flags, - &send_buffer_op::do_complete) + : send_buffer_op_base(buffers, flags) , handler_(std::move(handler)) { } - static void do_complete(reactor_op* base) { - auto o = static_cast(base); - o->handler_(o->ec_, o->bytes_transferred_); + virtual void do_complete() override { + handler_(this->ec_, this->bytes_transferred_); } private: @@ -70,21 +64,16 @@ class send_buffer_op : public send_buffer_op_base { class send_op_base : public reactor_op { public: - send_op_base(message msg, - flags_type flags, - complete_func_type complete_func) - : reactor_op(&send_op_base::do_perform, complete_func) - , msg_(std::move(msg)) + send_op_base(message msg, flags_type flags) + : msg_(std::move(msg)) , flags_(flags) { } - static bool do_perform(reactor_op* base, socket_type & socket) { - auto o = static_cast(base); - o->ec_ = boost::system::error_code(); - o->bytes_transferred_ = socket_ops::send(o->msg_, socket, o->flags_ | ZMQ_DONTWAIT, o->ec_); - - if (o->ec_) - return !o->try_again(); // some other error + virtual bool do_perform(socket_type & socket) override { + ec_ = boost::system::error_code(); + bytes_transferred_ = socket_ops::send(msg_, socket, flags_ | ZMQ_DONTWAIT, ec_); + if (ec_) + return !try_again(); // some other error return true; }; @@ -99,13 +88,12 @@ class send_op : public send_op_base { send_op(message msg, Handler handler, flags_type flags) - : send_op_base(std::move(msg), flags, &send_op::do_complete) + : send_op_base(std::move(msg), flags) , handler_(std::move(handler)) { } - static void do_complete(reactor_op* base) { - auto o = static_cast(base); - o->handler_(o->ec_, o->bytes_transferred_); + virtual void do_complete() override { + handler_(ec_, bytes_transferred_); } private: diff --git a/azmq/detail/socket_service.hpp b/azmq/detail/socket_service.hpp index b6fd84c..8250ee0 100644 --- a/azmq/detail/socket_service.hpp +++ 
b/azmq/detail/socket_service.hpp @@ -115,7 +115,7 @@ namespace detail { | (!op_queues_[write_op].empty() ? ZMQ_POLLOUT : 0); } - bool perform_ops(op_queue_type & ops, boost::system::error_code& ec) { + bool perform_ops(op_queue_type& ops, boost::system::error_code& ec) { const int filter[max_ops] = { ZMQ_POLLIN, ZMQ_POLLOUT }; while(int evs = socket_ops::get_events(socket_, ec)& events_mask()) { if (ec) @@ -511,12 +511,16 @@ namespace detail { : what >= shutdown_type::receive; } + static void complete_ops(const op_queue_type &ops) { + for (const auto &op : ops) { + op->do_complete(); + } + } + static void cancel_ops(implementation_type & impl) { op_queue_type ops; impl->cancel_ops(reactor_op::canceled(), ops); - for (const auto &op : ops) { - reactor_op::do_complete(op.get()); - } + complete_ops(ops); } using weak_descriptor_ptr = std::weak_ptr; @@ -537,9 +541,7 @@ namespace detail { if (ec) impl->cancel_ops(ec, ops); } - for (const auto &op : ops) { - reactor_op::do_complete(op.get()); - } + complete_ops(ops); } void check_missed_events(implementation_type & impl) @@ -621,9 +623,7 @@ namespace detail { else descriptors_.unregister_descriptor(p); } - for (const auto &op : ops) { - reactor_op::do_complete(op.get()); - } + complete_ops(ops); } static void schedule(descriptor_map & descriptors, implementation_type & impl) { @@ -658,7 +658,7 @@ namespace detail { { } void operator()() { - reactor_op::do_complete(op_.get()); + op_->do_complete(); if (auto p = owner_.lock()) { unique_lock l{ *p }; p->in_speculative_completion_ = false; @@ -677,7 +677,7 @@ namespace detail { if (is_shutdown(impl, o, ec)) { BOOST_ASSERT_MSG(op, "op ptr"); op->ec_ = ec; - reactor_op::do_complete(op.get()); + op->do_complete(); return; } @@ -689,10 +689,11 @@ namespace detail { impl->in_speculative_completion_ = true; l.unlock(); #ifdef AZMQ_DETAIL_USE_IO_SERVICE - get_io_service().post(deferred_completion(impl, std::move(op))); + get_io_service() #else - 
get_io_context().post(deferred_completion(impl, std::move(op))); + get_io_context() #endif + .post(deferred_completion(impl, std::move(op))); return; } }