From bf90a5d0f53b6e70f5aac462ac966ad53f6b1bfb Mon Sep 17 00:00:00 2001 From: Wei Fu Date: Mon, 13 Jan 2025 17:30:40 -0500 Subject: [PATCH] mvcc: restore tombstone index if it's first revision The tombstone could be the only one available revision in database. It happens when all historical revisions have been deleted in previous compactions. Since tombstone revision is still in database, we should restore it as valid key index. Otherwise, we lost that event. Signed-off-by: Wei Fu --- server/storage/mvcc/key_index.go | 23 ++++++++++ server/storage/mvcc/key_index_test.go | 40 +++++++++++++++++ server/storage/mvcc/kv_test.go | 20 ++++++++- server/storage/mvcc/kvstore.go | 5 +++ tests/e2e/watch_test.go | 62 +++++++++++++++++++++++++++ 5 files changed, 148 insertions(+), 2 deletions(-) diff --git a/server/storage/mvcc/key_index.go b/server/storage/mvcc/key_index.go index 27b22fd4899..97e1f5bf756 100644 --- a/server/storage/mvcc/key_index.go +++ b/server/storage/mvcc/key_index.go @@ -116,6 +116,29 @@ func (ki *keyIndex) restore(lg *zap.Logger, created, modified Revision, ver int6 keysGauge.Inc() } +// restoreTombstone is only used to build index when there is no previous revision. +// Since there is no historical information, CreateRevision always is empty. +func (ki *keyIndex) restoreTombstone(lg *zap.Logger, main, sub int64) { + if len(ki.generations) != 0 { + lg.Panic( + "'restore' got an unexpected non-empty generations", + zap.Int("generations-size", len(ki.generations)), + ) + } + + rev := Revision{Main: main, Sub: sub} + + ki.modified = rev + ki.generations = append(ki.generations, + generation{ + ver: 1, + created: Revision{}, // unknown for first revision as tombstone + revs: []Revision{rev}, + }, + generation{}, + ) +} + // tombstone puts a revision, pointing to a tombstone, to the keyIndex. // It also creates a new empty generation in the keyIndex. // It returns ErrRevisionNotFound when tombstone on an empty generation. diff --git a/server/storage/mvcc/key_index_test.go b/server/storage/mvcc/key_index_test.go index f86e027e525..779e9b61ef6 100644 --- a/server/storage/mvcc/key_index_test.go +++ b/server/storage/mvcc/key_index_test.go @@ -24,6 +24,46 @@ import ( "go.uber.org/zap/zaptest" ) +func TestRestoreTombstone(t *testing.T) { + lg := zaptest.NewLogger(t) + + // restore from tombstone + // + // key: "foo" + // modified: 16 + // "created": 16 + // generations: + // {empty} + // {{16, 0}(t)[0]} + // + ki := &keyIndex{key: []byte("foo")} + ki.restoreTombstone(lg, 16, 0) + + // get should return not found + for retAt := 16; retAt <= 20; retAt++ { + _, _, _, err := ki.get(lg, int64(retAt)) + assert.ErrorIs(t, err, ErrRevisionNotFound) + } + + // doCompact should keep that tombstone + availables := map[Revision]struct{}{} + ki.doCompact(16, availables) + assert.Len(t, availables, 1) + _, ok := availables[Revision{Main: 16}] + assert.True(t, ok) + + // should be able to put new revisions + ki.put(lg, 17, 0) + ki.put(lg, 18, 0) + revs := ki.since(lg, 16) + assert.Equal(t, []Revision{{16, 0}, {17, 0}, {18, 0}}, revs) + + // compaction should remove restored tombstone + ki.compact(lg, 17, map[Revision]struct{}{}) + assert.Len(t, ki.generations, 1) + assert.Equal(t, []Revision{{17, 0}, {18, 0}}, ki.generations[0].revs) +} + func TestKeyIndexGet(t *testing.T) { // key: "foo" // modified: 16 diff --git a/server/storage/mvcc/kv_test.go b/server/storage/mvcc/kv_test.go index c727b444af7..790b534395b 100644 --- a/server/storage/mvcc/kv_test.go +++ b/server/storage/mvcc/kv_test.go @@ -25,6 +25,7 @@ import ( "github.com/prometheus/client_golang/prometheus" dto "github.com/prometheus/client_model/go" + "github.com/stretchr/testify/assert" "go.uber.org/zap/zaptest" "go.etcd.io/etcd/api/v3/mvccpb" @@ -657,6 +658,8 @@ func TestKVHash(t *testing.T) { } func TestKVRestore(t *testing.T) { + compactBatchLimit := 5 + tests := []func(kv KV){ func(kv KV) { kv.Put([]byte("foo"), []byte("bar0"), 1) @@ -674,10 +677,23 @@ func TestKVRestore(t *testing.T) { kv.Put([]byte("foo"), []byte("bar1"), 2) kv.Compact(traceutil.TODO(), 1) }, + func(kv KV) { // after restore, foo1 key only has tombstone revision + kv.Put([]byte("foo1"), []byte("bar1"), 0) + kv.Put([]byte("foo2"), []byte("bar2"), 0) + kv.Put([]byte("foo3"), []byte("bar3"), 0) + kv.Put([]byte("foo4"), []byte("bar4"), 0) + kv.Put([]byte("foo5"), []byte("bar5"), 0) + _, delAtRev := kv.DeleteRange([]byte("foo1"), nil) + assert.Equal(t, int64(7), delAtRev) + + // after compaction and restore, foo1 key only has tombstone revision + ch, _ := kv.Compact(traceutil.TODO(), delAtRev) + <-ch + }, } for i, tt := range tests { b, _ := betesting.NewDefaultTmpBackend(t) - s := NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}) + s := NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{CompactionBatchLimit: compactBatchLimit}) tt(s) var kvss [][]mvccpb.KeyValue for k := int64(0); k < 10; k++ { @@ -689,7 +705,7 @@ func TestKVRestore(t *testing.T) { s.Close() // ns should recover the previous state from backend. - ns := NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}) + ns := NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{CompactionBatchLimit: compactBatchLimit}) if keysRestore := readGaugeInt(keysGauge); keysBefore != keysRestore { t.Errorf("#%d: got %d key count, expected %d", i, keysRestore, keysBefore) diff --git a/server/storage/mvcc/kvstore.go b/server/storage/mvcc/kvstore.go index 3e1226c9174..4687f260f2c 100644 --- a/server/storage/mvcc/kvstore.go +++ b/server/storage/mvcc/kvstore.go @@ -477,6 +477,11 @@ func restoreIntoIndex(lg *zap.Logger, idx index) (chan<- revKeyValue, <-chan int ki.restore(lg, Revision{Main: rkv.kv.CreateRevision}, rev, rkv.kv.Version) idx.Insert(ki) kiCache[rkv.kstr] = ki + } else { + lg.Warn("historical revision(s) has been compacted", zap.Int64("main", rev.Main), zap.Int64("sub", rev.Sub)) + ki.restoreTombstone(lg, rev.Main, rev.Sub) + idx.Insert(ki) + kiCache[rkv.kstr] = ki } } }() diff --git a/tests/e2e/watch_test.go b/tests/e2e/watch_test.go index f167b8f8752..ccc3648ed06 100644 --- a/tests/e2e/watch_test.go +++ b/tests/e2e/watch_test.go @@ -488,3 +488,65 @@ func testStartWatcherFromCompactedRevision(t *testing.T, performCompactOnTombsto } } } + +func TestRestoreTombstone(t *testing.T) { + e2e.BeforeTest(t) + + ctx := context.Background() + compactBatchLimit := 5 + + cfg := e2e.DefaultConfig() + clus, err := e2e.NewEtcdProcessCluster(context.Background(), + t, + e2e.WithConfig(cfg), + e2e.WithClusterSize(1), + e2e.WithCompactionBatchLimit(compactBatchLimit), + e2e.WithGoFailEnabled(true), + e2e.WithWatchProcessNotifyInterval(100*time.Millisecond), + ) + require.NoError(t, err) + defer clus.Close() + + c1 := newClient(t, clus.EndpointsGRPC(), cfg.Client) + defer c1.Close() + + keyPrefix := "/key-" + for idx := 1; idx <= compactBatchLimit; idx++ { + key := fmt.Sprintf("%s%d", keyPrefix, idx) + value := fmt.Sprintf("%d", idx) + + t.Logf("PUT key=%s, val=%s", key, value) + _, err = c1.KV.Put(ctx, key, value) + require.NoError(t, err) + } + + firstKey := keyPrefix + "1" + t.Logf("DELETE key=%s", firstKey) + deleteResp, err := c1.KV.Delete(ctx, firstKey) + require.NoError(t, err) + + require.NoError(t, clus.Procs[0].Failpoints().SetupHTTP(ctx, "compactBeforeSetFinishedCompact", `panic`)) + + t.Logf("COMPACT rev=%d", deleteResp.Header.Revision) + _, err = c1.KV.Compact(ctx, deleteResp.Header.Revision, clientv3.WithCompactPhysical()) + require.Error(t, err) + + require.NoError(t, clus.Restart(ctx)) + + c2 := newClient(t, clus.EndpointsGRPC(), cfg.Client) + defer c2.Close() + + watchChan := c2.Watch(ctx, firstKey, clientv3.WithRev(deleteResp.Header.Revision)) + select { + case watchResp := <-watchChan: + require.Len(t, watchResp.Events, 1) + + require.Equal(t, mvccpb.DELETE, watchResp.Events[0].Type) + deletedKey := string(watchResp.Events[0].Kv.Key) + require.Equal(t, firstKey, deletedKey) + case <-time.After(100 * time.Millisecond): + // we care only about the first response, but have an + // escape hatch in case the watch response is delayed. + t.Fatal("timed out getting watch response") + } +}