Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test: prevent proxy test conflicts via worker-threads #688

Merged
merged 5 commits into from
Dec 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 5 additions & 6 deletions .github/workflows/CI.yml
Original file line number Diff line number Diff line change
Expand Up @@ -158,10 +158,9 @@ jobs:
uses: nick-fields/retry@v3
with:
timeout_minutes: 5
max_attempts: 1
max_attempts: 2
command: |
pnpm run test.unit
continue-on-error: true

- name: Clean Tmp
run: rm -rf ./tmp
Expand All @@ -172,7 +171,7 @@ jobs:
uses: nick-fields/retry@v3
with:
timeout_minutes: 5
max_attempts: 1
max_attempts: 2
command: |
pnpm run test.unit.compat
continue-on-error: true
Expand All @@ -182,11 +181,11 @@ jobs:
shell: bash

- name: Test Electron Windows/MacOS
if: "${{ !matrix.dockerfile }}"
if: "${{ !contains(matrix.os, 'ubuntu') && !matrix.dockerfile }}"
uses: nick-fields/retry@v3
with:
timeout_minutes: 5
max_attempts: 1
max_attempts: 2
command: |
pnpm run test.electron.main
continue-on-error: true
Expand All @@ -196,7 +195,7 @@ jobs:
uses: nick-fields/retry@v3
with:
timeout_minutes: 5
max_attempts: 1
max_attempts: 2
command: |
sudo apt-get install xvfb
xvfb-run --auto-servernum pnpm run test.electron.main
Expand Down
6 changes: 3 additions & 3 deletions .mocharc.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ const config = {
"expose-gc": true,
"v8-expose-gc": true,
exit: true,
parallel: true,
timeout: 5000,
retries: 1,
parallel: false,
timeout: 6000,
retries: 3,
fullTrace: true,
bail: false,
}
Expand Down
1 change: 1 addition & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
},
"mochaExplorer.files": "test/unit/**/*-test.ts",
"mochaExplorer.mochaPath": "./node_modules/mocha",
"mochaExplorer.timeout": 6000,
"files.exclude": {
"**/.DS_Store": true,
"**/Thumbs.db": true,
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@
"test.unit.debug": "run-s clean.temp build.debug && mocha ./test/unit/*-test.ts",
"test.unit.compat": "run-s clean.temp build && cross-env INCLUDE_COMPAT_TESTS=true mocha ./test/unit/compat/*-test.ts",
"test.unit.nogc": "run-s clean.temp build && cross-env SKIP_GC_TESTS=true mocha",
"test.electron.main": "run-s clean.temp build && electron-mocha ./test/unit/*-test.ts",
"test.electron.main": "run-s clean.temp build && cross-env SKIP_GC_TESTS=true electron-mocha ./test/unit/*-test.ts",
"test.electron.renderer": "run-s build && electron-mocha --renderer ./test/unit/*-test.ts",
"test.smoke": "bash ./script/smoke-test.bash",
"format": "run-s format.prettier format.clang-format",
Expand Down
21 changes: 17 additions & 4 deletions test/unit/helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,18 @@ if (semver.satisfies(zmq.version, ">= 4.2")) {
* Get a unique id to be used as a port number or IPC path.
* This function is thread-safe and will use a lock file to ensure that the id is unique.
*/
let idFallback = 5000
let idFallback = 6000
async function getUniqueId() {
const idPath = path.resolve(__dirname, "../../tmp/port-id.lock")
await fs.promises.mkdir(path.dirname(idPath), {recursive: true})

try {
// Create the file if it doesn't exist
if (!fs.existsSync(idPath)) {
await fs.promises.writeFile(idPath, "5000", "utf8")
await fs.promises.writeFile(idPath, "6000", "utf8")

/* Windows cannot bind on a ports just above 1014; start higher to be safe. */
return 5000
return 6000
}

await lockfile.lock(idPath, {retries: 10})
Expand Down Expand Up @@ -63,7 +63,7 @@ async function getUniqueId() {
}
}

type Proto = "ipc" | "tcp" | "udp" | "inproc"
export type Proto = "ipc" | "tcp" | "udp" | "inproc"

export async function uniqAddress(proto: Proto) {
const id = await getUniqueId()
Expand All @@ -84,6 +84,19 @@ export async function uniqAddress(proto: Proto) {
}
}

export async function cleanSocket(address: string) {
const [proto, path] = address.split("://")[1]
if (proto !== "ipc" || !path) {
return
}
const exists = await fs.promises
.access(path, fs.constants.F_OK)
.catch(() => false)
if (exists) {
await fs.promises.rm(path)
}
}

export function testProtos(...requested: Proto[]) {
const set = new Set(requested)

Expand Down
110 changes: 11 additions & 99 deletions test/unit/proxy-router-dealer-test.ts
Original file line number Diff line number Diff line change
@@ -1,104 +1,16 @@
import * as semver from "semver"
import * as zmq from "../../src"

import {assert} from "chai"
import {testProtos, uniqAddress} from "./helpers"
import {Worker} from "worker_threads"
import {testProtos} from "./helpers"

for (const proto of testProtos("tcp", "ipc", "inproc")) {
describe(`proxy with ${proto} router/dealer`, function () {
/* ZMQ < 4.0.5 has no steerable proxy support. */
if (semver.satisfies(zmq.version, "< 4.0.5")) {
return
}

let proxy: zmq.Proxy

let frontAddress: string
let backAddress: string

let req: zmq.Request
let rep: zmq.Reply

beforeEach(async function () {
proxy = new zmq.Proxy(new zmq.Router(), new zmq.Dealer())

frontAddress = await uniqAddress(proto)
backAddress = await uniqAddress(proto)

req = new zmq.Request()
rep = new zmq.Reply()
})

afterEach(function () {
/* Closing proxy sockets is only necessary if run() fails. */
proxy.frontEnd.close()
proxy.backEnd.close()

req.close()
rep.close()
global.gc?.()
})

describe("run", function () {
it("should proxy messages", async function () {
/* REQ -> foo -> ROUTER <-> DEALER -> foo -> REP
<- foo <- <- foo <-
-> bar -> -> bar ->
<- bar <- <- bar <-
pause
resume
-> baz -> -> baz ->
<- baz <- <- baz <-
-> qux -> -> qux ->
<- qux <- <- qux <-
*/

await proxy.frontEnd.bind(frontAddress)
await proxy.backEnd.bind(backAddress)

const done = proxy.run()

const messages = ["foo", "bar", "baz", "qux"]
const received: string[] = []

await req.connect(frontAddress)
await rep.connect(backAddress)

const echo = async () => {
for await (const msg of rep) {
await rep.send(msg)
}
}

const send = async () => {
for (const msg of messages) {
if (received.length === 2) {
proxy.pause()
proxy.resume()
}

await req.send(Buffer.from(msg))

const [res] = await req.receive()
received.push(res.toString())
if (received.length === messages.length) {
break
}
}

rep.close()
}

console.log(
`waiting for messages for proxy with ${proto} router/dealer...`,
)

await Promise.all([echo(), send()])
assert.deepEqual(received, messages)

proxy.terminate()
await done
console.log(`Done proxying with ${proto} router/dealer`)
describe(`proxy with ${proto} router/dealer`, () => {
describe("run", () => {
it("should proxy messages", async () => {
const worker = new Worker(__filename, {
workerData: {
proto,
},
})
await worker.terminate()
})
})
})
Expand Down
98 changes: 98 additions & 0 deletions test/unit/proxy-router-dealer-worker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
import {assert} from "chai"
import * as semver from "semver"
import * as zmq from "../../src"
import type {Proto} from "./helpers"
import {cleanSocket, uniqAddress} from "./helpers"
import {workerData} from "worker_threads"

async function testProxyRouterDealer(proto: Proto) {
/* ZMQ < 4.0.5 has no steerable proxy support. */
if (semver.satisfies(zmq.version, "< 4.0.5")) {
return
}

const proxy = new zmq.Proxy(new zmq.Router(), new zmq.Dealer())

const frontAddress = await uniqAddress(proto)
const backAddress = await uniqAddress(proto)

const req = new zmq.Request()
const rep = new zmq.Reply()

try {
/* REQ -> foo -> ROUTER <-> DEALER -> foo -> REP
<- foo <- <- foo <-
-> bar -> -> bar ->
<- bar <- <- bar <-
pause
resume
-> baz -> -> baz ->
<- baz <- <- baz <-
-> qux -> -> qux ->
<- qux <- <- qux <-
*/
await proxy.frontEnd.bind(frontAddress)
await proxy.backEnd.bind(backAddress)

const done = proxy.run()

const messages = ["foo", "bar", "baz", "qux"]
const received: string[] = []

await req.connect(frontAddress)
await rep.connect(backAddress)

const echo = async () => {
for await (const msg of rep) {
await rep.send(msg)
}
}

const send = async () => {
for (const msg of messages) {
if (received.length === 2) {
proxy.pause()
proxy.resume()
}

await req.send(Buffer.from(msg))

const [res] = await req.receive()
received.push(res.toString())
if (received.length === messages.length) {
break
}
}

rep.close()
}

console.log(`waiting for messages for proxy with ${proto} router/dealer...`)

await Promise.all([echo(), send()])
assert.deepEqual(received, messages)

proxy.terminate()
await done
console.log(`Done proxying with ${proto} router/dealer`)
} catch (err) {
/* Closing proxy sockets is only necessary if run() fails. */
proxy.frontEnd.close()
proxy.backEnd.close()
throw err
} finally {
req.close()
rep.close()
global.gc?.()
await Promise.all([cleanSocket(frontAddress), cleanSocket(backAddress)])
}
}

// Receive the proto from the main thread
testProxyRouterDealer(workerData.proto as Proto).catch(err => {
console.error(
`Error testing proxy with ${workerData.proto} router/dealer:`,
err,
)
process.exit(1)
})
2 changes: 1 addition & 1 deletion test/unit/proxy-run-test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import * as semver from "semver"
import * as zmq from "../../src"

import {assert} from "chai"
import {testProtos, uniqAddress} from "./helpers"
import {cleanSocket, testProtos, uniqAddress} from "./helpers"
import {isFullError} from "../../src/errors"

for (const proto of testProtos("tcp", "ipc", "inproc")) {
Expand Down
6 changes: 5 additions & 1 deletion test/unit/typings-compatibility-test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,11 @@ describe("compatibility of typings for typescript versions", async function () {
})

afterEach(async () => {
await remove(tscTargetPath)
try {
await remove(tscTargetPath)
} catch (err) {
console.error(`Failed to remove ${tscTargetPath}:`, err)
}
})
}
})
Expand Down
Loading