From ab83df52ae57bee08790363937831c093b912b38 Mon Sep 17 00:00:00 2001 From: Wei Fu Date: Mon, 13 Jan 2025 17:30:40 -0500 Subject: [PATCH] mvcc: restore tombstone index if it's first revision The tombstone could be the only one available revision in database. It happens when all historical revisions have been deleted in previous compactions. Since tombstone revision is still in database, we should restore it as valid key index. Otherwise, we lost that event. Signed-off-by: Wei Fu --- server/storage/mvcc/key_index.go | 9 ++++ server/storage/mvcc/key_index_test.go | 41 +++++++++++++++++ server/storage/mvcc/kv_test.go | 20 ++++++++- server/storage/mvcc/kvstore.go | 8 +++- tests/e2e/watch_test.go | 64 +++++++++++++++++++++++++++ 5 files changed, 138 insertions(+), 4 deletions(-) diff --git a/server/storage/mvcc/key_index.go b/server/storage/mvcc/key_index.go index 27b22fd4899..14035197417 100644 --- a/server/storage/mvcc/key_index.go +++ b/server/storage/mvcc/key_index.go @@ -116,6 +116,15 @@ func (ki *keyIndex) restore(lg *zap.Logger, created, modified Revision, ver int6 keysGauge.Inc() } +// restoreTombstone is used to restore a tombstone revision, which is the only +// revision so far for a key. We don't know the creating revision (i.e. already +// compacted) of the key, so set it empty. +func (ki *keyIndex) restoreTombstone(lg *zap.Logger, main, sub int64) { + ki.restore(lg, Revision{}, Revision{main, sub}, 1) + ki.generations = append(ki.generations, generation{}) + keysGauge.Dec() +} + // tombstone puts a revision, pointing to a tombstone, to the keyIndex. // It also creates a new empty generation in the keyIndex. // It returns ErrRevisionNotFound when tombstone on an empty generation. diff --git a/server/storage/mvcc/key_index_test.go b/server/storage/mvcc/key_index_test.go index f86e027e525..643ba11da99 100644 --- a/server/storage/mvcc/key_index_test.go +++ b/server/storage/mvcc/key_index_test.go @@ -20,10 +20,51 @@ import ( "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "go.uber.org/zap" "go.uber.org/zap/zaptest" ) +func TestRestoreTombstone(t *testing.T) { + lg := zaptest.NewLogger(t) + + // restore from tombstone + // + // key: "foo" + // modified: 16 + // "created": 16 + // generations: + // {empty} + // {{16, 0}(t)[0]} + // + ki := &keyIndex{key: []byte("foo")} + ki.restoreTombstone(lg, 16, 0) + + // get should return not found + for retAt := 16; retAt <= 20; retAt++ { + _, _, _, err := ki.get(lg, int64(retAt)) + require.ErrorIs(t, err, ErrRevisionNotFound) + } + + // doCompact should keep that tombstone + availables := map[Revision]struct{}{} + ki.doCompact(16, availables) + require.Len(t, availables, 1) + _, ok := availables[Revision{Main: 16}] + require.True(t, ok) + + // should be able to put new revisions + ki.put(lg, 17, 0) + ki.put(lg, 18, 0) + revs := ki.since(lg, 16) + require.Equal(t, []Revision{{16, 0}, {17, 0}, {18, 0}}, revs) + + // compaction should remove restored tombstone + ki.compact(lg, 17, map[Revision]struct{}{}) + require.Len(t, ki.generations, 1) + require.Equal(t, []Revision{{17, 0}, {18, 0}}, ki.generations[0].revs) +} + func TestKeyIndexGet(t *testing.T) { // key: "foo" // modified: 16 diff --git a/server/storage/mvcc/kv_test.go b/server/storage/mvcc/kv_test.go index c727b444af7..790b534395b 100644 --- a/server/storage/mvcc/kv_test.go +++ b/server/storage/mvcc/kv_test.go @@ -25,6 +25,7 @@ import ( "github.com/prometheus/client_golang/prometheus" dto "github.com/prometheus/client_model/go" + "github.com/stretchr/testify/assert" "go.uber.org/zap/zaptest" "go.etcd.io/etcd/api/v3/mvccpb" @@ -657,6 +658,8 @@ func TestKVHash(t *testing.T) { } func TestKVRestore(t *testing.T) { + compactBatchLimit := 5 + tests := []func(kv KV){ func(kv KV) { kv.Put([]byte("foo"), []byte("bar0"), 1) @@ -674,10 +677,23 @@ func TestKVRestore(t *testing.T) { kv.Put([]byte("foo"), []byte("bar1"), 2) kv.Compact(traceutil.TODO(), 1) }, + func(kv KV) { // after restore, foo1 key only has tombstone revision + kv.Put([]byte("foo1"), []byte("bar1"), 0) + kv.Put([]byte("foo2"), []byte("bar2"), 0) + kv.Put([]byte("foo3"), []byte("bar3"), 0) + kv.Put([]byte("foo4"), []byte("bar4"), 0) + kv.Put([]byte("foo5"), []byte("bar5"), 0) + _, delAtRev := kv.DeleteRange([]byte("foo1"), nil) + assert.Equal(t, int64(7), delAtRev) + + // after compaction and restore, foo1 key only has tombstone revision + ch, _ := kv.Compact(traceutil.TODO(), delAtRev) + <-ch + }, } for i, tt := range tests { b, _ := betesting.NewDefaultTmpBackend(t) - s := NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}) + s := NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{CompactionBatchLimit: compactBatchLimit}) tt(s) var kvss [][]mvccpb.KeyValue for k := int64(0); k < 10; k++ { @@ -689,7 +705,7 @@ func TestKVRestore(t *testing.T) { s.Close() // ns should recover the previous state from backend. - ns := NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}) + ns := NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{CompactionBatchLimit: compactBatchLimit}) if keysRestore := readGaugeInt(keysGauge); keysBefore != keysRestore { t.Errorf("#%d: got %d key count, expected %d", i, keysRestore, keysBefore) diff --git a/server/storage/mvcc/kvstore.go b/server/storage/mvcc/kvstore.go index 3e1226c9174..44c2a625204 100644 --- a/server/storage/mvcc/kvstore.go +++ b/server/storage/mvcc/kvstore.go @@ -473,8 +473,12 @@ func restoreIntoIndex(lg *zap.Logger, idx index) (chan<- revKeyValue, <-chan int continue } ki.put(lg, rev.Main, rev.Sub) - } else if !isTombstone(rkv.key) { - ki.restore(lg, Revision{Main: rkv.kv.CreateRevision}, rev, rkv.kv.Version) + } else { + if isTombstone(rkv.key) { + ki.restoreTombstone(lg, rev.Main, rev.Sub) + } else { + ki.restore(lg, Revision{Main: rkv.kv.CreateRevision}, rev, rkv.kv.Version) + } idx.Insert(ki) kiCache[rkv.kstr] = ki } diff --git a/tests/e2e/watch_test.go b/tests/e2e/watch_test.go index f167b8f8752..9af175d3cd8 100644 --- a/tests/e2e/watch_test.go +++ b/tests/e2e/watch_test.go @@ -488,3 +488,67 @@ func testStartWatcherFromCompactedRevision(t *testing.T, performCompactOnTombsto } } } + +// TestResumeCompactionOnTombstone verifies whether a deletion event is preserved +// when etcd restarts and resumes compaction on a key that only has a tombstone revision. +func TestResumeCompactionOnTombstone(t *testing.T) { + e2e.BeforeTest(t) + + ctx := context.Background() + compactBatchLimit := 5 + + cfg := e2e.DefaultConfig() + clus, err := e2e.NewEtcdProcessCluster(context.Background(), + t, + e2e.WithConfig(cfg), + e2e.WithClusterSize(1), + e2e.WithCompactionBatchLimit(compactBatchLimit), + e2e.WithGoFailEnabled(true), + e2e.WithWatchProcessNotifyInterval(100*time.Millisecond), + ) + require.NoError(t, err) + defer clus.Close() + + c1 := newClient(t, clus.EndpointsGRPC(), cfg.Client) + defer c1.Close() + + keyPrefix := "/key-" + for i := 0; i < compactBatchLimit; i++ { + key := fmt.Sprintf("%s%d", keyPrefix, i) + value := fmt.Sprintf("%d", i) + + t.Logf("PUT key=%s, val=%s", key, value) + _, err = c1.KV.Put(ctx, key, value) + require.NoError(t, err) + } + + firstKey := keyPrefix + "0" + t.Logf("DELETE key=%s", firstKey) + deleteResp, err := c1.KV.Delete(ctx, firstKey) + require.NoError(t, err) + + require.NoError(t, clus.Procs[0].Failpoints().SetupHTTP(ctx, "compactBeforeSetFinishedCompact", `panic`)) + + t.Logf("COMPACT rev=%d", deleteResp.Header.Revision) + _, err = c1.KV.Compact(ctx, deleteResp.Header.Revision, clientv3.WithCompactPhysical()) + require.Error(t, err) + + require.NoError(t, clus.Restart(ctx)) + + c2 := newClient(t, clus.EndpointsGRPC(), cfg.Client) + defer c2.Close() + + watchChan := c2.Watch(ctx, firstKey, clientv3.WithRev(deleteResp.Header.Revision)) + select { + case watchResp := <-watchChan: + require.Len(t, watchResp.Events, 1) + + require.Equal(t, mvccpb.DELETE, watchResp.Events[0].Type) + deletedKey := string(watchResp.Events[0].Kv.Key) + require.Equal(t, firstKey, deletedKey) + case <-time.After(100 * time.Millisecond): + // we care only about the first response, but have an + // escape hatch in case the watch response is delayed. + t.Fatal("timed out getting watch response") + } +}