[Postgres] Fix slot recovery & improve log output (#162)
* Fix replication slot recovery.

* Reduce log noise for "Waiting until ... before creating checkpoint".

* Improved logs for clearing data.

* Remove .only.

* Add changesets.
rkistner authored Dec 12, 2024
1 parent b3e28f7 commit e3a9343
Showing 7 changed files with 93 additions and 8 deletions.
5 changes: 5 additions & 0 deletions .changeset/chatty-boxes-repeat.md
@@ -0,0 +1,5 @@
---
'@powersync/service-module-postgres': patch
---

Fix replication slot recovery
5 changes: 5 additions & 0 deletions .changeset/serious-rivers-sin.md
@@ -0,0 +1,5 @@
---
'@powersync/service-core': patch
---

Reduce noise in log output
17 changes: 13 additions & 4 deletions modules/module-postgres/src/replication/WalStream.ts
@@ -192,6 +192,15 @@ export class WalStream {
if (slotExists) {
// This checks that the slot is still valid
const r = await this.checkReplicationSlot();
if (snapshotDone && r.needsNewSlot) {
// We keep the current snapshot, and create a new replication slot
throw new MissingReplicationSlotError(`Replication slot ${slotName} is not valid anymore`);
}
// We can have:
// needsInitialSync: true, needsNewSlot: true -> initial sync from scratch
// needsInitialSync: true, needsNewSlot: false -> resume initial sync
// needsInitialSync: false, needsNewSlot: true -> handled above
// needsInitialSync: false, needsNewSlot: false -> resume streaming replication
return {
needsInitialSync: !snapshotDone,
needsNewSlot: r.needsNewSlot
@@ -204,7 +213,7 @@
/**
* If a replication slot exists, check that it is healthy.
*/
private async checkReplicationSlot(): Promise<InitResult> {
private async checkReplicationSlot(): Promise<{ needsNewSlot: boolean }> {
let last_error = null;
const slotName = this.slot_name;

@@ -244,7 +253,7 @@

// Success
logger.info(`Slot ${slotName} appears healthy`);
return { needsInitialSync: false, needsNewSlot: false };
return { needsNewSlot: false };
} catch (e) {
last_error = e;
logger.warn(`${slotName} Replication slot error`, e);
@@ -274,9 +283,9 @@
// Sample: publication "powersync" does not exist
// Happens when publication deleted or never created.
// Slot must be re-created in this case.
logger.info(`${slotName} does not exist anymore, will create new slot`);
logger.info(`${slotName} is not valid anymore`);

return { needsInitialSync: true, needsNewSlot: true };
return { needsNewSlot: true };
}
// Try again after a pause
await new Promise((resolve) => setTimeout(resolve, 1000));
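The four (needsInitialSync, needsNewSlot) combinations listed in the comment above determine how replication restarts. Below is a minimal sketch, not part of the commit, of how a caller might branch on that result; the InitActions callbacks are hypothetical placeholders rather than PowerSync APIs.

interface InitResult {
  needsInitialSync: boolean;
  needsNewSlot: boolean;
}

interface InitActions {
  recreateSlot(): Promise<void>; // drop and re-create the replication slot
  initialSync(): Promise<void>; // run or resume the initial snapshot
  resumeStreaming(): Promise<void>; // continue streaming from the existing slot
}

async function handleInitResult(r: InitResult, actions: InitActions): Promise<void> {
  if (r.needsNewSlot) {
    // needsInitialSync: true, needsNewSlot: true -> initial sync from scratch.
    // (needsInitialSync: false, needsNewSlot: true is handled earlier by
    // throwing MissingReplicationSlotError.)
    await actions.recreateSlot();
    await actions.initialSync();
  } else if (r.needsInitialSync) {
    // needsInitialSync: true, needsNewSlot: false -> resume the initial sync.
    await actions.initialSync();
  } else {
    // needsInitialSync: false, needsNewSlot: false -> resume streaming replication.
    await actions.resumeStreaming();
  }
}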
49 changes: 49 additions & 0 deletions modules/module-postgres/test/src/wal_stream.test.ts
@@ -5,6 +5,7 @@ import { pgwireRows } from '@powersync/service-jpgwire';
import * as crypto from 'crypto';
import { describe, expect, test } from 'vitest';
import { WalStreamTestContext } from './wal_stream_utils.js';
import { MissingReplicationSlotError } from '@module/replication/WalStream.js';

type StorageFactory = () => Promise<BucketStorageFactory>;

@@ -291,4 +292,52 @@
expect(endRowCount - startRowCount).toEqual(0);
expect(endTxCount - startTxCount).toEqual(1);
});

test('reporting slot issues', async () => {
{
await using context = await WalStreamTestContext.open(factory);
const { pool } = context;
await context.updateSyncRules(`
bucket_definitions:
global:
data:
- SELECT id, description FROM "test_data"`);

await pool.query(
`CREATE TABLE test_data(id uuid primary key default uuid_generate_v4(), description text, num int8)`
);
await pool.query(
`INSERT INTO test_data(id, description) VALUES('8133cd37-903b-4937-a022-7c8294015a3a', 'test1') returning id as test_id`
);
await context.replicateSnapshot();
await context.startStreaming();

const data = await context.getBucketData('global[]');

expect(data).toMatchObject([
putOp('test_data', {
id: '8133cd37-903b-4937-a022-7c8294015a3a',
description: 'test1'
})
]);

expect(await context.storage!.getStatus()).toMatchObject({ active: true, snapshot_done: true });
}

{
await using context = await WalStreamTestContext.open(factory, { doNotClear: true });
const { pool } = context;
await pool.query('DROP PUBLICATION powersync');
await pool.query(`UPDATE test_data SET description = 'updated'`);
await pool.query('CREATE PUBLICATION powersync FOR ALL TABLES');

await context.loadActiveSyncRules();
await expect(async () => {
await context.replicateSnapshot();
}).rejects.toThrowError(MissingReplicationSlotError);

// The error is handled on a higher level, which triggers
// creating a new replication slot.
}
});
}
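For context on what the test exercises: per the code comments in WalStream.ts, dropping the powersync publication while changes are still written invalidates the existing slot for logical decoding, which is what checkReplicationSlot detects. Below is a hedged sketch, not part of the test suite, of catalog queries one could run against the same pool to inspect that state; the loose pool typing is an assumption and the result shape is driver-specific.

// Hypothetical helper; assumes a pgwire-style pool exposing query(sql): Promise<unknown>.
async function inspectReplicationState(pool: { query(sql: string): Promise<unknown> }) {
  // The replication slot created for the sync rules instance still shows up here...
  const slots = await pool.query(`SELECT slot_name, active FROM pg_replication_slots`);
  // ...but the publication it decodes through is missing until it is re-created.
  const publications = await pool.query(`SELECT pubname FROM pg_publication WHERE pubname = 'powersync'`);
  return { slots, publications };
}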
10 changes: 10 additions & 0 deletions modules/module-postgres/test/src/wal_stream_utils.ts
@@ -77,6 +77,16 @@ export class WalStreamTestContext implements AsyncDisposable {
return this.storage!;
}

async loadActiveSyncRules() {
const syncRules = await this.factory.getActiveSyncRulesContent();
if (syncRules == null) {
throw new Error(`Active sync rules not available`);
}

this.storage = this.factory.getInstance(syncRules);
return this.storage!;
}

get walStream() {
if (this.storage == null) {
throw new Error('updateSyncRules() first');
11 changes: 8 additions & 3 deletions packages/service-core/src/storage/mongo/MongoBucketBatch.ts
@@ -609,6 +609,8 @@ export class MongoBucketBatch extends DisposableObserver<BucketBatchStorageListe
super[Symbol.dispose]();
}

private lastWaitingLogThottled = 0;

async commit(lsn: string): Promise<boolean> {
await this.flush();

@@ -619,9 +621,12 @@
return false;
}
if (lsn < this.no_checkpoint_before_lsn) {
logger.info(
`Waiting until ${this.no_checkpoint_before_lsn} before creating checkpoint, currently at ${lsn}. Persisted op: ${this.persisted_op}`
);
if (Date.now() - this.lastWaitingLogThottled > 5_000) {
logger.info(
`Waiting until ${this.no_checkpoint_before_lsn} before creating checkpoint, currently at ${lsn}. Persisted op: ${this.persisted_op}`
);
this.lastWaitingLogThottled = Date.now();
}

// Edge case: During initial replication, we have a no_checkpoint_before_lsn set,
// and don't actually commit the snapshot.
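The change above rate-limits the "Waiting until ... before creating checkpoint" line so it is emitted at most once every five seconds instead of on every commit attempt. The same idea as a small standalone helper, offered only as a sketch (the names below are not from the codebase):

function makeThrottledLogger(intervalMs: number, log: (msg: string) => void) {
  let lastEmitted = 0;
  return (message: string) => {
    const now = Date.now();
    if (now - lastEmitted > intervalMs) {
      log(message);
      lastEmitted = now;
    }
  };
}

// Usage: at most one "waiting" message every 5 seconds, matching the diff above.
const logWaiting = makeThrottledLogger(5_000, (msg) => console.log(msg));
logWaiting('Waiting until <no_checkpoint_before_lsn> before creating checkpoint');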
4 changes: 3 additions & 1 deletion packages/service-core/src/storage/mongo/MongoSyncBucketStorage.ts
@@ -522,11 +522,13 @@ export class MongoSyncBucketStorage
while (true) {
try {
await this.clearIteration();

logger.info(`${this.slot_name} Done clearing data`);
return;
} catch (e: unknown) {
if (e instanceof mongo.MongoServerError && e.codeName == 'MaxTimeMSExpired') {
logger.info(
`Clearing took longer than ${db.mongo.MONGO_CLEAR_OPERATION_TIMEOUT_MS}ms, waiting and triggering another iteration.`
`${this.slot_name} Cleared batch of data in ${db.mongo.MONGO_CLEAR_OPERATION_TIMEOUT_MS}ms, continuing...`
);
await timers.setTimeout(db.mongo.MONGO_CLEAR_OPERATION_TIMEOUT_MS / 5);
continue;
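For reference, the loop above relies on the MongoDB server aborting a long-running clear operation with MaxTimeMSExpired, which the code treats as "one batch done, keep going". Below is a hedged sketch of that mechanism using the official mongodb driver; the collection, filter, and timeout constant are hypothetical stand-ins (the constant mirrors MONGO_CLEAR_OPERATION_TIMEOUT_MS), not the actual clearIteration implementation.

import { MongoClient, MongoServerError } from 'mongodb';

const OPERATION_TIMEOUT_MS = 5_000; // stand-in for MONGO_CLEAR_OPERATION_TIMEOUT_MS

// Returns true when the delete finished within the time budget, false when the
// server aborted it with MaxTimeMSExpired and another iteration is needed.
async function clearOneBatch(client: MongoClient): Promise<boolean> {
  const collection = client.db('powersync').collection('bucket_data'); // hypothetical names
  try {
    await collection.deleteMany({ pending_delete: true }, { maxTimeMS: OPERATION_TIMEOUT_MS });
    return true;
  } catch (e) {
    if (e instanceof MongoServerError && e.codeName === 'MaxTimeMSExpired') {
      return false; // budget exceeded; caller should wait briefly and iterate again
    }
    throw e;
  }
}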
