From e95aa128913326e04d818447b5e614618d5150b7 Mon Sep 17 00:00:00 2001 From: Wei Fu Date: Mon, 13 Jan 2025 17:30:40 -0500 Subject: [PATCH] mvcc: restore tombstone index if it's first revision The tombstone could be the only available revision in the database. This happens when all historical revisions have been deleted in previous compactions. Since the tombstone revision is still in the database, we should restore it as a valid key index. Otherwise, we lose that event. Signed-off-by: Wei Fu --- server/storage/mvcc/key_index.go | 8 ++++ server/storage/mvcc/key_index_test.go | 41 +++++++++++++++++ server/storage/mvcc/kv_test.go | 20 ++++++++- server/storage/mvcc/kvstore.go | 4 ++ tests/e2e/watch_test.go | 64 +++++++++++++++++++++++++++ 5 files changed, 135 insertions(+), 2 deletions(-) diff --git a/server/storage/mvcc/key_index.go b/server/storage/mvcc/key_index.go index 27b22fd4899..f5ce818142f 100644 --- a/server/storage/mvcc/key_index.go +++ b/server/storage/mvcc/key_index.go @@ -116,6 +116,14 @@ func (ki *keyIndex) restore(lg *zap.Logger, created, modified Revision, ver int6 keysGauge.Inc() } +// restoreTombstone is only used to build index when there is no previous revision. +// Since there is no historical information, CreateRevision always is empty. +func (ki *keyIndex) restoreTombstone(lg *zap.Logger, main, sub int64) { + ki.restore(lg, Revision{}, Revision{main, sub}, 1) + ki.generations = append(ki.generations, generation{}) + keysGauge.Dec() +} + // tombstone puts a revision, pointing to a tombstone, to the keyIndex. // It also creates a new empty generation in the keyIndex. // It returns ErrRevisionNotFound when tombstone on an empty generation. 
diff --git a/server/storage/mvcc/key_index_test.go b/server/storage/mvcc/key_index_test.go index f86e027e525..643ba11da99 100644 --- a/server/storage/mvcc/key_index_test.go +++ b/server/storage/mvcc/key_index_test.go @@ -20,10 +20,51 @@ import ( "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "go.uber.org/zap" "go.uber.org/zap/zaptest" ) +func TestRestoreTombstone(t *testing.T) { + lg := zaptest.NewLogger(t) + + // restore from tombstone + // + // key: "foo" + // modified: 16 + // "created": 16 + // generations: + // {empty} + // {{16, 0}(t)[0]} + // + ki := &keyIndex{key: []byte("foo")} + ki.restoreTombstone(lg, 16, 0) + + // get should return not found + for retAt := 16; retAt <= 20; retAt++ { + _, _, _, err := ki.get(lg, int64(retAt)) + require.ErrorIs(t, err, ErrRevisionNotFound) + } + + // doCompact should keep that tombstone + availables := map[Revision]struct{}{} + ki.doCompact(16, availables) + require.Len(t, availables, 1) + _, ok := availables[Revision{Main: 16}] + require.True(t, ok) + + // should be able to put new revisions + ki.put(lg, 17, 0) + ki.put(lg, 18, 0) + revs := ki.since(lg, 16) + require.Equal(t, []Revision{{16, 0}, {17, 0}, {18, 0}}, revs) + + // compaction should remove restored tombstone + ki.compact(lg, 17, map[Revision]struct{}{}) + require.Len(t, ki.generations, 1) + require.Equal(t, []Revision{{17, 0}, {18, 0}}, ki.generations[0].revs) +} + func TestKeyIndexGet(t *testing.T) { // key: "foo" // modified: 16 diff --git a/server/storage/mvcc/kv_test.go b/server/storage/mvcc/kv_test.go index c727b444af7..790b534395b 100644 --- a/server/storage/mvcc/kv_test.go +++ b/server/storage/mvcc/kv_test.go @@ -25,6 +25,7 @@ import ( "github.com/prometheus/client_golang/prometheus" dto "github.com/prometheus/client_model/go" + "github.com/stretchr/testify/assert" "go.uber.org/zap/zaptest" "go.etcd.io/etcd/api/v3/mvccpb" @@ -657,6 +658,8 @@ func TestKVHash(t *testing.T) { } func TestKVRestore(t *testing.T) { + 
compactBatchLimit := 5 + tests := []func(kv KV){ func(kv KV) { kv.Put([]byte("foo"), []byte("bar0"), 1) @@ -674,10 +677,23 @@ func TestKVRestore(t *testing.T) { kv.Put([]byte("foo"), []byte("bar1"), 2) kv.Compact(traceutil.TODO(), 1) }, + func(kv KV) { // after restore, foo1 key only has tombstone revision + kv.Put([]byte("foo1"), []byte("bar1"), 0) + kv.Put([]byte("foo2"), []byte("bar2"), 0) + kv.Put([]byte("foo3"), []byte("bar3"), 0) + kv.Put([]byte("foo4"), []byte("bar4"), 0) + kv.Put([]byte("foo5"), []byte("bar5"), 0) + _, delAtRev := kv.DeleteRange([]byte("foo1"), nil) + assert.Equal(t, int64(7), delAtRev) + + // after compaction and restore, foo1 key only has tombstone revision + ch, _ := kv.Compact(traceutil.TODO(), delAtRev) + <-ch + }, } for i, tt := range tests { b, _ := betesting.NewDefaultTmpBackend(t) - s := NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}) + s := NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{CompactionBatchLimit: compactBatchLimit}) tt(s) var kvss [][]mvccpb.KeyValue for k := int64(0); k < 10; k++ { @@ -689,7 +705,7 @@ func TestKVRestore(t *testing.T) { s.Close() // ns should recover the previous state from backend. 
- ns := NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}) + ns := NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{CompactionBatchLimit: compactBatchLimit}) if keysRestore := readGaugeInt(keysGauge); keysBefore != keysRestore { t.Errorf("#%d: got %d key count, expected %d", i, keysRestore, keysBefore) diff --git a/server/storage/mvcc/kvstore.go b/server/storage/mvcc/kvstore.go index 3e1226c9174..af3b8263c95 100644 --- a/server/storage/mvcc/kvstore.go +++ b/server/storage/mvcc/kvstore.go @@ -477,6 +477,10 @@ func restoreIntoIndex(lg *zap.Logger, idx index) (chan<- revKeyValue, <-chan int ki.restore(lg, Revision{Main: rkv.kv.CreateRevision}, rev, rkv.kv.Version) idx.Insert(ki) kiCache[rkv.kstr] = ki + } else { + ki.restoreTombstone(lg, rev.Main, rev.Sub) + idx.Insert(ki) + kiCache[rkv.kstr] = ki } } }() diff --git a/tests/e2e/watch_test.go b/tests/e2e/watch_test.go index f167b8f8752..9af175d3cd8 100644 --- a/tests/e2e/watch_test.go +++ b/tests/e2e/watch_test.go @@ -488,3 +488,67 @@ func testStartWatcherFromCompactedRevision(t *testing.T, performCompactOnTombsto } } } + +// TestResumeCompactionOnTombstone verifies whether a deletion event is preserved +// when etcd restarts and resumes compaction on a key that only has a tombstone revision. 
+func TestResumeCompactionOnTombstone(t *testing.T) { + e2e.BeforeTest(t) + + ctx := context.Background() + compactBatchLimit := 5 + + cfg := e2e.DefaultConfig() + clus, err := e2e.NewEtcdProcessCluster(context.Background(), + t, + e2e.WithConfig(cfg), + e2e.WithClusterSize(1), + e2e.WithCompactionBatchLimit(compactBatchLimit), + e2e.WithGoFailEnabled(true), + e2e.WithWatchProcessNotifyInterval(100*time.Millisecond), + ) + require.NoError(t, err) + defer clus.Close() + + c1 := newClient(t, clus.EndpointsGRPC(), cfg.Client) + defer c1.Close() + + keyPrefix := "/key-" + for i := 0; i < compactBatchLimit; i++ { + key := fmt.Sprintf("%s%d", keyPrefix, i) + value := fmt.Sprintf("%d", i) + + t.Logf("PUT key=%s, val=%s", key, value) + _, err = c1.KV.Put(ctx, key, value) + require.NoError(t, err) + } + + firstKey := keyPrefix + "0" + t.Logf("DELETE key=%s", firstKey) + deleteResp, err := c1.KV.Delete(ctx, firstKey) + require.NoError(t, err) + + require.NoError(t, clus.Procs[0].Failpoints().SetupHTTP(ctx, "compactBeforeSetFinishedCompact", `panic`)) + + t.Logf("COMPACT rev=%d", deleteResp.Header.Revision) + _, err = c1.KV.Compact(ctx, deleteResp.Header.Revision, clientv3.WithCompactPhysical()) + require.Error(t, err) + + require.NoError(t, clus.Restart(ctx)) + + c2 := newClient(t, clus.EndpointsGRPC(), cfg.Client) + defer c2.Close() + + watchChan := c2.Watch(ctx, firstKey, clientv3.WithRev(deleteResp.Header.Revision)) + select { + case watchResp := <-watchChan: + require.Len(t, watchResp.Events, 1) + + require.Equal(t, mvccpb.DELETE, watchResp.Events[0].Type) + deletedKey := string(watchResp.Events[0].Kv.Key) + require.Equal(t, firstKey, deletedKey) + case <-time.After(100 * time.Millisecond): + // we care only about the first response, but have an + // escape hatch in case the watch response is delayed. + t.Fatal("timed out getting watch response") + } +}