Skip to content

Commit

Permalink
fix: support arrays in C++ join/leave
Browse files Browse the repository at this point in the history
  • Loading branch information
aminya committed Dec 29, 2024
1 parent f1172a0 commit d8a95c8
Show file tree
Hide file tree
Showing 6 changed files with 59 additions and 54 deletions.
23 changes: 12 additions & 11 deletions src/draft.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,27 +41,28 @@ interface RadioGroupOptions {
export interface Radio extends Writable<MessageLike, [RadioGroupOptions]> {}
allowMethods(Radio.prototype, ["send"])

const join = (Socket.prototype as any).join
const leave = (Socket.prototype as any).leave
const join = (
Socket.prototype as Socket & {
join: (value: Array<string | Buffer>) => void
}
).join
const leave = (
Socket.prototype as Socket & {
leave: (value: Array<string | Buffer>) => void
}
).leave

export class Dish extends Socket {
constructor(options?: SocketOptions<Dish>) {
super(SocketType.Dish, options)
}

/* TODO: These methods might accept arrays in their C++ implementation for
the sake of simplicity. */

join(...values: Array<Buffer | string>): void {
for (const value of values) {
join(value)
}
join(values)
}

leave(...values: Array<Buffer | string>): void {
for (const value of values) {
leave(value)
}
leave(values)
}
}

Expand Down
14 changes: 2 additions & 12 deletions src/outgoing_msg.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

#include "./module.h"
#include "util/error.h"
#include "util/string_or_buffer.h"

namespace zmq {
OutgoingMsg::OutgoingMsg(Napi::Value value, std::reference_wrapper<Module> module) {
Expand Down Expand Up @@ -118,18 +119,7 @@ bool OutgoingMsg::Parts::SetGroup(Napi::Value value) {
return false;
}

auto group = [&]() {
if (value.IsString()) {
return std::string(value.As<Napi::String>());
}
if (value.IsBuffer()) {
auto buf = value.As<Napi::Object>();
auto length = buf.As<Napi::Buffer<char>>().Length();
auto* value = buf.As<Napi::Buffer<char>>().Data();
return std::string(value, length);
}
return std::string();
}();
const auto group = convert_string_or_buffer(value);

for (auto& part : parts) {
if (zmq_msg_set_group(part.get(), group.c_str()) < 0) {
Expand Down
48 changes: 18 additions & 30 deletions src/socket.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@
#include "./incoming_msg.h"
#include "./module.h"
#include "./observer.h"
#include "proxy.h"
#include "util/arguments.h"
#include "util/async_scope.h"
#include "util/error.h"
#include "util/object.h"
#include "util/string_or_buffer.h"
#include "util/take.h"
#include "util/uvdelayed.h"
#include "util/uvwork.h"
Expand Down Expand Up @@ -675,27 +677,20 @@ Napi::Value Socket::Receive(const Napi::CallbackInfo& info) {

void Socket::Join([[maybe_unused]] const Napi::CallbackInfo& info) {
#ifdef ZMQ_HAS_POLLABLE_THREAD_SAFE
Arg::Validator args{
Arg::Required<Arg::String, Arg::Buffer>("Group must be a string or buffer"),
};

if (args.ThrowIfInvalid(info)) {
return;
for (size_t i_value = 0; i_value < info.Length(); ++i_value) {
const auto& value = info[i_value];
this->JoinElement(value);
}
#endif
}

void Socket::JoinElement([[maybe_unused]] const Napi::Value& value) {
#ifdef ZMQ_HAS_POLLABLE_THREAD_SAFE
if (!ValidateOpen()) {
return;
}

auto str = [&]() {
if (info[0].IsString()) {
return std::string(info[0].As<Napi::String>());
}
auto buf = info[0].As<Napi::Object>();
auto length = buf.As<Napi::Buffer<char>>().Length();
auto* value = buf.As<Napi::Buffer<char>>().Data();
return std::string(value, length);
}();
const auto str = convert_string_or_buffer(value);

if (zmq_join(socket, str.c_str()) < 0) {
ErrnoException(Env(), zmq_errno()).ThrowAsJavaScriptException();
Expand All @@ -706,27 +701,20 @@ void Socket::Join([[maybe_unused]] const Napi::CallbackInfo& info) {

void Socket::Leave([[maybe_unused]] const Napi::CallbackInfo& info) {
#ifdef ZMQ_HAS_POLLABLE_THREAD_SAFE
Arg::Validator args{
Arg::Required<Arg::String, Arg::Buffer>("Group must be a string or buffer"),
};

if (args.ThrowIfInvalid(info)) {
return;
for (size_t i_value = 0; i_value < info.Length(); ++i_value) {
const auto& value = info[i_value];
this->LeaveElement(value);
}
#endif
}

void Socket::LeaveElement([[maybe_unused]] const Napi::Value& value) {
#ifdef ZMQ_HAS_POLLABLE_THREAD_SAFE
if (!ValidateOpen()) {
return;
}

auto str = [&]() {
if (info[0].IsString()) {
return std::string(info[0].As<Napi::String>());
}
auto buf = info[0].As<Napi::Object>();
auto length = buf.As<Napi::Buffer<char>>().Length();
auto* value = buf.As<Napi::Buffer<char>>().Data();
return std::string(value, length);
}();
const auto str = convert_string_or_buffer(value);

if (zmq_leave(socket, str.c_str()) < 0) {
ErrnoException(Env(), zmq_errno()).ThrowAsJavaScriptException();
Expand Down
3 changes: 3 additions & 0 deletions src/socket.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ class Socket : public Napi::ObjectWrap<Socket>, public Closable {
force_inline void Send(const Napi::Promise::Deferred& res, OutgoingMsg::Parts& parts);
force_inline void Receive(const Napi::Promise::Deferred& res);

inline void JoinElement(const Napi::Value& value);
inline void LeaveElement(const Napi::Value& value);

class Poller : public zmq::Poller<Poller> {
std::reference_wrapper<Socket> socket;
std::optional<Napi::Promise::Deferred> read_deferred;
Expand Down
3 changes: 2 additions & 1 deletion src/util/arguments.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,8 @@ class Validator {
if constexpr (I == NumArgs) {
if (info.Length() > NumArgs) {
auto msg = "Expected " + std::to_string(NumArgs) + " argument"
+ (NumArgs != 1 ? "s" : "");
+ (NumArgs != 1 ? "s" : "") + " but received "
+ std::to_string(info.Length());
return Napi::TypeError::New(info.Env(), msg);
}

Expand Down
22 changes: 22 additions & 0 deletions src/util/string_or_buffer.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
#pragma once

#include <napi.h>

#include <string>

namespace zmq {

inline std::string convert_string_or_buffer(const Napi::Value& value) {
if (value.IsString()) {
return std::string(value.As<Napi::String>());
}
if (value.IsBuffer()) {
auto buf = value.As<Napi::Object>();
auto length = buf.As<Napi::Buffer<char>>().Length();
auto* value = buf.As<Napi::Buffer<char>>().Data();
return {value, length};
}
throw Napi::TypeError::New(value.Env(), "Value must be a string or buffer");
}

} // namespace zmq

0 comments on commit d8a95c8

Please sign in to comment.