Skip to content

Commit

Permalink
Add migration to remove any dangling rows.
Browse files Browse the repository at this point in the history
  • Loading branch information
rkistner committed Nov 8, 2024
1 parent ebfbbef commit 7edb71f
Show file tree
Hide file tree
Showing 8 changed files with 219 additions and 4 deletions.
47 changes: 47 additions & 0 deletions crates/core/src/fix035.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
use alloc::format;

use crate::error::{PSResult, SQLiteError};
use sqlite_nostd as sqlite;
use sqlite_nostd::{Connection, ResultCode};

use crate::ext::SafeManagedStmt;
use crate::util::quote_identifier;

// Apply a data migration to fix any existing data affected by the issue
// fixed in v0.3.5.
//
// The issue was that the `ps_updated_rows` table was not being populated
// with remove operations in some cases. This causes the rows to be removed
// from ps_oplog, but not from the ps_data__tables, resulting in dangling rows.
//
// The fix here is to find these dangling rows, and add them to ps_updated_rows.
// The next time the sync_local operation is run, these rows will be removed.
// Apply a data migration to fix any existing data affected by the issue
// fixed in v0.3.5.
//
// The issue was that the `ps_updated_rows` table was not being populated
// with remove operations in some cases. This causes the rows to be removed
// from ps_oplog, but not from the ps_data__tables, resulting in dangling rows.
//
// The fix here is to find these dangling rows, and add them to ps_updated_rows.
// The next time the sync_local operation is run, these rows will be removed.
pub fn apply_v035_fix(db: *mut sqlite::sqlite3) -> Result<i64, SQLiteError> {
    // Enumerate all synced data tables (ps_data__*), along with the logical
    // table name used as row_type in ps_oplog / ps_updated_rows.
    // language=SQLite
    let statement = db
        .prepare_v2("SELECT name, powersync_external_table_name(name) FROM sqlite_master WHERE type='table' AND name GLOB 'ps_data__*'")
        .into_db_result(db)?;

    while statement.step()? == ResultCode::ROW {
        let full_name = statement.column_text(0)?;
        let short_name = statement.column_text(1)?;
        // Table names cannot be bound as parameters, so the identifier is
        // quoted and interpolated; the row_type is bound as ?1 below.
        let quoted = quote_identifier(full_name);

        // Record every row present in the data table but absent from
        // ps_oplog, so the next sync_local pass removes it.
        // Named `fix_statement` to avoid shadowing the statement being
        // stepped above, and wrapped in into_db_result for a descriptive
        // error message (consistent with the outer prepare).
        // language=SQLite
        let fix_statement = db
            .prepare_v2(&format!(
                "
INSERT OR IGNORE INTO ps_updated_rows(row_type, row_id)
SELECT ?1, id FROM {}
WHERE NOT EXISTS (
    SELECT 1 FROM ps_oplog
    WHERE row_type = ?1 AND row_id = {}.id
);",
                quoted, quoted
            ))
            .into_db_result(db)?;
        // STATIC is safe here: short_name outlives the statement's use,
        // since the statement is bound and executed within this iteration.
        fix_statement.bind_text(1, short_name, sqlite::Destructor::STATIC)?;

        fix_statement.exec()?;
    }

    Ok(1)
}
1 change: 1 addition & 0 deletions crates/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ mod crud_vtab;
mod diff;
mod error;
mod ext;
mod fix035;
mod kv;
mod macros;
mod migrations;
Expand Down
18 changes: 18 additions & 0 deletions crates/core/src/migrations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use sqlite_nostd as sqlite;
use sqlite_nostd::{Connection, Context};

use crate::error::{PSResult, SQLiteError};
use crate::fix035::apply_v035_fix;

pub fn powersync_migrate(
ctx: *mut sqlite::context,
Expand Down Expand Up @@ -283,5 +284,22 @@ VALUES(5,
.into_db_result(local_db)?;
}

    if current_version < 6 && target_version >= 6 {
        // Migration 6 has no schema changes of its own; it only repairs data
        // affected by the pre-v0.3.5 bug (dangling ps_data__* rows with no
        // matching ps_oplog entry) by registering them in ps_updated_rows.
        apply_v035_fix(local_db)?;

        // Record the migration. The down migration only removes the
        // ps_migration row: the data fix is not reversible (and does not
        // need to be).
        // language=SQLite
        local_db
            .exec_safe(
                "\
INSERT INTO ps_migration(id, down_migrations)
VALUES(6,
json_array(
json_object('sql', 'DELETE FROM ps_migration WHERE id >= 6')
));
",
            )
            .into_db_result(local_db)?;
    }

Ok(())
}
2 changes: 1 addition & 1 deletion crates/core/src/view_admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ fn powersync_init_impl(

setup_internal_views(local_db)?;

powersync_migrate(ctx, 5)?;
powersync_migrate(ctx, 6)?;

Ok(String::from(""))
}
Expand Down
36 changes: 36 additions & 0 deletions dart/test/migration_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import 'package:test/test.dart';

import 'utils/native_test_utils.dart';
import 'utils/migration_fixtures.dart' as fixtures;
import 'utils/fix_035_fixtures.dart' as fix035;
import 'utils/schema.dart';

void main() {
Expand Down Expand Up @@ -175,5 +176,40 @@ void main() {
'${fixtures.expectedState[3]!.replaceAll(RegExp(r';INSERT INTO ps_migration.*'), '').trim()}\n${fixtures.schemaDown3.trim()}';
expect(schema, equals(expected));
});

    test('migrate from 5 with broken data', () async {
      // Two simple tables matching the fix035 fixture data.
      var tableSchema = {
        'tables': [
          {
            'name': 'lists',
            'columns': [
              {'name': 'description', 'type': 'TEXT'}
            ]
          },
          {
            'name': 'todos',
            'columns': [
              {'name': 'description', 'type': 'TEXT'}
            ]
          }
        ]
      };
      db.select('select powersync_init()');
      db.select(
          'select powersync_replace_schema(?)', [jsonEncode(tableSchema)]);

      // Roll back to schema version 5, then seed the broken state: rows in
      // ps_data__* tables with no corresponding ps_oplog entries.
      db.select('select powersync_test_migration(5)');
      db.execute(fix035.dataBroken);

      // Re-running init applies migration 6, which records the dangling rows
      // in ps_updated_rows but does not delete them yet.
      db.select('select powersync_init()');
      final data = getData(db);
      expect(data, equals(fix035.dataMigrated.trim()));

      // sync_local then removes the dangling rows from the data tables.
      db.select('insert into powersync_operations(op, data) values(?, ?)',
          ['sync_local', '']);

      final data2 = getData(db);
      expect(data2, equals(fix035.dataFixed.trim()));
    });
});
}
54 changes: 54 additions & 0 deletions dart/test/utils/fix_035_fixtures.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/// Data with some records in actual tables but not in ps_oplog.
///
/// 'l3' (lists) and 't3' (todos) are the dangling rows: they exist in the
/// ps_data__* tables without any matching ps_oplog entry, which is the
/// broken state produced by the pre-v0.3.5 bug.
const dataBroken = '''
;INSERT INTO ps_buckets(id, name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete) VALUES
  (1, 'b1', 0, 0, 0, 0, 120, 0),
  (2, 'b2', 0, 0, 0, 0, 3, 0)
;INSERT INTO ps_oplog(bucket, op_id, row_type, row_id, key, data, hash) VALUES
  (1, 1, 'todos', 't1', '', '{}', 100),
  (1, 2, 'todos', 't2', '', '{}', 20),
  (2, 3, 'lists', 'l1', '', '{}', 3)
;INSERT INTO ps_data__lists(id, data) VALUES
  ('l1', '{}'),
  ('l3', '{}')
;INSERT INTO ps_data__todos(id, data) VALUES
  ('t1', '{}'),
  ('t2', '{}'),
  ('t3', '{}')
''';

/// Data after applying the migration fix, but before sync_local.
///
/// Identical to [dataBroken] except the dangling rows ('lists'/'l3' and
/// 'todos'/'t3') are now recorded in ps_updated_rows; the stale rows are
/// still present in the ps_data__* tables at this point.
const dataMigrated = '''
;INSERT INTO ps_buckets(id, name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete) VALUES
  (1, 'b1', 0, 0, 0, 0, 120, 0),
  (2, 'b2', 0, 0, 0, 0, 3, 0)
;INSERT INTO ps_oplog(bucket, op_id, row_type, row_id, key, data, hash) VALUES
  (1, 1, 'todos', 't1', '', '{}', 100),
  (1, 2, 'todos', 't2', '', '{}', 20),
  (2, 3, 'lists', 'l1', '', '{}', 3)
;INSERT INTO ps_updated_rows(row_type, row_id) VALUES
  ('lists', 'l3'),
  ('todos', 't3')
;INSERT INTO ps_data__lists(id, data) VALUES
  ('l1', '{}'),
  ('l3', '{}')
;INSERT INTO ps_data__todos(id, data) VALUES
  ('t1', '{}'),
  ('t2', '{}'),
  ('t3', '{}')
''';

/// Data after applying the migration fix and sync_local.
///
/// The dangling rows ('l3' and 't3') have been removed from the ps_data__*
/// tables, and ps_updated_rows is empty again; only rows backed by ps_oplog
/// entries remain.
const dataFixed = '''
;INSERT INTO ps_buckets(id, name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete) VALUES
  (1, 'b1', 0, 0, 0, 0, 120, 0),
  (2, 'b2', 0, 0, 0, 0, 3, 0)
;INSERT INTO ps_oplog(bucket, op_id, row_type, row_id, key, data, hash) VALUES
  (1, 1, 'todos', 't1', '', '{}', 100),
  (1, 2, 'todos', 't2', '', '{}', 20),
  (2, 3, 'lists', 'l1', '', '{}', 3)
;INSERT INTO ps_data__lists(id, data) VALUES
  ('l1', '{}')
;INSERT INTO ps_data__todos(id, data) VALUES
  ('t1', '{}'),
  ('t2', '{}')
''';
57 changes: 54 additions & 3 deletions dart/test/utils/migration_fixtures.dart
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/// The current database version
const databaseVersion = 5;
const databaseVersion = 6;

/// This is the base database state that we expect at various schema versions.
/// Generated by loading the specific library version, and exporting the schema.
Expand Down Expand Up @@ -133,6 +133,45 @@ const expectedState = <int, String>{
;INSERT INTO ps_migration(id, down_migrations) VALUES(3, '[{"sql":"DELETE FROM ps_migration WHERE id >= 3"},{"sql":"DROP TABLE ps_kv"}]')
;INSERT INTO ps_migration(id, down_migrations) VALUES(4, '[{"sql":"DELETE FROM ps_migration WHERE id >= 4"},{"sql":"ALTER TABLE ps_buckets DROP COLUMN op_checksum"},{"sql":"ALTER TABLE ps_buckets DROP COLUMN remove_operations"}]')
;INSERT INTO ps_migration(id, down_migrations) VALUES(5, '[{"sql":"SELECT powersync_drop_view(view.name)\n FROM sqlite_master view\n WHERE view.type = ''view''\n AND view.sql GLOB ''*-- powersync-auto-generated''"},{"sql":"ALTER TABLE ps_buckets RENAME TO ps_buckets_5"},{"sql":"ALTER TABLE ps_oplog RENAME TO ps_oplog_5"},{"sql":"CREATE TABLE ps_buckets(\n name TEXT PRIMARY KEY,\n last_applied_op INTEGER NOT NULL DEFAULT 0,\n last_op INTEGER NOT NULL DEFAULT 0,\n target_op INTEGER NOT NULL DEFAULT 0,\n add_checksum INTEGER NOT NULL DEFAULT 0,\n pending_delete INTEGER NOT NULL DEFAULT 0\n, op_checksum INTEGER NOT NULL DEFAULT 0, remove_operations INTEGER NOT NULL DEFAULT 0)"},{"sql":"INSERT INTO ps_buckets(name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete)\n SELECT name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete FROM ps_buckets_5"},{"sql":"CREATE TABLE ps_oplog(\n bucket TEXT NOT NULL,\n op_id INTEGER NOT NULL,\n op INTEGER NOT NULL,\n row_type TEXT,\n row_id TEXT,\n key TEXT,\n data TEXT,\n hash INTEGER NOT NULL,\n superseded INTEGER NOT NULL)"},{"sql":"CREATE INDEX ps_oplog_by_row ON ps_oplog (row_type, row_id) WHERE superseded = 0"},{"sql":"CREATE INDEX ps_oplog_by_opid ON ps_oplog (bucket, op_id)"},{"sql":"CREATE INDEX ps_oplog_by_key ON ps_oplog (bucket, key) WHERE superseded = 0"},{"sql":"INSERT INTO ps_oplog(bucket, op_id, op, row_type, row_id, key, data, hash, superseded)\n SELECT ps_buckets_5.name, oplog.op_id, 3, oplog.row_type, oplog.row_id, oplog.key, oplog.data, oplog.hash, 0\n FROM ps_oplog_5 oplog\n JOIN ps_buckets_5\n ON ps_buckets_5.id = oplog.bucket"},{"sql":"DROP TABLE ps_oplog_5"},{"sql":"DROP TABLE ps_buckets_5"},{"sql":"INSERT INTO ps_oplog(bucket, op_id, op, row_type, row_id, hash, superseded)\n SELECT ''$local'', 1, 4, r.row_type, r.row_id, 0, 0\n FROM ps_updated_rows r"},{"sql":"INSERT OR REPLACE INTO ps_buckets(name, pending_delete, last_op, target_op) VALUES(''$local'', 
1, 0, 9223372036854775807)"},{"sql":"DROP TABLE ps_updated_rows"},{"sql":"DELETE FROM ps_migration WHERE id >= 5"}]')
''',
6: r'''
;CREATE TABLE ps_buckets(
id INTEGER PRIMARY KEY,
name TEXT NOT NULL,
last_applied_op INTEGER NOT NULL DEFAULT 0,
last_op INTEGER NOT NULL DEFAULT 0,
target_op INTEGER NOT NULL DEFAULT 0,
add_checksum INTEGER NOT NULL DEFAULT 0,
op_checksum INTEGER NOT NULL DEFAULT 0,
pending_delete INTEGER NOT NULL DEFAULT 0
) STRICT
;CREATE TABLE ps_crud (id INTEGER PRIMARY KEY AUTOINCREMENT, data TEXT, tx_id INTEGER)
;CREATE TABLE ps_kv(key TEXT PRIMARY KEY NOT NULL, value BLOB)
;CREATE TABLE ps_migration(id INTEGER PRIMARY KEY, down_migrations TEXT)
;CREATE TABLE ps_oplog(
bucket INTEGER NOT NULL,
op_id INTEGER NOT NULL,
row_type TEXT,
row_id TEXT,
key TEXT,
data TEXT,
hash INTEGER NOT NULL) STRICT
;CREATE TABLE ps_tx(id INTEGER PRIMARY KEY NOT NULL, current_tx INTEGER, next_tx INTEGER)
;CREATE TABLE ps_untyped(type TEXT NOT NULL, id TEXT NOT NULL, data TEXT, PRIMARY KEY (type, id))
;CREATE TABLE ps_updated_rows(
row_type TEXT,
row_id TEXT,
PRIMARY KEY(row_type, row_id)) STRICT, WITHOUT ROWID
;CREATE UNIQUE INDEX ps_buckets_name ON ps_buckets (name)
;CREATE INDEX ps_oplog_key ON ps_oplog (bucket, key)
;CREATE INDEX ps_oplog_opid ON ps_oplog (bucket, op_id)
;CREATE INDEX ps_oplog_row ON ps_oplog (row_type, row_id)
;INSERT INTO ps_migration(id, down_migrations) VALUES(1, null)
;INSERT INTO ps_migration(id, down_migrations) VALUES(2, '[{"sql":"DELETE FROM ps_migration WHERE id >= 2","params":[]},{"sql":"DROP TABLE ps_tx","params":[]},{"sql":"ALTER TABLE ps_crud DROP COLUMN tx_id","params":[]}]')
;INSERT INTO ps_migration(id, down_migrations) VALUES(3, '[{"sql":"DELETE FROM ps_migration WHERE id >= 3"},{"sql":"DROP TABLE ps_kv"}]')
;INSERT INTO ps_migration(id, down_migrations) VALUES(4, '[{"sql":"DELETE FROM ps_migration WHERE id >= 4"},{"sql":"ALTER TABLE ps_buckets DROP COLUMN op_checksum"},{"sql":"ALTER TABLE ps_buckets DROP COLUMN remove_operations"}]')
;INSERT INTO ps_migration(id, down_migrations) VALUES(5, '[{"sql":"SELECT powersync_drop_view(view.name)\n FROM sqlite_master view\n WHERE view.type = ''view''\n AND view.sql GLOB ''*-- powersync-auto-generated''"},{"sql":"ALTER TABLE ps_buckets RENAME TO ps_buckets_5"},{"sql":"ALTER TABLE ps_oplog RENAME TO ps_oplog_5"},{"sql":"CREATE TABLE ps_buckets(\n name TEXT PRIMARY KEY,\n last_applied_op INTEGER NOT NULL DEFAULT 0,\n last_op INTEGER NOT NULL DEFAULT 0,\n target_op INTEGER NOT NULL DEFAULT 0,\n add_checksum INTEGER NOT NULL DEFAULT 0,\n pending_delete INTEGER NOT NULL DEFAULT 0\n, op_checksum INTEGER NOT NULL DEFAULT 0, remove_operations INTEGER NOT NULL DEFAULT 0)"},{"sql":"INSERT INTO ps_buckets(name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete)\n SELECT name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete FROM ps_buckets_5"},{"sql":"CREATE TABLE ps_oplog(\n bucket TEXT NOT NULL,\n op_id INTEGER NOT NULL,\n op INTEGER NOT NULL,\n row_type TEXT,\n row_id TEXT,\n key TEXT,\n data TEXT,\n hash INTEGER NOT NULL,\n superseded INTEGER NOT NULL)"},{"sql":"CREATE INDEX ps_oplog_by_row ON ps_oplog (row_type, row_id) WHERE superseded = 0"},{"sql":"CREATE INDEX ps_oplog_by_opid ON ps_oplog (bucket, op_id)"},{"sql":"CREATE INDEX ps_oplog_by_key ON ps_oplog (bucket, key) WHERE superseded = 0"},{"sql":"INSERT INTO ps_oplog(bucket, op_id, op, row_type, row_id, key, data, hash, superseded)\n SELECT ps_buckets_5.name, oplog.op_id, 3, oplog.row_type, oplog.row_id, oplog.key, oplog.data, oplog.hash, 0\n FROM ps_oplog_5 oplog\n JOIN ps_buckets_5\n ON ps_buckets_5.id = oplog.bucket"},{"sql":"DROP TABLE ps_oplog_5"},{"sql":"DROP TABLE ps_buckets_5"},{"sql":"INSERT INTO ps_oplog(bucket, op_id, op, row_type, row_id, hash, superseded)\n SELECT ''$local'', 1, 4, r.row_type, r.row_id, 0, 0\n FROM ps_updated_rows r"},{"sql":"INSERT OR REPLACE INTO ps_buckets(name, pending_delete, last_op, target_op) VALUES(''$local'', 
1, 0, 9223372036854775807)"},{"sql":"DROP TABLE ps_updated_rows"},{"sql":"DELETE FROM ps_migration WHERE id >= 5"}]')
;INSERT INTO ps_migration(id, down_migrations) VALUES(6, '[{"sql":"DELETE FROM ps_migration WHERE id >= 6"}]')
'''
};

Expand Down Expand Up @@ -180,13 +219,24 @@ const data1 = <int, String>{
(2, 3, 'lists', 'l1', '', '{}', 3)
;INSERT INTO ps_updated_rows(row_type, row_id) VALUES
('lists', 'l2')
''',
6: r'''
;INSERT INTO ps_buckets(id, name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete) VALUES
(1, 'b1', 0, 0, 0, 0, 120, 0),
(2, 'b2', 0, 0, 0, 1005, 3, 0)
;INSERT INTO ps_oplog(bucket, op_id, row_type, row_id, key, data, hash) VALUES
(1, 1, 'todos', 't1', '', '{}', 100),
(1, 2, 'todos', 't2', '', '{}', 20),
(2, 3, 'lists', 'l1', '', '{}', 3)
;INSERT INTO ps_updated_rows(row_type, row_id) VALUES
('lists', 'l2')
'''
};

/// data to test "down" migrations
/// This is slightly different from the above,
/// since we don't preserve all data in the migration process
const dataDown1 = <int, String>{
final dataDown1 = <int, String>{
2: r'''
;INSERT INTO ps_buckets(name, last_applied_op, last_op, target_op, add_checksum, pending_delete) VALUES
('$local', 0, 0, 9223372036854775807, 0, 1),
Expand Down Expand Up @@ -219,7 +269,8 @@ const dataDown1 = <int, String>{
('b1', 1, 3, 'todos', 't1', '', '{}', 100, 0),
('b1', 2, 3, 'todos', 't2', '', '{}', 20, 0),
('b2', 3, 3, 'lists', 'l1', '', '{}', 3, 0)
'''
''',
5: data1[5]!
};

final finalData1 = data1[databaseVersion]!;
Expand Down
8 changes: 8 additions & 0 deletions dart/test/utils/schema.dart
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,14 @@ String getData(CommonDatabase db) {
{
'table': 'ps_updated_rows',
'query': 'select * from ps_updated_rows order by row_type, row_id'
},
{
'table': 'ps_data__lists',
'query': 'select * from ps_data__lists order by id'
},
{
'table': 'ps_data__todos',
'query': 'select * from ps_data__todos order by id'
}
];
List<String> result = [];
Expand Down

0 comments on commit 7edb71f

Please sign in to comment.