Skip to content

Commit

Permalink
feat(json-pack): 🎸 add skipping ability to streaming RESP decoder
Browse files Browse the repository at this point in the history
  • Loading branch information
streamich committed Dec 16, 2023
1 parent 61f4d3a commit 948c6cf
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 11 deletions.
22 changes: 22 additions & 0 deletions src/json-pack/resp/RespStreamingDecoder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,4 +66,26 @@ export class RespStreamingDecoder {
} else throw error;
}
}

/**
* Skips one value from the stream. If `undefined` is returned, then
* there is not enough data to skip or the stream is finished.
* @returns `null` if a value was skipped, `undefined` if there is not
* enough data to skip.
*/
public skip(): null | undefined {
const reader = this.reader;
if (reader.size() === 0) return undefined;
const x = reader.x;
try {
this.decoder.skipAny();
reader.consume();
return null;
} catch (error) {
if (error instanceof RangeError) {
reader.x = x;
return undefined;
} else throw error;
}
}
}
40 changes: 29 additions & 11 deletions src/json-pack/resp/__tests__/skipping.spec.ts
Original file line number Diff line number Diff line change
@@ -1,23 +1,41 @@
import {RespEncoder} from '../RespEncoder';
import {RespDecoder} from '../RespDecoder';
import {RespStreamingDecoder} from '../RespStreamingDecoder';
import {documents} from '../../../__tests__/json-documents';
import {binaryDocuments} from '../../../__tests__/binary-documents';

const docs = [...documents, ...binaryDocuments];

const encoder = new RespEncoder();
const decoder = new RespDecoder();
const streamingDecoder = new RespStreamingDecoder();

describe('skipping', () => {
for (const t of docs) {
(t.only ? test.only : test)(t.name, () => {
encoder.writeAny(t.json);
encoder.writeAny({foo: 'bar'});
const encoded = encoder.writer.flush();
decoder.reader.reset(encoded);
decoder.skipAny();
const decoded = decoder.val();
expect(decoded).toEqual({foo: 'bar'});
});
}
describe('RespDecoder', () => {
for (const t of docs) {
(t.only ? test.only : test)(t.name, () => {
encoder.writeAny(t.json);
encoder.writeAny({foo: 'bar'});
const encoded = encoder.writer.flush();
decoder.reader.reset(encoded);
decoder.skipAny();
const decoded = decoder.val();
expect(decoded).toEqual({foo: 'bar'});
});
}
});

describe('RespStreamingDecoder', () => {
for (const t of docs) {
(t.only ? test.only : test)(t.name, () => {
encoder.writeAny(t.json);
encoder.writeAny({foo: 'bar'});
const encoded = encoder.writer.flush();
streamingDecoder.push(encoded);
streamingDecoder.skip();
const decoded = streamingDecoder.read();
expect(decoded).toEqual({foo: 'bar'});
});
}
});
});

0 comments on commit 948c6cf

Please sign in to comment.