diff --git a/tests/monitor.cpp b/tests/monitor.cpp index bb54ca16..dc06670a 100644 --- a/tests/monitor.cpp +++ b/tests/monitor.cpp @@ -74,7 +74,7 @@ TEST_CASE("monitor move assign", "[monitor]") } } -TEST_CASE("monitor init event count", "[monitor]") +TEST_CASE("monitor init check event count", "[monitor]") { common_server_client_setup s{false}; mock_monitor_t monitor; @@ -92,6 +92,93 @@ TEST_CASE("monitor init event count", "[monitor]") CHECK(monitor.total == expected_event_count); } +TEST_CASE("monitor init get event count", "[monitor]") +{ + common_server_client_setup s{ false }; + zmq::monitor_t monitor; + + const int expected_event_count = 2; + monitor.init(s.client, "inproc://foo"); + + int total{ 0 }; + int connect_delayed{ 0 }; + int connected{ 0 }; + + auto lbd_count_event = [&](const zmq_event_t& event) { + switch (event.event) + { + case ZMQ_EVENT_CONNECT_DELAYED: + connect_delayed++; + total++; + break; + + case ZMQ_EVENT_CONNECTED: + connected++; + total++; + break; + } + }; + + zmq_event_t eventMsg; + std::string address; + CHECK_FALSE(monitor.get_event(eventMsg, address, zmq::recv_flags::dontwait)); + s.init(); + + SECTION("get_event") + { + while (total < expected_event_count) + { + if (!monitor.get_event(eventMsg, address)) + continue; + + lbd_count_event(eventMsg); + } + + } + + SECTION("poll get_event") + { + while (total < expected_event_count) + { + zmq::pollitem_t items[] = { + { monitor.handle(), 0, ZMQ_POLLIN, 0 }, + }; + + zmq::poll(&items[0], 1, 100); + + if (!(items[0].revents & ZMQ_POLLIN)) { + continue; + } + + CHECK(monitor.get_event(eventMsg, address)); + + lbd_count_event(eventMsg); + } + } + + SECTION("poller_t get_event") + { + zmq::poller_t<> poller; + CHECK_NOTHROW(poller.add(monitor, zmq::event_flags::pollin)); + + while (total < expected_event_count) + { + std::vector> events(1); + if(0 == poller.wait_all(events, std::chrono::milliseconds{ 100 })) + continue; + + CHECK(zmq::event_flags::pollin == events[0].events); + CHECK(monitor.get_event(eventMsg, address)); + + lbd_count_event(eventMsg); + } + } + + CHECK(connect_delayed == 1); + CHECK(connected == 1); + CHECK(total == expected_event_count); +} + TEST_CASE("monitor init abort", "[monitor]") { class mock_monitor : public mock_monitor_t diff --git a/zmq.hpp b/zmq.hpp index 84457d38..82dec4ea 100644 --- a/zmq.hpp +++ b/zmq.hpp @@ -2224,6 +2224,63 @@ class monitor_t on_monitor_started(); } + operator void *() ZMQ_NOTHROW { return handle(); } + + operator void const *() const ZMQ_NOTHROW { return handle(); } + + ZMQ_NODISCARD void *handle() ZMQ_NOTHROW { return _monitor_socket.handle(); } + + ZMQ_NODISCARD const void *handle() const ZMQ_NOTHROW { return _monitor_socket.handle(); } + + operator socket_ref() ZMQ_NOTHROW { return (zmq::socket_ref) _monitor_socket; } + +#if ZMQ_VERSION_MAJOR >= 4 + bool get_event(zmq_event_t& eventMsg, std::string& address, zmq::recv_flags flags = zmq::recv_flags::none) + { + assert(_monitor_socket); + + eventMsg.event = 0; + eventMsg.value = 0; + address = std::string(); + + { + message_t msg; + int rc = zmq_msg_recv(msg.handle(), _monitor_socket.handle(), + static_cast(flags)); + + if (rc == -1) + { + if (zmq_errno() == ETERM || zmq_errno() == EAGAIN) + return false; + else + throw error_t(); + } + + const char *data = msg.data(); + memcpy(&eventMsg.event, data, sizeof(uint16_t)); + data += sizeof(uint16_t); + memcpy(&eventMsg.value, data, sizeof(int32_t)); + } + + message_t addrMsg; + int rc = zmq_msg_recv(addrMsg.handle(), _monitor_socket.handle(), + static_cast(flags)); + + if (rc == -1) + { + if (zmq_errno() == ETERM) + return false; + else + throw error_t(); + } + + const char *str = addrMsg.data(); + address = std::string(str, str + addrMsg.size()); + + return true; + } +#endif + bool check_event(int timeout = 0) { assert(_monitor_socket); @@ -2357,6 +2414,15 @@ class monitor_t _socket = socket_ref(); } #endif + + void close() ZMQ_NOTHROW + { +#ifdef ZMQ_EVENT_MONITOR_STOPPED + abort(); +#endif + _monitor_socket = socket_t(); + } + virtual void on_monitor_started() {} virtual void on_event_connected(const zmq_event_t &event_, const char *addr_) { @@ -2461,13 +2527,6 @@ class monitor_t socket_ref _socket; socket_t _monitor_socket; - - void close() ZMQ_NOTHROW - { - if (_socket) - zmq_socket_monitor(_socket.handle(), ZMQ_NULLPTR, 0); - _monitor_socket.close(); - } }; #if defined(ZMQ_BUILD_DRAFT_API) && defined(ZMQ_CPP11) && defined(ZMQ_HAVE_POLLER)