From a1fefff403eab859a9b05e27665e9d2b7366ad7f Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Thu, 7 Apr 2022 17:32:21 +0200 Subject: [PATCH] server: Save consistency index and term to backend even when they decrease Reason to store CI and term in backend was to make db fully independent snapshot, it was never meant to interfere with apply logic. Skip of CI was introduced for v2->v3 migration where we wanted to prevent it from decreasing when replaying wal in https://github.com/etcd-io/etcd/pull/5391. By mistake it was added to apply flow during refactor in https://github.com/etcd-io/etcd/pull/12855#commitcomment-70713670. Consistency index and term should only be negotiated and used by raft to make decisions. Their values should only driven by raft state machine and backend should only be responsible for storing them. --- etcdutl/etcdutl/backup_command.go | 2 +- etcdutl/snapshot/v3_snapshot.go | 2 +- server/etcdserver/cindex/cindex.go | 19 +++------ server/etcdserver/cindex/cindex_test.go | 52 +++++++++++++++++++++++++ 4 files changed, 59 insertions(+), 16 deletions(-) diff --git a/etcdutl/etcdutl/backup_command.go b/etcdutl/etcdutl/backup_command.go index c09bcf14a79..6bebaf920a1 100644 --- a/etcdutl/etcdutl/backup_command.go +++ b/etcdutl/etcdutl/backup_command.go @@ -322,7 +322,7 @@ func saveDB(lg *zap.Logger, destDB, srcDB string, idx uint64, term uint64, desir tx.Lock() defer tx.Unlock() cindex.UnsafeCreateMetaBucket(tx) - cindex.UnsafeUpdateConsistentIndex(tx, idx, term, false) + cindex.UnsafeUpdateConsistentIndex(tx, idx, term) } else { // Thanks to translateWAL not moving entries, but just replacing them with // 'empty', there is no need to update the consistency index. diff --git a/etcdutl/snapshot/v3_snapshot.go b/etcdutl/snapshot/v3_snapshot.go index 5d517a03f4c..b17cd90aac1 100644 --- a/etcdutl/snapshot/v3_snapshot.go +++ b/etcdutl/snapshot/v3_snapshot.go @@ -476,6 +476,6 @@ func (s *v3Manager) updateCIndex(commit uint64, term uint64) error { be := backend.NewDefaultBackend(s.outDbPath()) defer be.Close() - cindex.UpdateConsistentIndex(be.BatchTx(), commit, term, false) + cindex.UpdateConsistentIndex(be.BatchTx(), commit, term) return nil } diff --git a/server/etcdserver/cindex/cindex.go b/server/etcdserver/cindex/cindex.go index 64b98b6fff3..4978124baa5 100644 --- a/server/etcdserver/cindex/cindex.go +++ b/server/etcdserver/cindex/cindex.go @@ -25,6 +25,7 @@ import ( type Backend interface { BatchTx() backend.BatchTx + ReadTx() backend.ReadTx } // ConsistentIndexer is an interface that wraps the Get/Set/Save method for consistentIndex. @@ -87,7 +88,7 @@ func (ci *consistentIndex) SetConsistentIndex(v uint64, term uint64) { func (ci *consistentIndex) UnsafeSave(tx backend.BatchTx) { index := atomic.LoadUint64(&ci.consistentIndex) term := atomic.LoadUint64(&ci.term) - UnsafeUpdateConsistentIndex(tx, index, term, true) + UnsafeUpdateConsistentIndex(tx, index, term) } func (ci *consistentIndex) SetBackend(be Backend) { @@ -154,22 +155,12 @@ func ReadConsistentIndex(tx backend.ReadTx) (uint64, uint64) { return unsafeReadConsistentIndex(tx) } -func UnsafeUpdateConsistentIndex(tx backend.BatchTx, index uint64, term uint64, onlyGrow bool) { +func UnsafeUpdateConsistentIndex(tx backend.BatchTx, index uint64, term uint64) { if index == 0 { // Never save 0 as it means that we didn't loaded the real index yet. return } - if onlyGrow { - oldi, oldTerm := unsafeReadConsistentIndex(tx) - if term < oldTerm { - return - } - if term == oldTerm && index <= oldi { - return - } - } - bs1 := make([]byte, 8) binary.BigEndian.PutUint64(bs1, index) // put the index into the underlying backend @@ -182,8 +173,8 @@ func UnsafeUpdateConsistentIndex(tx backend.BatchTx, index uint64, term uint64, } } -func UpdateConsistentIndex(tx backend.BatchTx, index uint64, term uint64, onlyGrow bool) { +func UpdateConsistentIndex(tx backend.BatchTx, index uint64, term uint64) { tx.Lock() defer tx.Unlock() - UnsafeUpdateConsistentIndex(tx, index, term, onlyGrow) + UnsafeUpdateConsistentIndex(tx, index, term) } diff --git a/server/etcdserver/cindex/cindex_test.go b/server/etcdserver/cindex/cindex_test.go index 1e111b9e823..44bd0ef8339 100644 --- a/server/etcdserver/cindex/cindex_test.go +++ b/server/etcdserver/cindex/cindex_test.go @@ -63,6 +63,58 @@ func TestConsistentIndex(t *testing.T) { assert.Equal(t, r, index) } +func TestConsistentIndexDecrease(t *testing.T) { + initIndex := uint64(100) + initTerm := uint64(10) + + tcs := []struct { + name string + index uint64 + term uint64 + }{ + { + name: "Decrease term", + index: initIndex + 1, + term: initTerm - 1, + }, + { + name: "Decrease CI", + index: initIndex - 1, + term: initTerm + 1, + }, + { + name: "Decrease CI and term", + index: initIndex - 1, + term: initTerm - 1, + }, + } + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + be, tmpPath := betesting.NewTmpBackend(t, time.Microsecond, 10) + tx := be.BatchTx() + tx.Lock() + UnsafeCreateMetaBucket(tx) + UnsafeUpdateConsistentIndex(tx, initIndex, initTerm) + tx.Unlock() + be.ForceCommit() + be.Close() + + be = backend.NewDefaultBackend(tmpPath) + defer be.Close() + ci := NewConsistentIndex(be) + ci.SetConsistentIndex(tc.index, tc.term) + tx = be.BatchTx() + tx.Lock() + ci.UnsafeSave(tx) + tx.Unlock() + assert.Equal(t, tc.index, ci.ConsistentIndex()) + + ci = NewConsistentIndex(be) + assert.Equal(t, tc.index, ci.ConsistentIndex()) + }) + } +} + func TestFakeConsistentIndex(t *testing.T) { r := rand.Uint64()