Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: fix and enable Zeromq Draft API support #683

Merged
merged 3 commits into from
Dec 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .github/workflows/docs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ jobs:
steps:
- uses: actions/checkout@v4


- name: Cache
uses: actions/cache@v4
with:
Expand Down
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ macro(set_option_from_env OPTION_NAME)
message(STATUS "${OPTION_NAME}: ${${OPTION_NAME}}")
endmacro()

option(ZMQ_DRAFT "Build and install draft APIs" OFF)
option(ZMQ_DRAFT "Build and install draft APIs (e.g. `server-client`, `radio-dish`, `scatter-gather`)" ON)
set_option_from_env(ZMQ_DRAFT)

option(ZMQ_CURVE "Enable CURVE security" ON)
Expand Down
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
- Fully usable with TypeScript (3+).
- Compatible with Zeromq 4/5 via "zeromq/v5-compat"
- Secure Curve protocol with Libsodium
- Zeromq Draft API support

## Useful links

Expand Down Expand Up @@ -142,6 +143,8 @@ etc.

#### Draft support

(Enabled by default)

By default `libzmq` is built with support for `Draft` patterns (e.g.
`server-client`, `radio-dish`, `scatter-gather`). If you want to build `libzmq`
without support for `Draft`, you can specify the following in `.npmrc`:
Expand Down
16 changes: 6 additions & 10 deletions src/draft.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,27 +41,23 @@ interface RadioGroupOptions {
export interface Radio extends Writable<MessageLike, [RadioGroupOptions]> {}
allowMethods(Radio.prototype, ["send"])

const join = (Socket.prototype as any).join
const leave = (Socket.prototype as any).leave

export class Dish extends Socket {
constructor(options?: SocketOptions<Dish>) {
super(SocketType.Dish, options)
}

/* TODO: These methods might accept arrays in their C++ implementation for
the sake of simplicity. */

join(...values: Array<Buffer | string>): void {
for (const value of values) {
join(value)
const {join} = Socket.prototype as Socket & {
join: (value: Array<string | Buffer>) => void
}
join(values)
}

leave(...values: Array<Buffer | string>): void {
for (const value of values) {
leave(value)
const {leave} = Socket.prototype as Socket & {
leave: (value: Array<string | Buffer>) => void
}
leave(values)
}
}

Expand Down
3 changes: 2 additions & 1 deletion src/module.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ struct Terminator {
});

using namespace std::chrono_literals;
if (terminate.wait_for(500ms) == std::future_status::timeout) {
const auto timeout = 500ms;
if (terminate.wait_for(timeout) == std::future_status::timeout) {
/* We can't use process.emitWarning, because the Node.js runtime
has already shut down. So we mimic it instead. */
(void)fprintf(stderr,
Expand Down
21 changes: 6 additions & 15 deletions src/outgoing_msg.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

#include "./module.h"
#include "util/error.h"
#include "util/string_or_buffer.h"

namespace zmq {
OutgoingMsg::OutgoingMsg(Napi::Value value, std::reference_wrapper<Module> module) {
Expand Down Expand Up @@ -116,21 +117,10 @@ bool OutgoingMsg::Parts::SetGroup(Napi::Value value) {
return false;
}

auto group = [&]() {
if (value.IsString()) {
return std::string(value.As<Napi::String>());
} else if (value.IsBuffer()) {
Napi::Object buf = value.As<Napi::Object>();
auto length = buf.As<Napi::Buffer<char>>().Length();
auto value = buf.As<Napi::Buffer<char>>().Data();
return std::string(value, length);
} else {
return std::string();
}
}();
const auto group = convert_string_or_buffer(value);

for (auto& part : parts) {
if (zmq_msg_set_group(part, group.c_str()) < 0) {
if (zmq_msg_set_group(part.get(), group.c_str()) < 0) {
ErrnoException(value.Env(), zmq_errno()).ThrowAsJavaScriptException();
return false;
}
Expand All @@ -141,14 +131,15 @@ bool OutgoingMsg::Parts::SetGroup(Napi::Value value) {

bool OutgoingMsg::Parts::SetRoutingId(Napi::Value value) {
if (value.IsUndefined()) {
// https://clang.llvm.org/extra/clang-tidy/checks/readability/identifier-length.html
ErrnoException(value.Env(), EINVAL).ThrowAsJavaScriptException();
return false;
}

auto id = value.As<Napi::Number>().Uint32Value();
auto routing_id = value.As<Napi::Number>().Uint32Value();

for (auto& part : parts) {
if (zmq_msg_set_routing_id(part, id) < 0) {
if (zmq_msg_set_routing_id(part.get(), routing_id) < 0) {
ErrnoException(value.Env(), zmq_errno()).ThrowAsJavaScriptException();
return false;
}
Expand Down
87 changes: 43 additions & 44 deletions src/socket.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include "util/async_scope.h"
#include "util/error.h"
#include "util/object.h"
#include "util/string_or_buffer.h"
#include "util/take.h"
#include "util/uvdelayed.h"
#include "util/uvwork.h"
Expand Down Expand Up @@ -102,8 +103,7 @@ Socket::Socket(const Napi::CallbackInfo& info)
return;
}

uv_os_sock_t file_descriptor = 0;
std::function<void()> const finalize = nullptr;
auto file_descriptor = uv_os_sock_t{};

const auto error = [this]() {
[[maybe_unused]] auto err = zmq_close(socket);
Expand All @@ -125,22 +125,27 @@ Socket::Socket(const Napi::CallbackInfo& info)
}
#endif

std::function<void()> finalize = nullptr;

/* Currently only some DRAFT sockets are threadsafe. */
if (thread_safe) {
#ifdef ZMQ_HAS_POLLABLE_THREAD_SAFE
/* Threadsafe sockets do not expose an FD we can integrate into the
event loop, so we have to construct one by creating a zmq_poller. */
auto poll = zmq_poller_new();
auto* poll = zmq_poller_new();
if (poll == nullptr) {
ErrnoException(Env(), zmq_errno()).ThrowAsJavaScriptException();
error();
}

/* Callback to free the underlying poller. Move the poller to transfer
ownership after the constructor has completed. */
finalize = [=]() mutable {
[[maybe_unused]] auto err = zmq_poller_destroy(&poll);
assert(err == 0);
finalize = [poll]() mutable {
if (poll != nullptr) {
[[maybe_unused]] auto err = zmq_poller_destroy(&poll);
assert(err == 0);
poll = nullptr;
}
};

if (zmq_poller_add(poll, socket, nullptr, ZMQ_POLLIN | ZMQ_POLLOUT) < 0) {
Expand All @@ -149,7 +154,7 @@ Socket::Socket(const Napi::CallbackInfo& info)
error();
}

if (zmq_poller_fd(poll, &fd) < 0) {
if (zmq_poller_fd(poll, &file_descriptor) < 0) {
ErrnoException(Env(), zmq_errno()).ThrowAsJavaScriptException();
finalize();
error();
Expand Down Expand Up @@ -327,17 +332,17 @@ void Socket::Receive(const Napi::Promise::Deferred& res) {
switch (type) {
case ZMQ_SERVER: {
auto meta = Napi::Object::New(Env());
meta.Set("routingId", zmq_msg_routing_id(part));
list[i++] = meta;
meta.Set("routingId", zmq_msg_routing_id(part.get()));
list[i_part++] = meta;
break;
}

case ZMQ_DISH: {
auto meta = Napi::Object::New(Env());
auto data = zmq_msg_group(part);
const auto* data = zmq_msg_group(part.get());
auto length = strnlen(data, ZMQ_GROUP_MAX_LENGTH);
meta.Set("group", Napi::Buffer<char>::Copy(Env(), data, length));
list[i++] = meta;
list[i_part++] = meta;
break;
}
}
Expand Down Expand Up @@ -610,7 +615,9 @@ Napi::Value Socket::Send(const Napi::CallbackInfo& info) {
Arg::Required<Arg::Object>("Options must be an object"),
};

if (args.ThrowIfInvalid(info)) return Env().Undefined();
if (args.ThrowIfInvalid(info)) {
return Env().Undefined();
}

break;
}
Expand Down Expand Up @@ -748,24 +755,20 @@ Napi::Value Socket::Receive(const Napi::CallbackInfo& info) {

void Socket::Join([[maybe_unused]] const Napi::CallbackInfo& info) {
#ifdef ZMQ_HAS_POLLABLE_THREAD_SAFE
Arg::Validator args{
Arg::Required<Arg::String, Arg::Buffer>("Group must be a string or buffer"),
};
for (size_t i_value = 0; i_value < info.Length(); ++i_value) {
const auto& value = info[i_value];
this->JoinElement(value);
}
#endif
}

if (args.ThrowIfInvalid(info)) return;
void Socket::JoinElement([[maybe_unused]] const Napi::Value& value) {
#ifdef ZMQ_HAS_POLLABLE_THREAD_SAFE
if (!ValidateOpen()) {
return;
}

if (!ValidateOpen()) return;

auto str = [&]() {
if (info[0].IsString()) {
return std::string(info[0].As<Napi::String>());
} else {
Napi::Object buf = info[0].As<Napi::Object>();
auto length = buf.As<Napi::Buffer<char>>().Length();
auto value = buf.As<Napi::Buffer<char>>().Data();
return std::string(value, length);
}
}();
const auto str = convert_string_or_buffer(value);

if (zmq_join(socket, str.c_str()) < 0) {
ErrnoException(Env(), zmq_errno()).ThrowAsJavaScriptException();
Expand All @@ -776,24 +779,20 @@ void Socket::Join([[maybe_unused]] const Napi::CallbackInfo& info) {

void Socket::Leave([[maybe_unused]] const Napi::CallbackInfo& info) {
#ifdef ZMQ_HAS_POLLABLE_THREAD_SAFE
Arg::Validator args{
Arg::Required<Arg::String, Arg::Buffer>("Group must be a string or buffer"),
};

if (args.ThrowIfInvalid(info)) return;
for (size_t i_value = 0; i_value < info.Length(); ++i_value) {
const auto& value = info[i_value];
this->LeaveElement(value);
}
#endif
}

if (!ValidateOpen()) return;
void Socket::LeaveElement([[maybe_unused]] const Napi::Value& value) {
#ifdef ZMQ_HAS_POLLABLE_THREAD_SAFE
if (!ValidateOpen()) {
return;
}

auto str = [&]() {
if (info[0].IsString()) {
return std::string(info[0].As<Napi::String>());
} else {
Napi::Object buf = info[0].As<Napi::Object>();
auto length = buf.As<Napi::Buffer<char>>().Length();
auto value = buf.As<Napi::Buffer<char>>().Data();
return std::string(value, length);
}
}();
const auto str = convert_string_or_buffer(value);

if (zmq_leave(socket, str.c_str()) < 0) {
ErrnoException(Env(), zmq_errno()).ThrowAsJavaScriptException();
Expand Down
3 changes: 3 additions & 0 deletions src/socket.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ class Socket : public Napi::ObjectWrap<Socket>, public Closable {
force_inline void Send(const Napi::Promise::Deferred& res, OutgoingMsg::Parts& parts);
force_inline void Receive(const Napi::Promise::Deferred& res);

inline void JoinElement(const Napi::Value& value);
inline void LeaveElement(const Napi::Value& value);

class Poller : public zmq::Poller<Poller> {
std::reference_wrapper<Socket> socket;
std::optional<Napi::Promise::Deferred> read_deferred;
Expand Down
3 changes: 2 additions & 1 deletion src/util/arguments.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,8 @@ class Validator {
if constexpr (I == NumArgs) {
if (info.Length() > NumArgs) {
auto msg = "Expected " + std::to_string(NumArgs) + " argument"
+ (NumArgs != 1 ? "s" : "");
+ (NumArgs != 1 ? "s" : "") + " but received "
+ std::to_string(info.Length());
return Napi::TypeError::New(info.Env(), msg);
}

Expand Down
22 changes: 22 additions & 0 deletions src/util/string_or_buffer.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
#pragma once

#include <napi.h>

#include <string>

namespace zmq {

inline std::string convert_string_or_buffer(const Napi::Value& value) {
if (value.IsString()) {
return std::string(value.As<Napi::String>());
}
if (value.IsBuffer()) {
auto buf = value.As<Napi::Object>();
auto length = buf.As<Napi::Buffer<char>>().Length();
auto* value = buf.As<Napi::Buffer<char>>().Data();
return {value, length};
}
throw Napi::TypeError::New(value.Env(), "Value must be a string or buffer");
}

} // namespace zmq
Loading