Skip to content

Commit

Permalink
Refactor do_perform & do_complete into virtual methods
Browse files Browse the repository at this point in the history
Since the previous patch, do_complete() no longer deletes the op object,
so now we can turn it into a proper method and simplify the code.
  • Loading branch information
dkl committed Sep 27, 2021
1 parent 1a53ed3 commit 15a4f45
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 102 deletions.
23 changes: 4 additions & 19 deletions azmq/detail/reactor_op.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,35 +22,20 @@ class reactor_op {
using socket_type = socket_ops::socket_type;
using flags_type = socket_ops::flags_type;
boost::system::error_code ec_;
size_t bytes_transferred_;
size_t bytes_transferred_ = 0;

bool do_perform(socket_type & socket) { return perform_func_(this, socket); }
static void do_complete(reactor_op* op) {
op->complete_func_(op);
}
virtual ~reactor_op() = default;
virtual bool do_perform(socket_type& socket) = 0;
virtual void do_complete() = 0;

static boost::system::error_code canceled() { return boost::asio::error::operation_aborted; }

protected:
typedef bool (*perform_func_type)(reactor_op*, socket_type &);
typedef void (*complete_func_type)(reactor_op* op);

perform_func_type perform_func_;
complete_func_type complete_func_;

bool try_again() const {
return ec_.value() == boost::system::errc::resource_unavailable_try_again;
}

bool is_canceled() const { return ec_ == canceled(); }

reactor_op(perform_func_type perform_func,
complete_func_type complete_func)
: bytes_transferred_(0)
, perform_func_(perform_func)
, complete_func_(complete_func)
{ }

};

} // namespace detail
Expand Down
60 changes: 23 additions & 37 deletions azmq/detail/receive_op.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,21 +25,16 @@ namespace detail {
template<typename MutableBufferSequence>
class receive_buffer_op_base : public reactor_op {
public:
receive_buffer_op_base(MutableBufferSequence const& buffers,
flags_type flags,
complete_func_type complete_func)
: reactor_op(&receive_buffer_op_base::do_perform, complete_func)
, buffers_(buffers)
receive_buffer_op_base(MutableBufferSequence const& buffers, flags_type flags)
: buffers_(buffers)
, flags_(flags)
{ }

static bool do_perform(reactor_op* base, socket_type & socket) {
auto o = static_cast<receive_buffer_op_base*>(base);
o->ec_ = boost::system::error_code();

o->bytes_transferred_ += socket_ops::receive(o->buffers_, socket, o->flags_ | ZMQ_DONTWAIT, o->ec_);
if (o->ec_)
return !o->try_again();
virtual bool do_perform(socket_type& socket) override {
ec_ = boost::system::error_code();
bytes_transferred_ += socket_ops::receive(buffers_, socket, flags_ | ZMQ_DONTWAIT, ec_);
if (ec_)
return !try_again();
return true;
}

Expand All @@ -60,14 +55,12 @@ class receive_buffer_op : public receive_buffer_op_base<MutableBufferSequence> {
receive_buffer_op(MutableBufferSequence const& buffers,
Handler handler,
socket_ops::flags_type flags)
: receive_buffer_op_base<MutableBufferSequence>(buffers, flags,
&receive_buffer_op::do_complete)
: receive_buffer_op_base<MutableBufferSequence>(buffers, flags)
, handler_(std::move(handler))
{ }

static void do_complete(reactor_op* base) {
auto o = static_cast<receive_buffer_op*>(base);
o->handler_(o->ec_, o->bytes_transferred_);
virtual void do_complete() override {
handler_(this->ec_, this->bytes_transferred_);
}

private:
Expand All @@ -81,14 +74,12 @@ class receive_more_buffer_op : public receive_buffer_op_base<MutableBufferSequen
receive_more_buffer_op(MutableBufferSequence const& buffers,
Handler handler,
socket_ops::flags_type flags)
: receive_buffer_op_base<MutableBufferSequence>(buffers, flags,
&receive_more_buffer_op::do_complete)
: receive_buffer_op_base<MutableBufferSequence>(buffers, flags)
, handler_(std::move(handler))
{ }

static void do_complete(reactor_op* base) {
auto o = static_cast<receive_more_buffer_op*>(base);
o->handler_(o->ec_, std::make_pair(o->bytes_transferred_, o->more()));
virtual void do_complete() override {
handler_(this->ec_, std::make_pair(this->bytes_transferred_, this->more()));
}

private:
Expand All @@ -97,19 +88,15 @@ class receive_more_buffer_op : public receive_buffer_op_base<MutableBufferSequen

class receive_op_base : public reactor_op {
public:
receive_op_base(socket_ops::flags_type flags,
complete_func_type complete_func)
: reactor_op(&receive_op_base::do_perform, complete_func)
, flags_(flags)
receive_op_base(socket_ops::flags_type flags)
: flags_(flags)
{ }

static bool do_perform(reactor_op* base, socket_type & socket) {
auto o = static_cast<receive_op_base*>(base);
o->ec_ = boost::system::error_code();

o->bytes_transferred_ = socket_ops::receive(o->msg_, socket, o->flags_ | ZMQ_DONTWAIT, o->ec_);
if (o->ec_)
return !o->try_again();
virtual bool do_perform(socket_type& socket) override {
ec_ = boost::system::error_code();
bytes_transferred_ = socket_ops::receive(msg_, socket, flags_ | ZMQ_DONTWAIT, ec_);
if (ec_)
return !try_again();
return true;
}

Expand All @@ -123,13 +110,12 @@ class receive_op : public receive_op_base {
public:
receive_op(Handler handler,
socket_ops::flags_type flags)
: receive_op_base(flags, &receive_op::do_complete)
: receive_op_base(flags)
, handler_(std::move(handler))
{ }

static void do_complete(reactor_op* base) {
auto o = static_cast<receive_op*>(base);
o->handler_(o->ec_, o->msg_, o->bytes_transferred_);
virtual void do_complete() override {
handler_(ec_, msg_, bytes_transferred_);
}

private:
Expand Down
52 changes: 20 additions & 32 deletions azmq/detail/send_op.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,20 +24,16 @@ namespace detail {
template<typename ConstBufferSequence>
class send_buffer_op_base : public reactor_op {
public:
send_buffer_op_base(ConstBufferSequence const& buffers,
flags_type flags,
complete_func_type complete_func)
: reactor_op(&send_buffer_op_base::do_perform, complete_func)
, buffers_(buffers)
send_buffer_op_base(ConstBufferSequence const& buffers, flags_type flags)
: buffers_(buffers)
, flags_(flags)
{ }

static bool do_perform(reactor_op* base, socket_type & socket) {
auto o = static_cast<send_buffer_op_base*>(base);
o->ec_ = boost::system::error_code();
o->bytes_transferred_ += socket_ops::send(o->buffers_, socket, o->flags_ | ZMQ_DONTWAIT, o->ec_);
if (o->ec_) {
return !o->try_again();
virtual bool do_perform(socket_type& socket) override {
ec_ = boost::system::error_code();
bytes_transferred_ += socket_ops::send(buffers_, socket, flags_ | ZMQ_DONTWAIT, ec_);
if (ec_) {
return !try_again();
}
return true;
}
Expand All @@ -54,14 +50,12 @@ class send_buffer_op : public send_buffer_op_base<ConstBufferSequence> {
send_buffer_op(ConstBufferSequence const& buffers,
Handler handler,
reactor_op::flags_type flags)
: send_buffer_op_base<ConstBufferSequence>(buffers, flags,
&send_buffer_op::do_complete)
: send_buffer_op_base<ConstBufferSequence>(buffers, flags)
, handler_(std::move(handler))
{ }

static void do_complete(reactor_op* base) {
auto o = static_cast<send_buffer_op*>(base);
o->handler_(o->ec_, o->bytes_transferred_);
virtual void do_complete() override {
handler_(this->ec_, this->bytes_transferred_);
}

private:
Expand All @@ -70,21 +64,16 @@ class send_buffer_op : public send_buffer_op_base<ConstBufferSequence> {

class send_op_base : public reactor_op {
public:
send_op_base(message msg,
flags_type flags,
complete_func_type complete_func)
: reactor_op(&send_op_base::do_perform, complete_func)
, msg_(std::move(msg))
send_op_base(message msg, flags_type flags)
: msg_(std::move(msg))
, flags_(flags)
{ }

static bool do_perform(reactor_op* base, socket_type & socket) {
auto o = static_cast<send_op_base*>(base);
o->ec_ = boost::system::error_code();
o->bytes_transferred_ = socket_ops::send(o->msg_, socket, o->flags_ | ZMQ_DONTWAIT, o->ec_);

if (o->ec_)
return !o->try_again(); // some other error
virtual bool do_perform(socket_type & socket) override {
ec_ = boost::system::error_code();
bytes_transferred_ = socket_ops::send(msg_, socket, flags_ | ZMQ_DONTWAIT, ec_);
if (ec_)
return !try_again(); // some other error
return true;
};

Expand All @@ -99,13 +88,12 @@ class send_op : public send_op_base {
send_op(message msg,
Handler handler,
flags_type flags)
: send_op_base(std::move(msg), flags, &send_op::do_complete)
: send_op_base(std::move(msg), flags)
, handler_(std::move(handler))
{ }

static void do_complete(reactor_op* base) {
auto o = static_cast<send_op*>(base);
o->handler_(o->ec_, o->bytes_transferred_);
virtual void do_complete() override {
handler_(ec_, bytes_transferred_);
}

private:
Expand Down
29 changes: 15 additions & 14 deletions azmq/detail/socket_service.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ namespace detail {
| (!op_queues_[write_op].empty() ? ZMQ_POLLOUT : 0);
}

bool perform_ops(op_queue_type & ops, boost::system::error_code& ec) {
bool perform_ops(op_queue_type& ops, boost::system::error_code& ec) {
const int filter[max_ops] = { ZMQ_POLLIN, ZMQ_POLLOUT };
while(int evs = socket_ops::get_events(socket_, ec)& events_mask()) {
if (ec)
Expand Down Expand Up @@ -511,12 +511,16 @@ namespace detail {
: what >= shutdown_type::receive;
}

static void complete_ops(const op_queue_type &ops) {
for (const auto &op : ops) {
op->do_complete();
}
}

static void cancel_ops(implementation_type & impl) {
op_queue_type ops;
impl->cancel_ops(reactor_op::canceled(), ops);
for (const auto &op : ops) {
reactor_op::do_complete(op.get());
}
complete_ops(ops);
}

using weak_descriptor_ptr = std::weak_ptr<per_descriptor_data>;
Expand All @@ -537,9 +541,7 @@ namespace detail {
if (ec)
impl->cancel_ops(ec, ops);
}
for (const auto &op : ops) {
reactor_op::do_complete(op.get());
}
complete_ops(ops);
}

void check_missed_events(implementation_type & impl)
Expand Down Expand Up @@ -621,9 +623,7 @@ namespace detail {
else
descriptors_.unregister_descriptor(p);
}
for (const auto &op : ops) {
reactor_op::do_complete(op.get());
}
complete_ops(ops);
}

static void schedule(descriptor_map & descriptors, implementation_type & impl) {
Expand Down Expand Up @@ -658,7 +658,7 @@ namespace detail {
{ }

void operator()() {
reactor_op::do_complete(op_.get());
op_->do_complete();
if (auto p = owner_.lock()) {
unique_lock l{ *p };
p->in_speculative_completion_ = false;
Expand All @@ -677,7 +677,7 @@ namespace detail {
if (is_shutdown(impl, o, ec)) {
BOOST_ASSERT_MSG(op, "op ptr");
op->ec_ = ec;
reactor_op::do_complete(op.get());
op->do_complete();
return;
}

Expand All @@ -689,10 +689,11 @@ namespace detail {
impl->in_speculative_completion_ = true;
l.unlock();
#ifdef AZMQ_DETAIL_USE_IO_SERVICE
get_io_service().post(deferred_completion(impl, std::move(op)));
get_io_service()
#else
get_io_context().post(deferred_completion(impl, std::move(op)));
get_io_context()
#endif
.post(deferred_completion(impl, std::move(op)));
return;
}
}
Expand Down

0 comments on commit 15a4f45

Please sign in to comment.