Skip to content

Commit

Permalink
Merge pull request #688 from zeromq/proxy-tests [skip ci]
Browse files Browse the repository at this point in the history
  • Loading branch information
aminya authored Dec 30, 2024
2 parents 319032c + f52055c commit d473c10
Show file tree
Hide file tree
Showing 9 changed files with 142 additions and 115 deletions.
11 changes: 5 additions & 6 deletions .github/workflows/CI.yml
Original file line number Diff line number Diff line change
Expand Up @@ -158,10 +158,9 @@ jobs:
uses: nick-fields/retry@v3
with:
timeout_minutes: 5
max_attempts: 1
max_attempts: 2
command: |
pnpm run test.unit
continue-on-error: true
- name: Clean Tmp
run: rm -rf ./tmp
Expand All @@ -172,7 +171,7 @@ jobs:
uses: nick-fields/retry@v3
with:
timeout_minutes: 5
max_attempts: 1
max_attempts: 2
command: |
pnpm run test.unit.compat
continue-on-error: true
Expand All @@ -182,11 +181,11 @@ jobs:
shell: bash

- name: Test Electron Windows/MacOS
if: "${{ !matrix.dockerfile }}"
if: "${{ !contains(matrix.os, 'ubuntu') && !matrix.dockerfile }}"
uses: nick-fields/retry@v3
with:
timeout_minutes: 5
max_attempts: 1
max_attempts: 2
command: |
pnpm run test.electron.main
continue-on-error: true
Expand All @@ -196,7 +195,7 @@ jobs:
uses: nick-fields/retry@v3
with:
timeout_minutes: 5
max_attempts: 1
max_attempts: 2
command: |
sudo apt-get install xvfb
xvfb-run --auto-servernum pnpm run test.electron.main
Expand Down
6 changes: 3 additions & 3 deletions .mocharc.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ const config = {
"expose-gc": true,
"v8-expose-gc": true,
exit: true,
parallel: true,
timeout: 5000,
retries: 1,
parallel: false,
timeout: 6000,
retries: 3,
fullTrace: true,
bail: false,
}
Expand Down
1 change: 1 addition & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
},
"mochaExplorer.files": "test/unit/**/*-test.ts",
"mochaExplorer.mochaPath": "./node_modules/mocha",
"mochaExplorer.timeout": 6000,
"files.exclude": {
"**/.DS_Store": true,
"**/Thumbs.db": true,
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@
"test.unit.debug": "run-s clean.temp build.debug && mocha ./test/unit/*-test.ts",
"test.unit.compat": "run-s clean.temp build && cross-env INCLUDE_COMPAT_TESTS=true mocha ./test/unit/compat/*-test.ts",
"test.unit.nogc": "run-s clean.temp build && cross-env SKIP_GC_TESTS=true mocha",
"test.electron.main": "run-s clean.temp build && electron-mocha ./test/unit/*-test.ts",
"test.electron.main": "run-s clean.temp build && cross-env SKIP_GC_TESTS=true electron-mocha ./test/unit/*-test.ts",
"test.electron.renderer": "run-s build && electron-mocha --renderer ./test/unit/*-test.ts",
"test.smoke": "bash ./script/smoke-test.bash",
"format": "run-s format.prettier format.clang-format",
Expand Down
21 changes: 17 additions & 4 deletions test/unit/helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,18 @@ if (semver.satisfies(zmq.version, ">= 4.2")) {
* Get a unique id to be used as a port number or IPC path.
* This function is thread-safe and will use a lock file to ensure that the id is unique.
*/
let idFallback = 5000
let idFallback = 6000
async function getUniqueId() {
const idPath = path.resolve(__dirname, "../../tmp/port-id.lock")
await fs.promises.mkdir(path.dirname(idPath), {recursive: true})

try {
// Create the file if it doesn't exist
if (!fs.existsSync(idPath)) {
await fs.promises.writeFile(idPath, "5000", "utf8")
await fs.promises.writeFile(idPath, "6000", "utf8")

/* Windows cannot bind on a ports just above 1014; start higher to be safe. */
return 5000
return 6000
}

await lockfile.lock(idPath, {retries: 10})
Expand Down Expand Up @@ -63,7 +63,7 @@ async function getUniqueId() {
}
}

type Proto = "ipc" | "tcp" | "udp" | "inproc"
export type Proto = "ipc" | "tcp" | "udp" | "inproc"

export async function uniqAddress(proto: Proto) {
const id = await getUniqueId()
Expand All @@ -84,6 +84,19 @@ export async function uniqAddress(proto: Proto) {
}
}

export async function cleanSocket(address: string) {
const [proto, path] = address.split("://")[1]
if (proto !== "ipc" || !path) {
return
}
const exists = await fs.promises
.access(path, fs.constants.F_OK)
.catch(() => false)
if (exists) {
await fs.promises.rm(path)
}
}

export function testProtos(...requested: Proto[]) {
const set = new Set(requested)

Expand Down
110 changes: 11 additions & 99 deletions test/unit/proxy-router-dealer-test.ts
Original file line number Diff line number Diff line change
@@ -1,104 +1,16 @@
import * as semver from "semver"
import * as zmq from "../../src"

import {assert} from "chai"
import {testProtos, uniqAddress} from "./helpers"
import {Worker} from "worker_threads"
import {testProtos} from "./helpers"

for (const proto of testProtos("tcp", "ipc", "inproc")) {
describe(`proxy with ${proto} router/dealer`, function () {
/* ZMQ < 4.0.5 has no steerable proxy support. */
if (semver.satisfies(zmq.version, "< 4.0.5")) {
return
}

let proxy: zmq.Proxy

let frontAddress: string
let backAddress: string

let req: zmq.Request
let rep: zmq.Reply

beforeEach(async function () {
proxy = new zmq.Proxy(new zmq.Router(), new zmq.Dealer())

frontAddress = await uniqAddress(proto)
backAddress = await uniqAddress(proto)

req = new zmq.Request()
rep = new zmq.Reply()
})

afterEach(function () {
/* Closing proxy sockets is only necessary if run() fails. */
proxy.frontEnd.close()
proxy.backEnd.close()

req.close()
rep.close()
global.gc?.()
})

describe("run", function () {
it("should proxy messages", async function () {
/* REQ -> foo -> ROUTER <-> DEALER -> foo -> REP
<- foo <- <- foo <-
-> bar -> -> bar ->
<- bar <- <- bar <-
pause
resume
-> baz -> -> baz ->
<- baz <- <- baz <-
-> qux -> -> qux ->
<- qux <- <- qux <-
*/

await proxy.frontEnd.bind(frontAddress)
await proxy.backEnd.bind(backAddress)

const done = proxy.run()

const messages = ["foo", "bar", "baz", "qux"]
const received: string[] = []

await req.connect(frontAddress)
await rep.connect(backAddress)

const echo = async () => {
for await (const msg of rep) {
await rep.send(msg)
}
}

const send = async () => {
for (const msg of messages) {
if (received.length === 2) {
proxy.pause()
proxy.resume()
}

await req.send(Buffer.from(msg))

const [res] = await req.receive()
received.push(res.toString())
if (received.length === messages.length) {
break
}
}

rep.close()
}

console.log(
`waiting for messages for proxy with ${proto} router/dealer...`,
)

await Promise.all([echo(), send()])
assert.deepEqual(received, messages)

proxy.terminate()
await done
console.log(`Done proxying with ${proto} router/dealer`)
describe(`proxy with ${proto} router/dealer`, () => {
describe("run", () => {
it("should proxy messages", async () => {
const worker = new Worker(__filename, {
workerData: {
proto,
},
})
await worker.terminate()
})
})
})
Expand Down
98 changes: 98 additions & 0 deletions test/unit/proxy-router-dealer-worker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
import {assert} from "chai"
import * as semver from "semver"
import * as zmq from "../../src"
import type {Proto} from "./helpers"
import {cleanSocket, uniqAddress} from "./helpers"
import {workerData} from "worker_threads"

async function testProxyRouterDealer(proto: Proto) {
/* ZMQ < 4.0.5 has no steerable proxy support. */
if (semver.satisfies(zmq.version, "< 4.0.5")) {
return
}

const proxy = new zmq.Proxy(new zmq.Router(), new zmq.Dealer())

const frontAddress = await uniqAddress(proto)
const backAddress = await uniqAddress(proto)

const req = new zmq.Request()
const rep = new zmq.Reply()

try {
/* REQ -> foo -> ROUTER <-> DEALER -> foo -> REP
<- foo <- <- foo <-
-> bar -> -> bar ->
<- bar <- <- bar <-
pause
resume
-> baz -> -> baz ->
<- baz <- <- baz <-
-> qux -> -> qux ->
<- qux <- <- qux <-
*/
await proxy.frontEnd.bind(frontAddress)
await proxy.backEnd.bind(backAddress)

const done = proxy.run()

const messages = ["foo", "bar", "baz", "qux"]
const received: string[] = []

await req.connect(frontAddress)
await rep.connect(backAddress)

const echo = async () => {
for await (const msg of rep) {
await rep.send(msg)
}
}

const send = async () => {
for (const msg of messages) {
if (received.length === 2) {
proxy.pause()
proxy.resume()
}

await req.send(Buffer.from(msg))

const [res] = await req.receive()
received.push(res.toString())
if (received.length === messages.length) {
break
}
}

rep.close()
}

console.log(`waiting for messages for proxy with ${proto} router/dealer...`)

await Promise.all([echo(), send()])
assert.deepEqual(received, messages)

proxy.terminate()
await done
console.log(`Done proxying with ${proto} router/dealer`)
} catch (err) {
/* Closing proxy sockets is only necessary if run() fails. */
proxy.frontEnd.close()
proxy.backEnd.close()
throw err
} finally {
req.close()
rep.close()
global.gc?.()
await Promise.all([cleanSocket(frontAddress), cleanSocket(backAddress)])
}
}

// Receive the proto from the main thread
testProxyRouterDealer(workerData.proto as Proto).catch(err => {
console.error(
`Error testing proxy with ${workerData.proto} router/dealer:`,
err,
)
process.exit(1)
})
2 changes: 1 addition & 1 deletion test/unit/proxy-run-test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import * as semver from "semver"
import * as zmq from "../../src"

import {assert} from "chai"
import {testProtos, uniqAddress} from "./helpers"
import {cleanSocket, testProtos, uniqAddress} from "./helpers"
import {isFullError} from "../../src/errors"

for (const proto of testProtos("tcp", "ipc", "inproc")) {
Expand Down
6 changes: 5 additions & 1 deletion test/unit/typings-compatibility-test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,11 @@ describe("compatibility of typings for typescript versions", async function () {
})

afterEach(async () => {
await remove(tscTargetPath)
try {
await remove(tscTargetPath)
} catch (err) {
console.error(`Failed to remove ${tscTargetPath}:`, err)
}
})
}
})
Expand Down

0 comments on commit d473c10

Please sign in to comment.