From 8c2a6e7fa18e52b4708cf29accb98b7e0f047ece Mon Sep 17 00:00:00 2001 From: Wei Fu Date: Mon, 13 Jan 2025 17:30:40 -0500 Subject: [PATCH] mvcc: restore tombstone index if it's first revision The tombstone could be the only available revision in the database. This happens when all historical revisions have been deleted in previous compactions. Since the tombstone revision is still in the database, we should restore it as a valid key index. Otherwise, we lose that event. Signed-off-by: Wei Fu --- server/storage/mvcc/key_index.go | 23 ++++++++++ server/storage/mvcc/key_index_test.go | 41 ++++++++++++++++++ server/storage/mvcc/kv_test.go | 20 ++++++++- server/storage/mvcc/kvstore.go | 4 ++ tests/e2e/watch_test.go | 62 +++++++++++++++++++++++++++ 5 files changed, 148 insertions(+), 2 deletions(-) diff --git a/server/storage/mvcc/key_index.go b/server/storage/mvcc/key_index.go index 27b22fd4899..97e1f5bf756 100644 --- a/server/storage/mvcc/key_index.go +++ b/server/storage/mvcc/key_index.go @@ -116,6 +116,29 @@ func (ki *keyIndex) restore(lg *zap.Logger, created, modified Revision, ver int6 keysGauge.Inc() } +// restoreTombstone is only used to build index when there is no previous revision. +// Since there is no historical information, CreateRevision always is empty. +func (ki *keyIndex) restoreTombstone(lg *zap.Logger, main, sub int64) { + if len(ki.generations) != 0 { + lg.Panic( + "'restore' got an unexpected non-empty generations", + zap.Int("generations-size", len(ki.generations)), + ) + } + + rev := Revision{Main: main, Sub: sub} + + ki.modified = rev + ki.generations = append(ki.generations, + generation{ + ver: 1, + created: Revision{}, // unknown for first revision as tombstone + revs: []Revision{rev}, + }, + generation{}, + ) +} + // tombstone puts a revision, pointing to a tombstone, to the keyIndex. // It also creates a new empty generation in the keyIndex. // It returns ErrRevisionNotFound when tombstone on an empty generation. 
diff --git a/server/storage/mvcc/key_index_test.go b/server/storage/mvcc/key_index_test.go index f86e027e525..643ba11da99 100644 --- a/server/storage/mvcc/key_index_test.go +++ b/server/storage/mvcc/key_index_test.go @@ -20,10 +20,51 @@ import ( "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "go.uber.org/zap" "go.uber.org/zap/zaptest" ) +func TestRestoreTombstone(t *testing.T) { + lg := zaptest.NewLogger(t) + + // restore from tombstone + // + // key: "foo" + // modified: 16 + // "created": 16 + // generations: + // {empty} + // {{16, 0}(t)[0]} + // + ki := &keyIndex{key: []byte("foo")} + ki.restoreTombstone(lg, 16, 0) + + // get should return not found + for retAt := 16; retAt <= 20; retAt++ { + _, _, _, err := ki.get(lg, int64(retAt)) + require.ErrorIs(t, err, ErrRevisionNotFound) + } + + // doCompact should keep that tombstone + availables := map[Revision]struct{}{} + ki.doCompact(16, availables) + require.Len(t, availables, 1) + _, ok := availables[Revision{Main: 16}] + require.True(t, ok) + + // should be able to put new revisions + ki.put(lg, 17, 0) + ki.put(lg, 18, 0) + revs := ki.since(lg, 16) + require.Equal(t, []Revision{{16, 0}, {17, 0}, {18, 0}}, revs) + + // compaction should remove restored tombstone + ki.compact(lg, 17, map[Revision]struct{}{}) + require.Len(t, ki.generations, 1) + require.Equal(t, []Revision{{17, 0}, {18, 0}}, ki.generations[0].revs) +} + func TestKeyIndexGet(t *testing.T) { // key: "foo" // modified: 16 diff --git a/server/storage/mvcc/kv_test.go b/server/storage/mvcc/kv_test.go index c727b444af7..790b534395b 100644 --- a/server/storage/mvcc/kv_test.go +++ b/server/storage/mvcc/kv_test.go @@ -25,6 +25,7 @@ import ( "github.com/prometheus/client_golang/prometheus" dto "github.com/prometheus/client_model/go" + "github.com/stretchr/testify/assert" "go.uber.org/zap/zaptest" "go.etcd.io/etcd/api/v3/mvccpb" @@ -657,6 +658,8 @@ func TestKVHash(t *testing.T) { } func TestKVRestore(t *testing.T) { + 
compactBatchLimit := 5 + tests := []func(kv KV){ func(kv KV) { kv.Put([]byte("foo"), []byte("bar0"), 1) @@ -674,10 +677,23 @@ func TestKVRestore(t *testing.T) { kv.Put([]byte("foo"), []byte("bar1"), 2) kv.Compact(traceutil.TODO(), 1) }, + func(kv KV) { // after restore, foo1 key only has tombstone revision + kv.Put([]byte("foo1"), []byte("bar1"), 0) + kv.Put([]byte("foo2"), []byte("bar2"), 0) + kv.Put([]byte("foo3"), []byte("bar3"), 0) + kv.Put([]byte("foo4"), []byte("bar4"), 0) + kv.Put([]byte("foo5"), []byte("bar5"), 0) + _, delAtRev := kv.DeleteRange([]byte("foo1"), nil) + assert.Equal(t, int64(7), delAtRev) + + // after compaction and restore, foo1 key only has tombstone revision + ch, _ := kv.Compact(traceutil.TODO(), delAtRev) + <-ch + }, } for i, tt := range tests { b, _ := betesting.NewDefaultTmpBackend(t) - s := NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}) + s := NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{CompactionBatchLimit: compactBatchLimit}) tt(s) var kvss [][]mvccpb.KeyValue for k := int64(0); k < 10; k++ { @@ -689,7 +705,7 @@ func TestKVRestore(t *testing.T) { s.Close() // ns should recover the previous state from backend. 
- ns := NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}) + ns := NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{CompactionBatchLimit: compactBatchLimit}) if keysRestore := readGaugeInt(keysGauge); keysBefore != keysRestore { t.Errorf("#%d: got %d key count, expected %d", i, keysRestore, keysBefore) diff --git a/server/storage/mvcc/kvstore.go b/server/storage/mvcc/kvstore.go index 3e1226c9174..af3b8263c95 100644 --- a/server/storage/mvcc/kvstore.go +++ b/server/storage/mvcc/kvstore.go @@ -477,6 +477,10 @@ func restoreIntoIndex(lg *zap.Logger, idx index) (chan<- revKeyValue, <-chan int ki.restore(lg, Revision{Main: rkv.kv.CreateRevision}, rev, rkv.kv.Version) idx.Insert(ki) kiCache[rkv.kstr] = ki + } else { + ki.restoreTombstone(lg, rev.Main, rev.Sub) + idx.Insert(ki) + kiCache[rkv.kstr] = ki } } }() diff --git a/tests/e2e/watch_test.go b/tests/e2e/watch_test.go index f167b8f8752..6810cd06184 100644 --- a/tests/e2e/watch_test.go +++ b/tests/e2e/watch_test.go @@ -488,3 +488,65 @@ func testStartWatcherFromCompactedRevision(t *testing.T, performCompactOnTombsto } } } + +func TestRestoreTombstone(t *testing.T) { + e2e.BeforeTest(t) + + ctx := context.Background() + compactBatchLimit := 5 + + cfg := e2e.DefaultConfig() + clus, err := e2e.NewEtcdProcessCluster(context.Background(), + t, + e2e.WithConfig(cfg), + e2e.WithClusterSize(1), + e2e.WithCompactionBatchLimit(compactBatchLimit), + e2e.WithGoFailEnabled(true), + e2e.WithWatchProcessNotifyInterval(100*time.Millisecond), + ) + require.NoError(t, err) + defer clus.Close() + + c1 := newClient(t, clus.EndpointsGRPC(), cfg.Client) + defer c1.Close() + + keyPrefix := "/key-" + for i := 0; i < compactBatchLimit; i++ { + key := fmt.Sprintf("%s%d", keyPrefix, i) + value := fmt.Sprintf("%d", i) + + t.Logf("PUT key=%s, val=%s", key, value) + _, err = c1.KV.Put(ctx, key, value) + require.NoError(t, err) + } + + firstKey := keyPrefix + "0" + t.Logf("DELETE key=%s", firstKey) + deleteResp, 
err := c1.KV.Delete(ctx, firstKey) + require.NoError(t, err) + + require.NoError(t, clus.Procs[0].Failpoints().SetupHTTP(ctx, "compactBeforeSetFinishedCompact", `panic`)) + + t.Logf("COMPACT rev=%d", deleteResp.Header.Revision) + _, err = c1.KV.Compact(ctx, deleteResp.Header.Revision, clientv3.WithCompactPhysical()) + require.Error(t, err) + + require.NoError(t, clus.Restart(ctx)) + + c2 := newClient(t, clus.EndpointsGRPC(), cfg.Client) + defer c2.Close() + + watchChan := c2.Watch(ctx, firstKey, clientv3.WithRev(deleteResp.Header.Revision)) + select { + case watchResp := <-watchChan: + require.Len(t, watchResp.Events, 1) + + require.Equal(t, mvccpb.DELETE, watchResp.Events[0].Type) + deletedKey := string(watchResp.Events[0].Kv.Key) + require.Equal(t, firstKey, deletedKey) + case <-time.After(100 * time.Millisecond): + // we care only about the first response, but have an + // escape hatch in case the watch response is delayed. + t.Fatal("timed out getting watch response") + } +}