From 00e9f8dc8867d3dec51a2b11a8429584685c9580 Mon Sep 17 00:00:00 2001 From: streamich Date: Sat, 30 Dec 2023 16:43:29 +0100 Subject: [PATCH 1/5] =?UTF-8?q?test(reactive-rpc):=20=F0=9F=92=8D=20setup?= =?UTF-8?q?=20testing=20for=20ws=20connection?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../server/ws/server/WsServerConnection.ts | 7 ++++-- .../__tests__/WsServerConnection.spec.ts | 23 +++++++++++++++++++ 2 files changed, 28 insertions(+), 2 deletions(-) create mode 100644 src/reactive-rpc/server/ws/server/__tests__/WsServerConnection.spec.ts diff --git a/src/reactive-rpc/server/ws/server/WsServerConnection.ts b/src/reactive-rpc/server/ws/server/WsServerConnection.ts index 11fbf6bd4b..ee35af8a6b 100644 --- a/src/reactive-rpc/server/ws/server/WsServerConnection.ts +++ b/src/reactive-rpc/server/ws/server/WsServerConnection.ts @@ -1,10 +1,12 @@ -import * as net from 'net'; import * as crypto from 'crypto'; +import * as stream from 'stream'; import {WsCloseFrame, WsFrameDecoder, WsFrameHeader, WsFrameOpcode, WsPingFrame, WsPongFrame} from '../codec'; import {utf8Size} from '../../../../util/strings/utf8'; import {FanOut} from 'thingies/es2020/fanout'; import type {WsFrameEncoder} from '../codec/WsFrameEncoder'; +export type WsServerConnectionSocket = stream.Duplex; + export class WsServerConnection { public closed: boolean = false; public maxIncomingMessage: number = 2 * 1024 * 1024; @@ -21,11 +23,12 @@ export class WsServerConnection { }; public onmessage: (data: Uint8Array, isUtf8: boolean) => void = () => {}; + public onfragment: (isLast: boolean, data: Uint8Array, isUtf8: boolean) => void = () => {}; public onping: (data: Uint8Array | null) => void = this.defaultOnPing; public onpong: (data: Uint8Array | null) => void = () => {}; public onclose: (code: number, reason: string) => void = () => {}; - constructor(protected readonly encoder: WsFrameEncoder, public readonly socket: net.Socket) { + constructor(protected readonly encoder: WsFrameEncoder, public readonly socket: WsServerConnectionSocket) { const decoder = new WsFrameDecoder(); let currentFrame: WsFrameHeader | null = null; const handleData = (data: Uint8Array): void => { diff --git a/src/reactive-rpc/server/ws/server/__tests__/WsServerConnection.spec.ts b/src/reactive-rpc/server/ws/server/__tests__/WsServerConnection.spec.ts new file mode 100644 index 0000000000..6e5652bc4c --- /dev/null +++ b/src/reactive-rpc/server/ws/server/__tests__/WsServerConnection.spec.ts @@ -0,0 +1,23 @@ +import * as stream from 'stream'; +import {WsServerConnection} from '../WsServerConnection'; +import {WsFrameEncoder} from '../../codec/WsFrameEncoder'; +import {until} from 'thingies'; + +const setup = () => { + const socket = new stream.PassThrough(); + const encoder = new WsFrameEncoder(); + const connection = new WsServerConnection(encoder, socket); + return {socket, encoder, connection}; +}; + +test('can parse PING frame', async () => { + const {socket, encoder, connection} = setup(); + const pings: Uint8Array[] = []; + connection.onping = (data: Uint8Array | null): void => { + if (data) pings.push(data); + }; + const pingFrame = encoder.encodePing(Buffer.from([0x01, 0x02, 0x03])); + socket.write(pingFrame); + await until(() => pings.length === 1); + expect(pings[0]).toEqual(new Uint8Array([0x01, 0x02, 0x03])); +}); From f15c9ac1b9baabaec50c54a33f680771d4bc6af4 Mon Sep 17 00:00:00 2001 From: streamich Date: Sat, 30 Dec 2023 17:09:33 +0100 Subject: [PATCH 2/5] =?UTF-8?q?fix(reactive-rpc):=20=F0=9F=90=9B=20decode?= =?UTF-8?q?=20close=20frame=20payload?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../server/ws/server/WsServerConnection.ts | 6 +- .../__tests__/WsServerConnection.spec.ts | 147 ++++++++++++++++-- 2 files changed, 141 insertions(+), 12 deletions(-) diff --git a/src/reactive-rpc/server/ws/server/WsServerConnection.ts b/src/reactive-rpc/server/ws/server/WsServerConnection.ts index ee35af8a6b..eb169d6d71 100644 --- a/src/reactive-rpc/server/ws/server/WsServerConnection.ts +++ b/src/reactive-rpc/server/ws/server/WsServerConnection.ts @@ -49,8 +49,10 @@ export class WsServerConnection { if (!frame) break; else if (frame instanceof WsPingFrame) this.onping(frame.data); else if (frame instanceof WsPongFrame) this.onpong(frame.data); - else if (frame instanceof WsCloseFrame) this.onClose(frame.code, frame.reason); - else if (frame instanceof WsFrameHeader) { + else if (frame instanceof WsCloseFrame) { + decoder.readCloseFrameData(frame); + this.onClose(frame.code, frame.reason); + } else if (frame instanceof WsFrameHeader) { if (this.stream) { if (frame.opcode !== WsFrameOpcode.CONTINUE) { this.onClose(1002, 'DATA'); diff --git a/src/reactive-rpc/server/ws/server/__tests__/WsServerConnection.spec.ts b/src/reactive-rpc/server/ws/server/__tests__/WsServerConnection.spec.ts index 6e5652bc4c..0efcff5535 100644 --- a/src/reactive-rpc/server/ws/server/__tests__/WsServerConnection.spec.ts +++ b/src/reactive-rpc/server/ws/server/__tests__/WsServerConnection.spec.ts @@ -2,6 +2,8 @@ import * as stream from 'stream'; import {WsServerConnection} from '../WsServerConnection'; import {WsFrameEncoder} from '../../codec/WsFrameEncoder'; import {until} from 'thingies'; +import {WsFrameOpcode} from '../../codec'; +import {bufferToUint8Array} from '../../../../../util/buffers/bufferToUint8Array'; const setup = () => { const socket = new stream.PassThrough(); @@ -10,14 +12,139 @@ const setup = () => { return {socket, encoder, connection}; }; -test('can parse PING frame', async () => { - const {socket, encoder, connection} = setup(); - const pings: Uint8Array[] = []; - connection.onping = (data: Uint8Array | null): void => { - if (data) pings.push(data); - }; - const pingFrame = encoder.encodePing(Buffer.from([0x01, 0x02, 0x03])); - socket.write(pingFrame); - await until(() => pings.length === 1); - expect(pings[0]).toEqual(new Uint8Array([0x01, 0x02, 0x03])); +describe('.onping', () => { + test('can parse PING frame', async () => { + const {socket, encoder, connection} = setup(); + const pings: Uint8Array[] = []; + connection.onping = (data: Uint8Array | null): void => { + if (data) pings.push(data); + }; + const pingFrame = encoder.encodePing(Buffer.from([0x01, 0x02, 0x03])); + socket.write(pingFrame); + await until(() => pings.length === 1); + expect(pings[0]).toEqual(new Uint8Array([0x01, 0x02, 0x03])); + }); + + test('can parse empty PING frame', async () => { + const {socket, encoder, connection} = setup(); + const pings: Uint8Array[] = []; + connection.onping = (data: Uint8Array | null): void => { + if (data) pings.push(data); + }; + const pingFrame = encoder.encodePing(Buffer.from([0x01, 0x02, 0x03])); + socket.write(pingFrame); + const pingFrame2 = encoder.encodePing(Buffer.from([])); + socket.write(pingFrame2); + await until(() => pings.length === 2); + expect(pings[0]).toEqual(new Uint8Array([0x01, 0x02, 0x03])); + expect(pings[1]).toEqual(new Uint8Array([])); + }); +}); + +describe('.onping', () => { + test('can parse PONG frame', async () => { + const {socket, encoder, connection} = setup(); + const pongs: Uint8Array[] = []; + connection.onpong = (data: Uint8Array | null): void => { + if (data) pongs.push(data); + }; + const pingFrame = encoder.encodePong(Buffer.from([0x01, 0x02, 0x03])); + socket.write(pingFrame); + await until(() => pongs.length === 1); + expect(pongs[0]).toEqual(new Uint8Array([0x01, 0x02, 0x03])); + }); +}); + +describe('.onclose', () => { + test('can parse CLOSE frame', async () => { + const {socket, encoder, connection} = setup(); + const closes: [code: number, reason: string][] = []; + connection.onclose = (code: number, reason: string): void => { + closes.push([code, reason]); + }; + const pingFrame = encoder.encodePong(Buffer.from([0x01, 0x02, 0x03])); + socket.write(pingFrame); + const closeFrame = encoder.encodeClose('OK', 1000); + socket.write(closeFrame); + await until(() => closes.length === 1); + expect(closes[0]).toEqual([1000, 'OK']); + }); +}); + +describe('.onmessage', () => { + describe('un-masked', () => { + test('binary data frame', async () => { + const {socket, encoder, connection} = setup(); + const messages: [data: Uint8Array, isUtf8: boolean][] = []; + connection.onmessage = (data: Uint8Array, isUtf8: boolean): void => { + messages.push([data, isUtf8]); + }; + const pingFrame = encoder.encodePong(Buffer.from([0x01, 0x02, 0x03])); + const closeFrame = encoder.encodeHdr(1, WsFrameOpcode.BINARY, 3, 0); + encoder.writer.buf(Buffer.from([0x01, 0x02, 0x03]), 3); + const payload = encoder.writer.flush(); + socket.write(pingFrame); + socket.write(closeFrame); + socket.write(payload); + await until(() => messages.length === 1); + expect(messages[0]).toEqual([new Uint8Array([0x01, 0x02, 0x03]), false]); + }); + + test('text frame', async () => { + const {socket, encoder, connection} = setup(); + const messages: [data: Uint8Array, isUtf8: boolean][] = []; + connection.onmessage = (data: Uint8Array, isUtf8: boolean): void => { + messages.push([data, isUtf8]); + }; + const pingFrame1 = encoder.encodePing(Buffer.from([0x01])); + const pingFrame2 = encoder.encodePing(Buffer.from([0x01, 0x02, 0x03])); + const closeFrame = encoder.encodeHdr(1, WsFrameOpcode.TEXT, 4, 0); + encoder.writer.buf(Buffer.from('asdf'), 4); + const payload = encoder.writer.flush(); + socket.write(pingFrame1); + socket.write(pingFrame2); + socket.write(closeFrame); + socket.write(payload); + await until(() => messages.length === 1); + expect(messages[0]).toEqual([bufferToUint8Array(Buffer.from('asdf')), true]); + }); + }); + + describe('masked', () => { + test('binary data frame', async () => { + const {socket, encoder, connection} = setup(); + const messages: [data: Uint8Array, isUtf8: boolean][] = []; + connection.onmessage = (data: Uint8Array, isUtf8: boolean): void => { + messages.push([data, isUtf8]); + }; + const pingFrame = encoder.encodePong(Buffer.from([0x01, 0x02, 0x03])); + const closeFrame = encoder.encodeHdr(1, WsFrameOpcode.BINARY, 3, 0x12345678); + encoder.writeBufXor(Buffer.from([0x01, 0x02, 0x03]), 0x12345678); + const payload = encoder.writer.flush(); + socket.write(pingFrame); + socket.write(closeFrame); + socket.write(payload); + await until(() => messages.length === 1); + expect(messages[0]).toEqual([new Uint8Array([0x01, 0x02, 0x03]), false]); + }); + + test('text frame', async () => { + const {socket, encoder, connection} = setup(); + const messages: [data: Uint8Array, isUtf8: boolean][] = []; + connection.onmessage = (data: Uint8Array, isUtf8: boolean): void => { + messages.push([data, isUtf8]); + }; + const pingFrame1 = encoder.encodePing(Buffer.from([0x01])); + const pingFrame2 = encoder.encodePing(Buffer.from([0x01, 0x02, 0x03])); + const closeFrame = encoder.encodeHdr(1, WsFrameOpcode.TEXT, 4, 0x12345678); + encoder.writeBufXor(Buffer.from('asdf'), 0x12345678); + const payload = encoder.writer.flush(); + socket.write(pingFrame1); + socket.write(pingFrame2); + socket.write(closeFrame); + socket.write(payload); + await until(() => messages.length === 1); + expect(messages[0]).toEqual([bufferToUint8Array(Buffer.from('asdf')), true]); + }); + }); }); From 6e2470fcfed59855ef36f2e98cefe0e67ce19c14 Mon Sep 17 00:00:00 2001 From: streamich Date: Sat, 30 Dec 2023 18:12:11 +0100 Subject: [PATCH 3/5] =?UTF-8?q?feat(reactive-rpc):=20=F0=9F=8E=B8=20add=20?= =?UTF-8?q?support=20for=20fragmented=20ws=20messages?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../server/ws/server/WsServerConnection.ts | 80 ++++++++++--------- .../__tests__/WsServerConnection.spec.ts | 63 ++++++++++++++- 2 files changed, 105 insertions(+), 38 deletions(-) diff --git a/src/reactive-rpc/server/ws/server/WsServerConnection.ts b/src/reactive-rpc/server/ws/server/WsServerConnection.ts index eb169d6d71..bc2b47a841 100644 --- a/src/reactive-rpc/server/ws/server/WsServerConnection.ts +++ b/src/reactive-rpc/server/ws/server/WsServerConnection.ts @@ -2,7 +2,6 @@ import * as crypto from 'crypto'; import * as stream from 'stream'; import {WsCloseFrame, WsFrameDecoder, WsFrameHeader, WsFrameOpcode, WsPingFrame, WsPongFrame} from '../codec'; import {utf8Size} from '../../../../util/strings/utf8'; -import {FanOut} from 'thingies/es2020/fanout'; import type {WsFrameEncoder} from '../codec/WsFrameEncoder'; export type WsServerConnectionSocket = stream.Duplex; @@ -12,12 +11,6 @@ export class WsServerConnection { public maxIncomingMessage: number = 2 * 1024 * 1024; public maxBackpressure: number = 2 * 1024 * 1024; - /** - * If this is not null, then the connection is receiving a stream: a sequence - * of fragment frames. - */ - protected stream: FanOut | null = null; - public readonly defaultOnPing = (data: Uint8Array | null): void => { this.sendPong(data); }; @@ -30,49 +23,64 @@ export class WsServerConnection { constructor(protected readonly encoder: WsFrameEncoder, public readonly socket: WsServerConnectionSocket) { const decoder = new WsFrameDecoder(); - let currentFrame: WsFrameHeader | null = null; + let currentFrameHeader: WsFrameHeader | null = null; + let fragmentStartFrameHeader: WsFrameHeader | null = null; const handleData = (data: Uint8Array): void => { try { decoder.push(data); - if (currentFrame) { - const length = currentFrame.length; - if (length <= decoder.reader.size()) { - const buf = new Uint8Array(length); - decoder.copyFrameData(currentFrame, buf, 0); - const isText = currentFrame.opcode === WsFrameOpcode.TEXT; - currentFrame = null; - this.onmessage(buf, isText); + main: while (true) { + if (currentFrameHeader instanceof WsFrameHeader) { + const length = currentFrameHeader.length; + if (length > this.maxIncomingMessage) { + this.onClose(1009, 'TOO_LARGE'); + return; + } + if (length <= decoder.reader.size()) { + const buf = new Uint8Array(length); + decoder.copyFrameData(currentFrameHeader, buf, 0); + if (fragmentStartFrameHeader instanceof WsFrameHeader) { + const isText = fragmentStartFrameHeader.opcode === WsFrameOpcode.TEXT; + const isLast = currentFrameHeader.fin === 1; + currentFrameHeader = null; + if (isLast) fragmentStartFrameHeader = null; + this.onfragment(isLast, buf, isText) + } else { + const isText = currentFrameHeader.opcode === WsFrameOpcode.TEXT; + currentFrameHeader = null; + this.onmessage(buf, isText); + } + } else break; } - } - while (true) { const frame = decoder.readFrameHeader(); if (!frame) break; - else if (frame instanceof WsPingFrame) this.onping(frame.data); - else if (frame instanceof WsPongFrame) this.onpong(frame.data); - else if (frame instanceof WsCloseFrame) { + if (frame instanceof WsPingFrame) { + this.onping(frame.data); + continue main; + } + if (frame instanceof WsPongFrame) { + this.onpong(frame.data); + continue main; + } + if (frame instanceof WsCloseFrame) { decoder.readCloseFrameData(frame); this.onClose(frame.code, frame.reason); - } else if (frame instanceof WsFrameHeader) { - if (this.stream) { + continue main; + } + if (frame instanceof WsFrameHeader) { + if (fragmentStartFrameHeader) { if (frame.opcode !== WsFrameOpcode.CONTINUE) { this.onClose(1002, 'DATA'); return; } - throw new Error('streaming not implemented'); + currentFrameHeader = frame; } - const length = frame.length; - if (length > this.maxIncomingMessage) { - this.onClose(1009, 'TOO_LARGE'); - return; - } - if (length <= decoder.reader.size()) { - const buf = new Uint8Array(length); - decoder.copyFrameData(frame, buf, 0); - const isText = frame.opcode === WsFrameOpcode.TEXT; - this.onmessage(buf, isText); - } else { - currentFrame = frame; + if (frame.fin === 0) { + fragmentStartFrameHeader = frame; + currentFrameHeader = frame; + continue main; } + currentFrameHeader = frame; + continue main; } } } catch (error) { diff --git a/src/reactive-rpc/server/ws/server/__tests__/WsServerConnection.spec.ts b/src/reactive-rpc/server/ws/server/__tests__/WsServerConnection.spec.ts index 0efcff5535..a40d5aa19a 100644 --- a/src/reactive-rpc/server/ws/server/__tests__/WsServerConnection.spec.ts +++ b/src/reactive-rpc/server/ws/server/__tests__/WsServerConnection.spec.ts @@ -4,6 +4,7 @@ import {WsFrameEncoder} from '../../codec/WsFrameEncoder'; import {until} from 'thingies'; import {WsFrameOpcode} from '../../codec'; import {bufferToUint8Array} from '../../../../../util/buffers/bufferToUint8Array'; +import {listToUint8} from '../../../../../util/buffers/concat'; const setup = () => { const socket = new stream.PassThrough(); @@ -80,15 +81,40 @@ describe('.onmessage', () => { messages.push([data, isUtf8]); }; const pingFrame = encoder.encodePong(Buffer.from([0x01, 0x02, 0x03])); - const closeFrame = encoder.encodeHdr(1, WsFrameOpcode.BINARY, 3, 0); + const frame = encoder.encodeHdr(1, WsFrameOpcode.BINARY, 3, 0); encoder.writer.buf(Buffer.from([0x01, 0x02, 0x03]), 3); const payload = encoder.writer.flush(); socket.write(pingFrame); - socket.write(closeFrame); + socket.write(frame); socket.write(payload); await until(() => messages.length === 1); expect(messages[0]).toEqual([new Uint8Array([0x01, 0x02, 0x03]), false]); }); + + test('two binary data frames', async () => { + const {socket, encoder, connection} = setup(); + const messages: [data: Uint8Array, isUtf8: boolean][] = []; + connection.onmessage = (data: Uint8Array, isUtf8: boolean): void => { + messages.push([data, isUtf8]); + }; + const pingFrame = encoder.encodePong(Buffer.from([0x01, 0x02, 0x03])); + const frame1 = encoder.encodeHdr(1, WsFrameOpcode.BINARY, 3, 0); + encoder.writer.buf(Buffer.from([0x01, 0x02, 0x03]), 3); + const payload1 = encoder.writer.flush(); + const frame2 = encoder.encodeHdr(1, WsFrameOpcode.BINARY, 3, 0); + encoder.writer.buf(Buffer.from([0x04, 0x05, 0x06]), 3); + const payload2 = encoder.writer.flush(); + socket.write(pingFrame); + socket.write(listToUint8([ + frame1, + payload1, + frame2, + payload2, + ])); + await until(() => messages.length === 2); + expect(messages[0]).toEqual([new Uint8Array([0x01, 0x02, 0x03]), false]); + expect(messages[1]).toEqual([new Uint8Array([0x04, 0x05, 0x06]), false]); + }); test('text frame', async () => { const {socket, encoder, connection} = setup(); @@ -148,3 +174,36 @@ describe('.onmessage', () => { }); }); }); + +describe('.fragment', () => { + test('parses out message fragments', async () => { + const {socket, encoder, connection} = setup(); + const fragments: [isLast: boolean, data: Uint8Array, isUtf8: boolean][] = []; + connection.onfragment = (isLast: boolean, data: Uint8Array, isUtf8: boolean): void => { + fragments.push([isLast, data, isUtf8]); + }; + const pingFrame = encoder.encodePong(Buffer.from([0x01, 0x02, 0x03])); + const buf1 = encoder.encodeHdr(0, WsFrameOpcode.BINARY, 3, 0); + encoder.writer.buf(Buffer.from([0x01, 0x02, 0x03]), 3); + const buf2 = encoder.writer.flush(); + const buf3 = encoder.encodeHdr(0, WsFrameOpcode.CONTINUE, 3, 0); + encoder.writer.buf(Buffer.from([0x04, 0x05, 0x06]), 3); + const buf4 = encoder.writer.flush(); + const buf5 = encoder.encodeHdr(1, WsFrameOpcode.CONTINUE, 3, 0); + encoder.writer.buf(Buffer.from([0x07, 0x08, 0x09]), 3); + const buf6 = encoder.writer.flush(); + socket.write(pingFrame); + socket.write(buf1); + socket.write(buf2); + socket.write(buf3); + socket.write(buf4); + socket.write(buf5); + socket.write(buf6); + await until(() => fragments.length === 3); + expect(fragments).toEqual([ + [false, new Uint8Array([0x01, 0x02, 0x03]), false], + [false, new Uint8Array([0x04, 0x05, 0x06]), false], + [true, new Uint8Array([0x07, 0x08, 0x09]), false], + ]); + }); +}); From 9df37a0a7b78d3fa38d04ea563ef6a7f16ddf6f1 Mon Sep 17 00:00:00 2001 From: streamich Date: Sat, 30 Dec 2023 18:31:55 +0100 Subject: [PATCH 4/5] =?UTF-8?q?feat(reactive-rpc):=20=F0=9F=8E=B8=20build?= =?UTF-8?q?=20full=20messages=20out=20of=20fragments=20by=20default?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../server/ws/server/WsServerConnection.ts | 20 +++- .../__tests__/WsServerConnection.spec.ts | 94 ++++++++++++++++++- 2 files changed, 112 insertions(+), 2 deletions(-) diff --git a/src/reactive-rpc/server/ws/server/WsServerConnection.ts b/src/reactive-rpc/server/ws/server/WsServerConnection.ts index bc2b47a841..ec766e0259 100644 --- a/src/reactive-rpc/server/ws/server/WsServerConnection.ts +++ b/src/reactive-rpc/server/ws/server/WsServerConnection.ts @@ -2,6 +2,7 @@ import * as crypto from 'crypto'; import * as stream from 'stream'; import {WsCloseFrame, WsFrameDecoder, WsFrameHeader, WsFrameOpcode, WsPingFrame, WsPongFrame} from '../codec'; import {utf8Size} from '../../../../util/strings/utf8'; +import {listToUint8} from '../../../../util/buffers/concat'; import type {WsFrameEncoder} from '../codec/WsFrameEncoder'; export type WsServerConnectionSocket = stream.Duplex; @@ -15,8 +16,25 @@ export class WsServerConnection { this.sendPong(data); }; + private _fragments: Uint8Array[] = []; + private _fragmentsSize: number = 0; + public readonly defaultOnFragment = (isLast: boolean, data: Uint8Array, isUtf8: boolean): void => { + const fragments = this._fragments; + this._fragmentsSize += data.length; + if (this._fragmentsSize > this.maxIncomingMessage) { + this.onClose(1009, 'TOO_LARGE'); + return; + } + fragments.push(data); + if (!isLast) return; + this._fragments = []; + this._fragmentsSize = 0; + const message = listToUint8(fragments); + this.onmessage(message, isUtf8); + }; + public onmessage: (data: Uint8Array, isUtf8: boolean) => void = () => {}; - public onfragment: (isLast: boolean, data: Uint8Array, isUtf8: boolean) => void = () => {}; + public onfragment: (isLast: boolean, data: Uint8Array, isUtf8: boolean) => void = this.defaultOnFragment; public onping: (data: Uint8Array | null) => void = this.defaultOnPing; public onpong: (data: Uint8Array | null) => void = () => {}; public onclose: (code: number, reason: string) => void = () => {}; diff --git a/src/reactive-rpc/server/ws/server/__tests__/WsServerConnection.spec.ts b/src/reactive-rpc/server/ws/server/__tests__/WsServerConnection.spec.ts index a40d5aa19a..f09ace68ad 100644 --- a/src/reactive-rpc/server/ws/server/__tests__/WsServerConnection.spec.ts +++ b/src/reactive-rpc/server/ws/server/__tests__/WsServerConnection.spec.ts @@ -115,6 +115,37 @@ describe('.onmessage', () => { expect(messages[0]).toEqual([new Uint8Array([0x01, 0x02, 0x03]), false]); expect(messages[1]).toEqual([new Uint8Array([0x04, 0x05, 0x06]), false]); }); + + test('errors when incoming message is too large', async () => { + const {socket, encoder, connection} = setup(); + connection.maxIncomingMessage = 3; + const messages: [data: Uint8Array, isUtf8: boolean][] = []; + connection.onmessage = (data: Uint8Array, isUtf8: boolean): void => { + messages.push([data, isUtf8]); + }; + const closes: [code: number, reason: string][] = []; + connection.onclose = (code: number, reason: string): void => { + closes.push([code, reason]); + }; + const pingFrame = encoder.encodePong(Buffer.from([0x01, 0x02, 0x03])); + const frame1 = encoder.encodeHdr(1, WsFrameOpcode.BINARY, 3, 0); + encoder.writer.buf(Buffer.from([0x01, 0x02, 0x03]), 3); + const payload1 = encoder.writer.flush(); + const frame2 = encoder.encodeHdr(1, WsFrameOpcode.BINARY, 4, 0); + encoder.writer.buf(Buffer.from([0x04, 0x05, 0x06, 0x07]), 4); + const payload2 = encoder.writer.flush(); + socket.write(pingFrame); + socket.write(listToUint8([ + frame1, + payload1, + frame2, + payload2, + ])); + await until(() => messages.length === 1); + await until(() => closes.length === 1); + expect(messages[0]).toEqual([new Uint8Array([0x01, 0x02, 0x03]), false]); + expect(closes[0]).toEqual([1009, 'TOO_LARGE']); + }); test('text frame', async () => { const {socket, encoder, connection} = setup(); @@ -175,7 +206,7 @@ describe('.onmessage', () => { }); }); -describe('.fragment', () => { +describe('.onfragment', () => { test('parses out message fragments', async () => { const {socket, encoder, connection} = setup(); const fragments: [isLast: boolean, data: Uint8Array, isUtf8: boolean][] = []; @@ -206,4 +237,65 @@ describe('.fragment', () => { [true, new Uint8Array([0x07, 0x08, 0x09]), false], ]); }); + + describe('when .onfragment is not defined', () => { + test('emits an .onmessage with fully assembled message', async () => { + const {socket, encoder, connection} = setup(); + const messages: [data: Uint8Array, isUtf8: boolean][] = []; + connection.onmessage = (data: Uint8Array, isUtf8: boolean): void => { + messages.push([data, isUtf8]); + }; + const pingFrame = encoder.encodePong(Buffer.from([0x01, 0x02, 0x03])); + const buf1 = encoder.encodeHdr(0, WsFrameOpcode.BINARY, 3, 0); + encoder.writer.buf(Buffer.from([0x01, 0x02, 0x03]), 3); + const buf2 = encoder.writer.flush(); + const buf3 = encoder.encodeHdr(0, WsFrameOpcode.CONTINUE, 3, 0); + encoder.writer.buf(Buffer.from([0x04, 0x05, 0x06]), 3); + const buf4 = encoder.writer.flush(); + const buf5 = encoder.encodeHdr(1, WsFrameOpcode.CONTINUE, 3, 0); + encoder.writer.buf(Buffer.from([0x07, 0x08, 0x09]), 3); + const buf6 = encoder.writer.flush(); + socket.write(pingFrame); + socket.write(buf1); + socket.write(buf2); + socket.write(buf3); + socket.write(buf4); + socket.write(buf5); + socket.write(buf6); + await until(() => messages.length === 1); + expect(messages).toEqual([ + [new Uint8Array([0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07,0x08, 0x09]), false], + ]); + }); + + test('errors out when incoming message is too large', async () => { + const {socket, encoder, connection} = setup(); + connection.maxIncomingMessage = 8; + const closes: [code: number, reason: string][] = []; + connection.onclose = (code: number, reason: string): void => { + closes.push([code, reason]); + }; + const pingFrame = encoder.encodePong(Buffer.from([0x01, 0x02, 0x03])); + const buf1 = encoder.encodeHdr(0, WsFrameOpcode.BINARY, 3, 0); + encoder.writer.buf(Buffer.from([0x01, 0x02, 0x03]), 3); + const buf2 = encoder.writer.flush(); + const buf3 = encoder.encodeHdr(0, WsFrameOpcode.CONTINUE, 3, 0); + encoder.writer.buf(Buffer.from([0x04, 0x05, 0x06]), 3); + const buf4 = encoder.writer.flush(); + const buf5 = encoder.encodeHdr(1, WsFrameOpcode.CONTINUE, 3, 0); + encoder.writer.buf(Buffer.from([0x07, 0x08, 0x09]), 3); + const buf6 = encoder.writer.flush(); + socket.write(pingFrame); + socket.write(buf1); + socket.write(buf2); + socket.write(buf3); + socket.write(buf4); + socket.write(buf5); + socket.write(buf6); + await until(() => closes.length === 1); + expect(closes).toEqual([ + [1009, 'TOO_LARGE'], + ]); + }); + }); }); From c209dc20c2ddc319c6a3f0e6cce828aab826f1df Mon Sep 17 00:00:00 2001 From: streamich Date: Sat, 30 Dec 2023 18:32:14 +0100 Subject: [PATCH 5/5] =?UTF-8?q?style(reactive-rpc):=20=F0=9F=92=84=20run?= =?UTF-8?q?=20Prettier?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../server/ws/server/WsServerConnection.ts | 2 +- .../__tests__/WsServerConnection.spec.ts | 26 +++++-------------- 2 files changed, 7 insertions(+), 21 deletions(-) diff --git a/src/reactive-rpc/server/ws/server/WsServerConnection.ts b/src/reactive-rpc/server/ws/server/WsServerConnection.ts index ec766e0259..0e842e7c74 100644 --- a/src/reactive-rpc/server/ws/server/WsServerConnection.ts +++ b/src/reactive-rpc/server/ws/server/WsServerConnection.ts @@ -61,7 +61,7 @@ export class WsServerConnection { const isLast = currentFrameHeader.fin === 1; currentFrameHeader = null; if (isLast) fragmentStartFrameHeader = null; - this.onfragment(isLast, buf, isText) + this.onfragment(isLast, buf, isText); } else { const isText = currentFrameHeader.opcode === WsFrameOpcode.TEXT; currentFrameHeader = null; diff --git a/src/reactive-rpc/server/ws/server/__tests__/WsServerConnection.spec.ts b/src/reactive-rpc/server/ws/server/__tests__/WsServerConnection.spec.ts index f09ace68ad..4b50147b73 100644 --- a/src/reactive-rpc/server/ws/server/__tests__/WsServerConnection.spec.ts +++ b/src/reactive-rpc/server/ws/server/__tests__/WsServerConnection.spec.ts @@ -105,12 +105,7 @@ describe('.onmessage', () => { encoder.writer.buf(Buffer.from([0x04, 0x05, 0x06]), 3); const payload2 = encoder.writer.flush(); socket.write(pingFrame); - socket.write(listToUint8([ - frame1, - payload1, - frame2, - payload2, - ])); + socket.write(listToUint8([frame1, payload1, frame2, payload2])); await until(() => messages.length === 2); expect(messages[0]).toEqual([new Uint8Array([0x01, 0x02, 0x03]), false]); expect(messages[1]).toEqual([new Uint8Array([0x04, 0x05, 0x06]), false]); @@ -135,18 +130,13 @@ describe('.onmessage', () => { encoder.writer.buf(Buffer.from([0x04, 0x05, 0x06, 0x07]), 4); const payload2 = encoder.writer.flush(); socket.write(pingFrame); - socket.write(listToUint8([ - frame1, - payload1, - frame2, - payload2, - ])); + socket.write(listToUint8([frame1, payload1, frame2, payload2])); await until(() => messages.length === 1); await until(() => closes.length === 1); expect(messages[0]).toEqual([new Uint8Array([0x01, 0x02, 0x03]), false]); expect(closes[0]).toEqual([1009, 'TOO_LARGE']); }); - + test('text frame', async () => { const {socket, encoder, connection} = setup(); const messages: [data: Uint8Array, isUtf8: boolean][] = []; @@ -184,7 +174,7 @@ describe('.onmessage', () => { await until(() => messages.length === 1); expect(messages[0]).toEqual([new Uint8Array([0x01, 0x02, 0x03]), false]); }); - + test('text frame', async () => { const {socket, encoder, connection} = setup(); const messages: [data: Uint8Array, isUtf8: boolean][] = []; @@ -263,9 +253,7 @@ describe('.onfragment', () => { socket.write(buf5); socket.write(buf6); await until(() => messages.length === 1); - expect(messages).toEqual([ - [new Uint8Array([0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07,0x08, 0x09]), false], - ]); + expect(messages).toEqual([[new Uint8Array([0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09]), false]]); }); test('errors out when incoming message is too large', async () => { @@ -293,9 +281,7 @@ describe('.onfragment', () => { socket.write(buf5); socket.write(buf6); await until(() => closes.length === 1); - expect(closes).toEqual([ - [1009, 'TOO_LARGE'], - ]); + expect(closes).toEqual([[1009, 'TOO_LARGE']]); }); }); });