diff --git a/server/etcdserver/api/v3rpc/maintenance.go b/server/etcdserver/api/v3rpc/maintenance.go index dcacbf4978cf..38cc91371630 100644 --- a/server/etcdserver/api/v3rpc/maintenance.go +++ b/server/etcdserver/api/v3rpc/maintenance.go @@ -34,7 +34,7 @@ import ( ) type KVGetter interface { - KV() mvcc.ConsistentWatchableKV + KV() mvcc.WatchableKV } type BackendGetter interface { diff --git a/server/etcdserver/cindex/cindex.go b/server/etcdserver/cindex/cindex.go index 73e96fd70c43..83f329615c13 100644 --- a/server/etcdserver/cindex/cindex.go +++ b/server/etcdserver/cindex/cindex.go @@ -66,7 +66,9 @@ func (ci *consistentIndex) ConsistentIndex() uint64 { } ci.mutex.Lock() defer ci.mutex.Unlock() + v := ReadConsistentIndex(ci.tx) + atomic.StoreUint64(&ci.consistentIndex, v) return v } @@ -76,7 +78,6 @@ func (ci *consistentIndex) SetConsistentIndex(v uint64) { func (ci *consistentIndex) UnsafeSave(tx backend.BatchTx) { index := atomic.LoadUint64(&ci.consistentIndex) - if index == 0 { // Never save 0 as it means that we didn't loaded the real index yet. return @@ -102,8 +103,8 @@ func (f *fakeConsistentIndex) SetConsistentIndex(index uint64) { atomic.StoreUint64(&f.index, index) } -func (f *fakeConsistentIndex) UnsafeSave(tx backend.BatchTx) {} -func (f *fakeConsistentIndex) SetBatchTx(tx backend.BatchTx) {} +func (f *fakeConsistentIndex) UnsafeSave(_ backend.BatchTx) {} +func (f *fakeConsistentIndex) SetBatchTx(_ backend.BatchTx) {} func UnsafeCreateMetaBucket(tx backend.BatchTx) { tx.UnsafeCreateBucket(MetaBucketName) diff --git a/server/etcdserver/cindex/cindex_test.go b/server/etcdserver/cindex/cindex_test.go index eb577b8fd643..58556a2b61af 100644 --- a/server/etcdserver/cindex/cindex_test.go +++ b/server/etcdserver/cindex/cindex_test.go @@ -34,6 +34,7 @@ func TestConsistentIndex(t *testing.T) { t.Fatal("batch tx is nil") } tx.Lock() + UnsafeCreateMetaBucket(tx) tx.Unlock() be.ForceCommit() diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index 922dc8813723..c7dcd59af4a8 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -256,7 +256,7 @@ type EtcdServer struct { applyV3Internal applierV3Internal applyWait wait.WaitTime - kv mvcc.ConsistentWatchableKV + kv mvcc.WatchableKV lessor lease.Lessor bemu sync.Mutex be backend.Backend @@ -1218,8 +1218,7 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) { lg.Panic("failed to restore mvcc store", zap.Error(err)) } - s.consistIndex.SetConsistentIndex(s.kv.ConsistentIndex()) - lg.Info("restored mvcc store") + lg.Info("restored mvcc store", zap.Uint64("consistent-index", s.consistIndex.ConsistentIndex())) // Closing old backend might block until all the txns // on the backend are finished. @@ -2530,7 +2529,7 @@ func (s *EtcdServer) parseProposeCtxErr(err error, start time.Time) error { } } -func (s *EtcdServer) KV() mvcc.ConsistentWatchableKV { return s.kv } +func (s *EtcdServer) KV() mvcc.WatchableKV { return s.kv } func (s *EtcdServer) Backend() backend.Backend { s.bemu.Lock() defer s.bemu.Unlock() diff --git a/server/lease/lessor.go b/server/lease/lessor.go index a12591e46ef1..7a1544e9395c 100644 --- a/server/lease/lessor.go +++ b/server/lease/lessor.go @@ -294,7 +294,7 @@ func (le *lessor) Grant(id LeaseID, ttl int64) (*Lease, error) { } le.leaseMap[id] = l - l.persistTo(le.b, le.ci) + l.persistTo(le.b) leaseTotalTTLs.Observe(float64(l.ttl)) leaseGranted.Inc() @@ -341,10 +341,6 @@ func (le *lessor) Revoke(id LeaseID) error { // kv deletion. Or we might end up with not executing the revoke or not // deleting the keys if etcdserver fails in between. le.b.BatchTx().UnsafeDelete(leaseBucketName, int64ToBytes(int64(l.ID))) - // if len(keys) > 0, txn.End() will call ci.UnsafeSave function. - if le.ci != nil && len(keys) == 0 { - le.ci.UnsafeSave(le.b.BatchTx()) - } txn.End() @@ -828,7 +824,7 @@ func (l *Lease) expired() bool { return l.Remaining() <= 0 } -func (l *Lease) persistTo(b backend.Backend, ci cindex.ConsistentIndexer) { +func (l *Lease) persistTo(b backend.Backend) { key := int64ToBytes(int64(l.ID)) lpb := leasepb.Lease{ID: int64(l.ID), TTL: l.ttl, RemainingTTL: l.remainingTTL} @@ -839,9 +835,6 @@ func (l *Lease) persistTo(b backend.Backend, ci cindex.ConsistentIndexer) { b.BatchTx().Lock() b.BatchTx().UnsafePut(leaseBucketName, key, val) - if ci != nil { - ci.UnsafeSave(b.BatchTx()) - } b.BatchTx().Unlock() } diff --git a/server/mvcc/kv.go b/server/mvcc/kv.go index b8cd982da6eb..15ad263288a9 100644 --- a/server/mvcc/kv.go +++ b/server/mvcc/kv.go @@ -139,14 +139,3 @@ type Watchable interface { // watch events happened or happening on the KV. NewWatchStream() WatchStream } - -// ConsistentWatchableKV is a WatchableKV that understands the consistency -// algorithm and consistent index. -// If the consistent index of executing entry is not larger than the -// consistent index of ConsistentWatchableKV, all operations in -// this entry are skipped and return empty response. -type ConsistentWatchableKV interface { - WatchableKV - // ConsistentIndex returns the current consistent index of the KV. - ConsistentIndex() uint64 -} diff --git a/server/mvcc/kvstore.go b/server/mvcc/kvstore.go index ac79b07b1c5c..95df281a4c8d 100644 --- a/server/mvcc/kvstore.go +++ b/server/mvcc/kvstore.go @@ -314,11 +314,6 @@ func init() { func (s *store) Commit() { s.mu.Lock() defer s.mu.Unlock() - - tx := s.b.BatchTx() - tx.Lock() - s.saveIndex(tx) - tx.Unlock() s.b.ForceCommit() } @@ -436,9 +431,7 @@ func (s *store) restore() error { tx.Unlock() - s.lg.Info("kvstore restored", - zap.Uint64("consistent-index", s.ConsistentIndex()), - zap.Int64("current-rev", s.currentRev)) + s.lg.Info("kvstore restored", zap.Int64("current-rev", s.currentRev)) if scheduledCompact != 0 { if _, err := s.compactLockfree(scheduledCompact); err != nil { @@ -533,19 +526,6 @@ func (s *store) Close() error { return nil } -func (s *store) saveIndex(tx backend.BatchTx) { - if s.ci != nil { - s.ci.UnsafeSave(tx) - } -} - -func (s *store) ConsistentIndex() uint64 { - if s.ci != nil { - return s.ci.ConsistentIndex() - } - return 0 -} - func (s *store) setupMetricsReporter() { b := s.b reportDbTotalSizeInBytesMu.Lock() diff --git a/server/mvcc/kvstore_bench_test.go b/server/mvcc/kvstore_bench_test.go index 910bacf30502..aeeddbc41717 100644 --- a/server/mvcc/kvstore_bench_test.go +++ b/server/mvcc/kvstore_bench_test.go @@ -73,19 +73,23 @@ func benchmarkStoreRange(b *testing.B, n int) { } func BenchmarkConsistentIndex(b *testing.B) { - be, tmpPath := betesting.NewDefaultTmpBackend(b) - s := NewStore(zap.NewExample(), be, &lease.FakeLessor{}, cindex.NewConsistentIndex(be.BatchTx()), StoreConfig{}) - defer cleanup(s, be, tmpPath) + be, _ := betesting.NewDefaultTmpBackend(b) + ci := cindex.NewConsistentIndex(be.BatchTx()) + defer betesting.Close(b, be) + + // This will force the index to be reread from scratch on each call. + ci.SetConsistentIndex(0) - tx := s.b.BatchTx() + tx := be.BatchTx() tx.Lock() - s.saveIndex(tx) + cindex.UnsafeCreateMetaBucket(tx) + ci.UnsafeSave(tx) tx.Unlock() b.ReportAllocs() b.ResetTimer() for i := 0; i < b.N; i++ { - s.ConsistentIndex() + ci.ConsistentIndex() } } diff --git a/server/mvcc/kvstore_txn.go b/server/mvcc/kvstore_txn.go index 870c710ec242..155a13d0730b 100644 --- a/server/mvcc/kvstore_txn.go +++ b/server/mvcc/kvstore_txn.go @@ -104,7 +104,6 @@ func (tw *storeTxnWrite) Put(key, value []byte, lease lease.LeaseID) int64 { func (tw *storeTxnWrite) End() { // only update index if the txn modifies the mvcc state. if len(tw.changes) != 0 { - tw.s.saveIndex(tw.tx) // hold revMu lock to prevent new read txns from opening until writeback. tw.s.revMu.Lock() tw.s.currentRev++ diff --git a/server/mvcc/watchable_store.go b/server/mvcc/watchable_store.go index ff5bdb78492e..def2abda54b4 100644 --- a/server/mvcc/watchable_store.go +++ b/server/mvcc/watchable_store.go @@ -70,7 +70,7 @@ type watchableStore struct { // cancel operations. type cancelFunc func() -func New(lg *zap.Logger, b backend.Backend, le lease.Lessor, ci cindex.ConsistentIndexer, cfg StoreConfig) ConsistentWatchableKV { +func New(lg *zap.Logger, b backend.Backend, le lease.Lessor, ci cindex.ConsistentIndexer, cfg StoreConfig) WatchableKV { return newWatchableStore(lg, b, le, ci, cfg) }