Skip to content

Commit

Permalink
feat: enable Draft API support
Browse files Browse the repository at this point in the history
  • Loading branch information
aminya committed Dec 29, 2024
1 parent 9d69b11 commit f1172a0
Show file tree
Hide file tree
Showing 5 changed files with 59 additions and 39 deletions.
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ macro(set_option_from_env OPTION_NAME)
message(STATUS "${OPTION_NAME}: ${${OPTION_NAME}}")
endmacro()

option(ZMQ_DRAFT "Build and install draft APIs" OFF)
option(ZMQ_DRAFT "Build and install draft APIs (e.g. `server-client`, `radio-dish`, `scatter-gather`)" ON)
set_option_from_env(ZMQ_DRAFT)

option(ZMQ_CURVE "Enable CURVE security" ON)
Expand Down
21 changes: 15 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
- High performance.
- Fully usable with TypeScript (3+).
- Compatible with Zeromq 4/5 via "zeromq/v5-compat"
- Secure Curve protocol support
- Zeromq Draft API support

## Useful links

Expand All @@ -28,7 +30,7 @@
- [Installation](#installation)
- [Prebuilt binaries](#prebuilt-binaries)
- [Building from source](#building-from-source)
- [Available Build Options](#available-build-options)
- [Available Build Options](#available-build-options)
- [Curve with Libsodium support](#curve-with-libsodium-support)
- [Draft support](#draft-support)
- [Websocket support](#websocket-support)
Expand Down Expand Up @@ -104,31 +106,38 @@ source:

To install from source, specify `build_from_source=true` in a `.npmrc` file

```
```ini
build_from_source=true
```

#### Available Build Options

When building from source, you can also specify additional build options in a
`.npmrc` file in your project:

### Available Build Options

<details>
<summary>👉🏻 Options</summary>

### Curve with Libsodium support

Enables CURVE security for encrypted communications. Zeromq uses libsodium for CURVE security. To enable CURVE support, add the following to your .npmrc:
(Enabled by default)

Enables CURVE security for encrypted communications. Zeromq uses libsodium for
CURVE security. To enable CURVE support, add the following to your .npmrc:

```ini
zmq_curve="true"
zmq_sodium="true"
```

Building libsodium requires these dependencies on Linux/MacOS: `autoconf automake libtool`, which can be installed via `apt-get` or `brew`, etc.
Building libsodium requires these dependencies on Linux/MacOS:
`autoconf automake libtool`, which can be installed via `apt-get` or `brew`,
etc.

#### Draft support

(Enabled by default)

By default `libzmq` is built with support for `Draft` patterns (e.g.
`server-client`, `radio-dish`, `scatter-gather`). If you want to build `libzmq`
without support for `Draft`, you can specify the following in `.npmrc`:
Expand Down
3 changes: 2 additions & 1 deletion src/module.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ struct Terminator {
});

using namespace std::chrono_literals;
if (terminate.wait_for(500ms) == std::future_status::timeout) {
const auto timeout = 500ms;
if (terminate.wait_for(timeout) == std::future_status::timeout) {
/* We can't use process.emitWarning, because the Node.js runtime
has already shut down. So we mimic it instead. */
(void)fprintf(stderr,
Expand Down
17 changes: 9 additions & 8 deletions src/outgoing_msg.cc
Original file line number Diff line number Diff line change
Expand Up @@ -121,18 +121,18 @@ bool OutgoingMsg::Parts::SetGroup(Napi::Value value) {
auto group = [&]() {
if (value.IsString()) {
return std::string(value.As<Napi::String>());
} else if (value.IsBuffer()) {
Napi::Object buf = value.As<Napi::Object>();
}
if (value.IsBuffer()) {
auto buf = value.As<Napi::Object>();
auto length = buf.As<Napi::Buffer<char>>().Length();
auto value = buf.As<Napi::Buffer<char>>().Data();
auto* value = buf.As<Napi::Buffer<char>>().Data();
return std::string(value, length);
} else {
return std::string();
}
return std::string();
}();

for (auto& part : parts) {
if (zmq_msg_set_group(part, group.c_str()) < 0) {
if (zmq_msg_set_group(part.get(), group.c_str()) < 0) {
ErrnoException(value.Env(), zmq_errno()).ThrowAsJavaScriptException();
return false;
}
Expand All @@ -143,14 +143,15 @@ bool OutgoingMsg::Parts::SetGroup(Napi::Value value) {

bool OutgoingMsg::Parts::SetRoutingId(Napi::Value value) {
if (value.IsUndefined()) {
// https://clang.llvm.org/extra/clang-tidy/checks/readability/identifier-length.html
ErrnoException(value.Env(), EINVAL).ThrowAsJavaScriptException();
return false;
}

auto id = value.As<Napi::Number>().Uint32Value();
auto routing_id = value.As<Napi::Number>().Uint32Value();

for (auto& part : parts) {
if (zmq_msg_set_routing_id(part, id) < 0) {
if (zmq_msg_set_routing_id(part.get(), routing_id) < 0) {
ErrnoException(value.Env(), zmq_errno()).ThrowAsJavaScriptException();
return false;
}
Expand Down
55 changes: 32 additions & 23 deletions src/socket.cc
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,6 @@ Socket::Socket(const Napi::CallbackInfo& info)
}

uv_os_sock_t file_descriptor = 0;
std::function<void()> const finalize = nullptr;

const auto error = [this]() {
[[maybe_unused]] auto err = zmq_close(socket);
Expand All @@ -125,20 +124,22 @@ Socket::Socket(const Napi::CallbackInfo& info)
}
#endif

std::function<void()> finalize = nullptr;

/* Currently only some DRAFT sockets are threadsafe. */
if (thread_safe) {
#ifdef ZMQ_HAS_POLLABLE_THREAD_SAFE
/* Threadsafe sockets do not expose an FD we can integrate into the
event loop, so we have to construct one by creating a zmq_poller. */
auto poll = zmq_poller_new();
auto* poll = zmq_poller_new();
if (poll == nullptr) {
ErrnoException(Env(), zmq_errno()).ThrowAsJavaScriptException();
error();
}

/* Callback to free the underlying poller. Move the poller to transfer
ownership after the constructor has completed. */
finalize = [=]() mutable {
finalize = [&]() {
[[maybe_unused]] auto err = zmq_poller_destroy(&poll);
assert(err == 0);
};
Expand All @@ -149,7 +150,7 @@ Socket::Socket(const Napi::CallbackInfo& info)
error();
}

if (zmq_poller_fd(poll, &fd) < 0) {
if (zmq_poller_fd(poll, &file_descriptor) < 0) {
ErrnoException(Env(), zmq_errno()).ThrowAsJavaScriptException();
finalize();
error();
Expand Down Expand Up @@ -327,17 +328,17 @@ void Socket::Receive(const Napi::Promise::Deferred& res) {
switch (type) {
case ZMQ_SERVER: {
auto meta = Napi::Object::New(Env());
meta.Set("routingId", zmq_msg_routing_id(part));
list[i++] = meta;
meta.Set("routingId", zmq_msg_routing_id(part.get()));
list[i_part++] = meta;
break;
}

case ZMQ_DISH: {
auto meta = Napi::Object::New(Env());
auto data = zmq_msg_group(part);
const auto* data = zmq_msg_group(part.get());
auto length = strnlen(data, ZMQ_GROUP_MAX_LENGTH);
meta.Set("group", Napi::Buffer<char>::Copy(Env(), data, length));
list[i++] = meta;
list[i_part++] = meta;
break;
}
}
Expand Down Expand Up @@ -534,7 +535,9 @@ Napi::Value Socket::Send(const Napi::CallbackInfo& info) {
Arg::Required<Arg::Object>("Options must be an object"),
};

if (args.ThrowIfInvalid(info)) return Env().Undefined();
if (args.ThrowIfInvalid(info)) {
return Env().Undefined();
}

break;
}
Expand Down Expand Up @@ -676,19 +679,22 @@ void Socket::Join([[maybe_unused]] const Napi::CallbackInfo& info) {
Arg::Required<Arg::String, Arg::Buffer>("Group must be a string or buffer"),
};

if (args.ThrowIfInvalid(info)) return;
if (args.ThrowIfInvalid(info)) {
return;
}

if (!ValidateOpen()) return;
if (!ValidateOpen()) {
return;
}

auto str = [&]() {
if (info[0].IsString()) {
return std::string(info[0].As<Napi::String>());
} else {
Napi::Object buf = info[0].As<Napi::Object>();
auto length = buf.As<Napi::Buffer<char>>().Length();
auto value = buf.As<Napi::Buffer<char>>().Data();
return std::string(value, length);
}
auto buf = info[0].As<Napi::Object>();
auto length = buf.As<Napi::Buffer<char>>().Length();
auto* value = buf.As<Napi::Buffer<char>>().Data();
return std::string(value, length);
}();

if (zmq_join(socket, str.c_str()) < 0) {
Expand All @@ -704,19 +710,22 @@ void Socket::Leave([[maybe_unused]] const Napi::CallbackInfo& info) {
Arg::Required<Arg::String, Arg::Buffer>("Group must be a string or buffer"),
};

if (args.ThrowIfInvalid(info)) return;
if (args.ThrowIfInvalid(info)) {
return;
}

if (!ValidateOpen()) return;
if (!ValidateOpen()) {
return;
}

auto str = [&]() {
if (info[0].IsString()) {
return std::string(info[0].As<Napi::String>());
} else {
Napi::Object buf = info[0].As<Napi::Object>();
auto length = buf.As<Napi::Buffer<char>>().Length();
auto value = buf.As<Napi::Buffer<char>>().Data();
return std::string(value, length);
}
auto buf = info[0].As<Napi::Object>();
auto length = buf.As<Napi::Buffer<char>>().Length();
auto* value = buf.As<Napi::Buffer<char>>().Data();
return std::string(value, length);
}();

if (zmq_leave(socket, str.c_str()) < 0) {
Expand Down

0 comments on commit f1172a0

Please sign in to comment.