diff --git a/src/json-crdt/file/File.ts b/src/json-crdt/file/File.ts index bc5fd3e112..15dd480931 100644 --- a/src/json-crdt/file/File.ts +++ b/src/json-crdt/file/File.ts @@ -1,5 +1,5 @@ import {Model} from '../model'; -import {PatchLog} from './PatchLog'; +import {PatchLog} from '../history/PatchLog'; import {printTree} from '../../util/print/printTree'; import {decodeModel, decodeNdjsonComponents, decodePatch, decodeSeqCborComponents} from './util'; import {Patch} from '../../json-crdt-patch'; @@ -46,9 +46,8 @@ export class File implements Printable { if (history) { const [start, patches] = history; if (start) { - const startModel = decodeModel(start); - log = new PatchLog(startModel); - for (const patch of patches) log.push(decodePatch(patch)); + log = new PatchLog(() => decodeModel(start)); + for (const patch of patches) log.end.applyPatch(decodePatch(patch)); } } if (!log) throw new Error('NO_HISTORY'); @@ -57,7 +56,7 @@ export class File implements Printable { for (const patch of frontier) { const patchDecoded = decodePatch(patch); decodedModel.applyPatch(patchDecoded); - log.push(patchDecoded); + log.end.applyPatch(patchDecoded); } } const file = new File(decodedModel, log); @@ -75,7 +74,7 @@ export class File implements Printable { } public static fromModel(model: Model, options: FileOptions = {}): File { - return new File(model, PatchLog.fromModel(model), options); + return new File(model, PatchLog.fromNewModel(model), options); } constructor( @@ -88,7 +87,7 @@ export class File implements Printable { const id = patch.getId(); if (!id) return; this.model.applyPatch(patch); - this.log.push(patch); + this.log.end.applyPatch(patch); } /** @@ -100,10 +99,10 @@ export class File implements Printable { const api = model.api; const autoflushUnsubscribe = api.autoFlush(); const onPatchUnsubscribe = api.onPatch.listen((patch) => { - log.push(patch); + log.end.applyPatch(patch); }); const onFlushUnsubscribe = api.onFlush.listen((patch) => { - log.push(patch); + log.end.applyPatch(patch); }); return () => { autoflushUnsubscribe(); @@ -153,7 +152,7 @@ export class File implements Printable { const patchFormat = params.history ?? 'binary'; switch (patchFormat) { case 'binary': { - history[0] = this.log.start.toBinary(); + history[0] = this.log.start().toBinary(); this.log.patches.forEach(({v}) => { history[1].push(v.toBinary()); }); @@ -162,7 +161,7 @@ export class File implements Printable { case 'compact': { const encoder = this.options.structuralCompactEncoder; if (!encoder) throw new Error('NO_COMPACT_ENCODER'); - history[0] = encoder.encode(this.log.start); + history[0] = encoder.encode(this.log.start()); const encodeCompact = this.options.patchCompactEncoder; if (!encodeCompact) throw new Error('NO_COMPACT_PATCH_ENCODER'); const list = history[1]; @@ -174,7 +173,7 @@ export class File implements Printable { case 'verbose': { const encoder = this.options.structuralVerboseEncoder; if (!encoder) throw new Error('NO_VERBOSE_ENCODER'); - history[0] = encoder.encode(this.log.start); + history[0] = encoder.encode(this.log.start()); const encodeVerbose = this.options.patchVerboseEncoder; if (!encodeVerbose) throw new Error('NO_VERBOSE_PATCH_ENCODER'); const list = history[1]; diff --git a/src/json-crdt/file/PatchLog.ts b/src/json-crdt/file/PatchLog.ts deleted file mode 100644 index a2040eb573..0000000000 --- a/src/json-crdt/file/PatchLog.ts +++ /dev/null @@ -1,61 +0,0 @@ -import {ITimestampStruct, Patch, compare} from '../../json-crdt-patch'; -import {printTree} from '../../util/print/printTree'; -import {AvlMap} from '../../util/trees/avl/AvlMap'; -import {Model} from '../model'; -import type {Printable} from '../../util/print/types'; -import {first, next} from '../../util/trees/util'; - -export class PatchLog implements Printable { - public static fromModel(model: Model): PatchLog { - const start = new Model(model.clock.clone()); - const log = new PatchLog(start); - if (model.api.builder.patch.ops.length) { - const patch = model.api.flush(); - log.push(patch); - } - return log; - } - - public readonly patches = new AvlMap(compare); - - constructor(public readonly start: Model) {} - - public push(patch: Patch): void { - const id = patch.getId(); - if (!id) return; - this.patches.set(id, patch); - } - - public replayToEnd(): Model { - const clone = this.start.clone(); - for (let node = first(this.patches.root); node; node = next(node)) clone.applyPatch(node.v); - return clone; - } - - public replayTo(ts: ITimestampStruct): Model { - const clone = this.start.clone(); - for (let node = first(this.patches.root); node && compare(ts, node.k) >= 0; node = next(node)) - clone.applyPatch(node.v); - return clone; - } - - // ---------------------------------------------------------------- Printable - - public toString(tab?: string) { - const log: Patch[] = []; - this.patches.forEach(({v}) => log.push(v)); - return ( - `log` + - printTree(tab, [ - (tab) => this.start.toString(tab), - () => '', - (tab) => - 'history' + - printTree( - tab, - log.map((patch, i) => (tab) => `${i}: ${patch.toString(tab)}`), - ), - ]) - ); - } -} diff --git a/src/json-crdt/file/__tests__/File.spec.ts b/src/json-crdt/file/__tests__/File.spec.ts index c97bcc7a51..7dd2015809 100644 --- a/src/json-crdt/file/__tests__/File.spec.ts +++ b/src/json-crdt/file/__tests__/File.spec.ts @@ -20,11 +20,11 @@ test('can create File from new model', () => { }), ); const file = File.fromModel(model); - expect(file.log.start.view()).toBe(undefined); + expect(file.log.start().view()).toBe(undefined); expect(file.model.view()).toEqual({ foo: 'bar', }); - expect(file.log.start.clock.sid).toBe(file.model.clock.sid); + expect(file.log.start().clock.sid).toBe(file.model.clock.sid); }); test.todo('patches are flushed and stored in memory'); @@ -56,7 +56,7 @@ describe('.toBinary()', () => { const file2 = File.fromNdjson(blob); expect(file2.model.view()).toEqual({foo: 'bar'}); expect(file2.model !== file.model).toBe(true); - expect(file.log.start.view()).toEqual(undefined); + expect(file.log.start().view()).toEqual(undefined); expect(file.log.replayToEnd().view()).toEqual({foo: 'bar'}); }); @@ -66,7 +66,7 @@ describe('.toBinary()', () => { const file2 = File.fromSeqCbor(blob); expect(file2.model.view()).toEqual({foo: 'bar'}); expect(file2.model !== file.model).toBe(true); - expect(file.log.start.view()).toEqual(undefined); + expect(file.log.start().view()).toEqual(undefined); expect(file.log.replayToEnd().view()).toEqual({foo: 'bar'}); }); }); @@ -78,7 +78,7 @@ describe('.toBinary()', () => { params.format === 'seq.cbor' ? File.fromSeqCbor(blob, fileEncoders) : File.fromNdjson(blob, fileEncoders); expect(file2.model.view()).toEqual(file.model.view()); expect(file2.model !== file.model).toBe(true); - expect(file2.log.start.view()).toEqual(undefined); + expect(file2.log.start().view()).toEqual(undefined); expect(file2.log.replayToEnd().view()).toEqual(file.model.view()); expect(file2.log.patches.size()).toBe(file.log.patches.size()); }; diff --git a/src/json-crdt/file/__tests__/PatchLog.spec.ts b/src/json-crdt/file/__tests__/PatchLog.spec.ts deleted file mode 100644 index c15e43c5c6..0000000000 --- a/src/json-crdt/file/__tests__/PatchLog.spec.ts +++ /dev/null @@ -1,29 +0,0 @@ -import {Model} from '../../model'; -import {File} from '../File'; - -const setup = (view: unknown) => { - const model = Model.withServerClock(); - model.api.root(view); - const file = File.fromModel(model); - return {model, file}; -}; - -test('can replay to specific patch', () => { - const {file} = setup({foo: 'bar'}); - const model = file.model.clone(); - model.api.obj([]).set({x: 1}); - const patch1 = model.api.flush(); - model.api.obj([]).set({y: 2}); - const patch2 = model.api.flush(); - file.apply(patch1); - file.apply(patch2); - const model2 = file.log.replayToEnd(); - const model3 = file.log.replayTo(patch1.getId()!); - const model4 = file.log.replayTo(patch2.getId()!); - expect(model.view()).toEqual({foo: 'bar', x: 1, y: 2}); - expect(file.model.view()).toEqual({foo: 'bar', x: 1, y: 2}); - expect(file.log.start.view()).toEqual(undefined); - expect(model2.view()).toEqual({foo: 'bar', x: 1, y: 2}); - expect(model3.view()).toEqual({foo: 'bar', x: 1}); - expect(model4.view()).toEqual({foo: 'bar', x: 1, y: 2}); -}); diff --git a/src/json-crdt/history/PatchLog.ts b/src/json-crdt/history/PatchLog.ts new file mode 100644 index 0000000000..34edc31883 --- /dev/null +++ b/src/json-crdt/history/PatchLog.ts @@ -0,0 +1,149 @@ +import {FanOutUnsubscribe} from 'thingies/es2020/fanout'; +import {ITimestampStruct, Patch, compare} from '../../json-crdt-patch'; +import {printTree} from '../../util/print/printTree'; +import {AvlMap} from '../../util/trees/avl/AvlMap'; +import {Model} from '../model'; +import {first, next} from '../../util/trees/util'; +import type {Printable} from '../../util/print/types'; + +export class PatchLog implements Printable { + /** + * Creates a `PatchLog` instance from a newly JSON CRDT model. Checks if + * the model API buffer has any initial operations applied, if yes, it + * uses them to create the initial state of the log. + * + * @param model A new JSON CRDT model, just created with + * `Model.withLogicalClock()` or `Model.withServerClock()`. + * @returns A new `PatchLog` instance. + */ + public static fromNewModel(model: Model): PatchLog { + const clock = model.clock.clone(); + const log = new PatchLog(() => new Model(clock)); + const api = model.api; + if (api.builder.patch.ops.length) log.end.applyPatch(api.flush()); + return log; + } + + /** + * Model factory function that creates a new JSON CRDT model instance, which + * is used as the starting point of the log. It is called every time a new + * model is needed to replay the log. + * + * @readonly Internally this function may be updated, but externally it is + * read-only. + */ + public start: () => Model; + + /** + * The end of the log, the current state of the document. It is the model + * instance that is used to apply new patches to the log. + * + * @readonly + */ + public readonly end: Model; + + /** + * The patches in the log, stored in an AVL tree for efficient replaying. The + * collection of patches which are applied to the `start()` model to reach + * the `end` model. + * + * @readonly + */ + public readonly patches = new AvlMap(compare); + + private __onPatch: FanOutUnsubscribe; + private __onFlush: FanOutUnsubscribe; + + constructor(start: () => Model) { + this.start = start; + const end = (this.end = start()); + const onPatch = (patch: Patch) => { + const id = patch.getId(); + if (!id) return; + this.patches.set(id, patch); + }; + const api = end.api; + this.__onPatch = api.onPatch.listen(onPatch); + this.__onFlush = api.onFlush.listen(onPatch); + } + + /** + * Call this method to destroy the `PatchLog` instance. It unsubscribes patch + * and flush listeners from the `end` model and clears the patch log. + */ + public destroy() { + this.__onPatch(); + this.__onFlush(); + this.patches.clear(); + } + + /** + * Creates a new model instance using the `start()` factory function and + * replays all patches in the log to reach the current state of the document. + * + * @returns A new model instance with all patches replayed. + */ + public replayToEnd(): Model { + const clone = this.start().clone(); + for (let node = first(this.patches.root); node; node = next(node)) clone.applyPatch(node.v); + return clone; + } + + /** + * Replays the patch log until a specified timestamp, including the patch + * at the given timestamp. The model returned is a new instance of `start()` + * with patches replayed up to the given timestamp. + * + * @param ts Timestamp ID of the patch to replay to. + * @returns A new model instance with patches replayed up to the given timestamp. + */ + public replayTo(ts: ITimestampStruct): Model { + const clone = this.start().clone(); + for (let node = first(this.patches.root); node && compare(ts, node.k) >= 0; node = next(node)) + clone.applyPatch(node.v); + return clone; + } + + /** + * Advance the start of the log to a specified timestamp, excluding the patch + * at the given timestamp. This method removes all patches from the log that + * are older than the given timestamp and updates the `start()` factory + * function to replay the log from the new start. + * + * @param ts Timestamp ID of the patch to advance to. + */ + public advanceTo(ts: ITimestampStruct): void { + const newStartPatches: Patch[] = []; + let node = first(this.patches.root); + for (; node && compare(ts, node.k) >= 0; node = next(node)) newStartPatches.push(node.v); + for (const patch of newStartPatches) this.patches.del(patch.getId()!); + const oldStart = this.start; + this.start = (): Model => { + const model = oldStart(); + for (const patch of newStartPatches) model.applyPatch(patch); + return model; + }; + } + + // ---------------------------------------------------------------- Printable + + public toString(tab?: string) { + const patches: Patch[] = []; + this.patches.forEach(({v}) => patches.push(v)); + return ( + `log` + + printTree(tab, [ + (tab) => `start` + printTree(tab, [(tab) => this.start().toString(tab)]), + () => '', + (tab) => + 'history' + + printTree( + tab, + patches.map((patch, i) => (tab) => `${i}: ${patch.toString(tab)}`), + ), + () => '', + (tab) => `end` + printTree(tab, [(tab) => this.end.toString(tab)]), + ]) + ); + } +} diff --git a/src/json-crdt/history/__tests__/PatchLog.spec.ts b/src/json-crdt/history/__tests__/PatchLog.spec.ts new file mode 100644 index 0000000000..4eb694dce4 --- /dev/null +++ b/src/json-crdt/history/__tests__/PatchLog.spec.ts @@ -0,0 +1,67 @@ +import {Model} from '../../model'; +import {PatchLog} from '../PatchLog'; + +const setup = (view: unknown) => { + const model = Model.withServerClock(); + model.api.root(view); + const log = PatchLog.fromNewModel(model); + return {log}; +}; + +test('can replay to specific patch', () => { + const {log} = setup({foo: 'bar'}); + const model = log.end.clone(); + model.api.obj([]).set({x: 1}); + const patch1 = model.api.flush(); + model.api.obj([]).set({y: 2}); + const patch2 = model.api.flush(); + log.end.applyPatch(patch1); + log.end.applyPatch(patch2); + const model2 = log.replayToEnd(); + const model3 = log.replayTo(patch1.getId()!); + const model4 = log.replayTo(patch2.getId()!); + expect(model.view()).toEqual({foo: 'bar', x: 1, y: 2}); + expect(log.end.view()).toEqual({foo: 'bar', x: 1, y: 2}); + expect(log.start().view()).toEqual(undefined); + expect(model2.view()).toEqual({foo: 'bar', x: 1, y: 2}); + expect(model3.view()).toEqual({foo: 'bar', x: 1}); + expect(model4.view()).toEqual({foo: 'bar', x: 1, y: 2}); +}); + +test('can advance the log from start', () => { + const {log} = setup({foo: 'bar'}); + log.end.api.obj([]).set({x: 1}); + const patch1 = log.end.api.flush(); + log.end.api.obj([]).set({y: 2}); + const patch2 = log.end.api.flush(); + log.end.api.obj([]).set({foo: 'baz'}); + const patch3 = log.end.api.flush(); + expect(log.end.view()).toEqual({foo: 'baz', x: 1, y: 2}); + expect(log.start().view()).toEqual(undefined); + log.advanceTo(patch1.getId()!); + expect(log.end.view()).toEqual({foo: 'baz', x: 1, y: 2}); + expect(log.start().view()).toEqual({foo: 'bar', x: 1}); + log.advanceTo(patch2.getId()!); + expect(log.end.view()).toEqual({foo: 'baz', x: 1, y: 2}); + expect(log.start().view()).toEqual({foo: 'bar', x: 1, y: 2}); + expect(log.patches.size()).toBe(1); + log.advanceTo(patch3.getId()!); + expect(log.end.view()).toEqual({foo: 'baz', x: 1, y: 2}); + expect(log.start().view()).toEqual({foo: 'baz', x: 1, y: 2}); + expect(log.patches.size()).toBe(0); +}); + +test('can advance multiple patches at once', () => { + const {log} = setup({foo: 'bar'}); + log.end.api.obj([]).set({x: 1}); + log.end.api.flush(); + log.end.api.obj([]).set({y: 2}); + const patch2 = log.end.api.flush(); + log.end.api.obj([]).set({foo: 'baz'}); + log.end.api.flush(); + expect(log.end.view()).toEqual({foo: 'baz', x: 1, y: 2}); + expect(log.start().view()).toEqual(undefined); + log.advanceTo(patch2.getId()!); + expect(log.end.view()).toEqual({foo: 'baz', x: 1, y: 2}); + expect(log.start().view()).toEqual({foo: 'bar', x: 1, y: 2}); +}); diff --git a/src/json-crdt/history/types.ts b/src/json-crdt/history/types.ts new file mode 100644 index 0000000000..45eade6e40 --- /dev/null +++ b/src/json-crdt/history/types.ts @@ -0,0 +1,50 @@ +import {Patch} from '../../json-crdt-patch'; +import {PatchLog} from '../history/PatchLog'; +import {Model} from '../model'; + +/** + * A history of patches that have been applied to a model, stored on the + * "remote": (1) server; (2) content addressable storage; or (3) peer-to-peer + * network. + * + * Cases: + * + * - `RemoteHistoryServer` + * - `RemoteHistoryServerIdempotent` + * - `RemoteHistoryCAS` + * - `RemoteHistoryP2P` + */ +export interface RemoteHistory { + /** + * Load latest state of the model, and any unmerged "tip" of patches + * it might have. + * + * @todo Maybe `state` and `tip` should be serialized to JSON? + */ + loadLatest(id: string): Promise<[cursor: Cursor, state: Model]>; + + loadAfter(id: string, cursor: Cursor): Promise<[cursor: Cursor, tip: Patch[]]>; + + loadBefore(id: string, cursor: Cursor): Promise<[cursor: Cursor, state: Model, tip: Patch[]]>; + + apply(id: string, patches: Patch[]): Promise; + + /** + * Subscribe to the latest changes to the model. + * @param callback + */ + subscribe(id: string, cursor: Cursor, callback: (changes: Patch[]) => void): void; +} + +export interface LocalHistory { + load(id: string): Promise; + // loadHistory(id: string): Promise; + apply(id: string, patches: Patch[]): Promise; +} + +export interface EditingSessionHistory { + load(id: string): Promise; + loadHistory(id: string): Promise; + undo(id: string): Promise; + redo(id: string): Promise; +}