Skip to content

Commit

Permalink
Simplify KVStore interaction with cindex thanks to hooks.
Browse files Browse the repository at this point in the history
  • Loading branch information
ptabor committed Apr 29, 2021
1 parent af0409c commit 444ab94
Show file tree
Hide file tree
Showing 10 changed files with 23 additions and 57 deletions.
2 changes: 1 addition & 1 deletion server/etcdserver/api/v3rpc/maintenance.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import (
)

type KVGetter interface {
KV() mvcc.ConsistentWatchableKV
KV() mvcc.WatchableKV
}

type BackendGetter interface {
Expand Down
7 changes: 4 additions & 3 deletions server/etcdserver/cindex/cindex.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,9 @@ func (ci *consistentIndex) ConsistentIndex() uint64 {
}
ci.mutex.Lock()
defer ci.mutex.Unlock()

v := ReadConsistentIndex(ci.tx)
atomic.StoreUint64(&ci.consistentIndex, v)
return v
}

Expand All @@ -76,7 +78,6 @@ func (ci *consistentIndex) SetConsistentIndex(v uint64) {

func (ci *consistentIndex) UnsafeSave(tx backend.BatchTx) {
index := atomic.LoadUint64(&ci.consistentIndex)

if index == 0 {
// Never save 0 as it means that we didn't loaded the real index yet.
return
Expand All @@ -102,8 +103,8 @@ func (f *fakeConsistentIndex) SetConsistentIndex(index uint64) {
atomic.StoreUint64(&f.index, index)
}

func (f *fakeConsistentIndex) UnsafeSave(tx backend.BatchTx) {}
func (f *fakeConsistentIndex) SetBatchTx(tx backend.BatchTx) {}
func (f *fakeConsistentIndex) UnsafeSave(_ backend.BatchTx) {}
func (f *fakeConsistentIndex) SetBatchTx(_ backend.BatchTx) {}

func UnsafeCreateMetaBucket(tx backend.BatchTx) {
tx.UnsafeCreateBucket(MetaBucketName)
Expand Down
1 change: 1 addition & 0 deletions server/etcdserver/cindex/cindex_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ func TestConsistentIndex(t *testing.T) {
t.Fatal("batch tx is nil")
}
tx.Lock()

UnsafeCreateMetaBucket(tx)
tx.Unlock()
be.ForceCommit()
Expand Down
7 changes: 3 additions & 4 deletions server/etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ type EtcdServer struct {
applyV3Internal applierV3Internal
applyWait wait.WaitTime

kv mvcc.ConsistentWatchableKV
kv mvcc.WatchableKV
lessor lease.Lessor
bemu sync.Mutex
be backend.Backend
Expand Down Expand Up @@ -1218,8 +1218,7 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {
lg.Panic("failed to restore mvcc store", zap.Error(err))
}

s.consistIndex.SetConsistentIndex(s.kv.ConsistentIndex())
lg.Info("restored mvcc store")
lg.Info("restored mvcc store", zap.Uint64("consistent-index", s.consistIndex.ConsistentIndex()))

// Closing old backend might block until all the txns
// on the backend are finished.
Expand Down Expand Up @@ -2530,7 +2529,7 @@ func (s *EtcdServer) parseProposeCtxErr(err error, start time.Time) error {
}
}

func (s *EtcdServer) KV() mvcc.ConsistentWatchableKV { return s.kv }
func (s *EtcdServer) KV() mvcc.WatchableKV { return s.kv }
func (s *EtcdServer) Backend() backend.Backend {
s.bemu.Lock()
defer s.bemu.Unlock()
Expand Down
11 changes: 2 additions & 9 deletions server/lease/lessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ func (le *lessor) Grant(id LeaseID, ttl int64) (*Lease, error) {
}

le.leaseMap[id] = l
l.persistTo(le.b, le.ci)
l.persistTo(le.b)

leaseTotalTTLs.Observe(float64(l.ttl))
leaseGranted.Inc()
Expand Down Expand Up @@ -341,10 +341,6 @@ func (le *lessor) Revoke(id LeaseID) error {
// kv deletion. Or we might end up with not executing the revoke or not
// deleting the keys if etcdserver fails in between.
le.b.BatchTx().UnsafeDelete(leaseBucketName, int64ToBytes(int64(l.ID)))
// if len(keys) > 0, txn.End() will call ci.UnsafeSave function.
if le.ci != nil && len(keys) == 0 {
le.ci.UnsafeSave(le.b.BatchTx())
}

txn.End()

Expand Down Expand Up @@ -828,7 +824,7 @@ func (l *Lease) expired() bool {
return l.Remaining() <= 0
}

func (l *Lease) persistTo(b backend.Backend, ci cindex.ConsistentIndexer) {
func (l *Lease) persistTo(b backend.Backend) {
key := int64ToBytes(int64(l.ID))

lpb := leasepb.Lease{ID: int64(l.ID), TTL: l.ttl, RemainingTTL: l.remainingTTL}
Expand All @@ -839,9 +835,6 @@ func (l *Lease) persistTo(b backend.Backend, ci cindex.ConsistentIndexer) {

b.BatchTx().Lock()
b.BatchTx().UnsafePut(leaseBucketName, key, val)
if ci != nil {
ci.UnsafeSave(b.BatchTx())
}
b.BatchTx().Unlock()
}

Expand Down
11 changes: 0 additions & 11 deletions server/mvcc/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,14 +139,3 @@ type Watchable interface {
// watch events happened or happening on the KV.
NewWatchStream() WatchStream
}

// ConsistentWatchableKV is a WatchableKV that understands the consistency
// algorithm and consistent index.
// If the consistent index of executing entry is not larger than the
// consistent index of ConsistentWatchableKV, all operations in
// this entry are skipped and return empty response.
type ConsistentWatchableKV interface {
WatchableKV
// ConsistentIndex returns the current consistent index of the KV.
ConsistentIndex() uint64
}
22 changes: 1 addition & 21 deletions server/mvcc/kvstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,11 +314,6 @@ func init() {
func (s *store) Commit() {
s.mu.Lock()
defer s.mu.Unlock()

tx := s.b.BatchTx()
tx.Lock()
s.saveIndex(tx)
tx.Unlock()
s.b.ForceCommit()
}

Expand Down Expand Up @@ -436,9 +431,7 @@ func (s *store) restore() error {

tx.Unlock()

s.lg.Info("kvstore restored",
zap.Uint64("consistent-index", s.ConsistentIndex()),
zap.Int64("current-rev", s.currentRev))
s.lg.Info("kvstore restored", zap.Int64("current-rev", s.currentRev))

if scheduledCompact != 0 {
if _, err := s.compactLockfree(scheduledCompact); err != nil {
Expand Down Expand Up @@ -533,19 +526,6 @@ func (s *store) Close() error {
return nil
}

func (s *store) saveIndex(tx backend.BatchTx) {
if s.ci != nil {
s.ci.UnsafeSave(tx)
}
}

func (s *store) ConsistentIndex() uint64 {
if s.ci != nil {
return s.ci.ConsistentIndex()
}
return 0
}

func (s *store) setupMetricsReporter() {
b := s.b
reportDbTotalSizeInBytesMu.Lock()
Expand Down
16 changes: 10 additions & 6 deletions server/mvcc/kvstore_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,19 +73,23 @@ func benchmarkStoreRange(b *testing.B, n int) {
}

func BenchmarkConsistentIndex(b *testing.B) {
be, tmpPath := betesting.NewDefaultTmpBackend(b)
s := NewStore(zap.NewExample(), be, &lease.FakeLessor{}, cindex.NewConsistentIndex(be.BatchTx()), StoreConfig{})
defer cleanup(s, be, tmpPath)
be, _ := betesting.NewDefaultTmpBackend(b)
ci := cindex.NewConsistentIndex(be.BatchTx())
defer betesting.Close(b, be)

// This will force the index to be reread from scratch on each call.
ci.SetConsistentIndex(0)

tx := s.b.BatchTx()
tx := be.BatchTx()
tx.Lock()
s.saveIndex(tx)
cindex.UnsafeCreateMetaBucket(tx)
ci.UnsafeSave(tx)
tx.Unlock()

b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
s.ConsistentIndex()
ci.ConsistentIndex()
}
}

Expand Down
1 change: 0 additions & 1 deletion server/mvcc/kvstore_txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,6 @@ func (tw *storeTxnWrite) Put(key, value []byte, lease lease.LeaseID) int64 {
func (tw *storeTxnWrite) End() {
// only update index if the txn modifies the mvcc state.
if len(tw.changes) != 0 {
tw.s.saveIndex(tw.tx)
// hold revMu lock to prevent new read txns from opening until writeback.
tw.s.revMu.Lock()
tw.s.currentRev++
Expand Down
2 changes: 1 addition & 1 deletion server/mvcc/watchable_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ type watchableStore struct {
// cancel operations.
type cancelFunc func()

func New(lg *zap.Logger, b backend.Backend, le lease.Lessor, ci cindex.ConsistentIndexer, cfg StoreConfig) ConsistentWatchableKV {
func New(lg *zap.Logger, b backend.Backend, le lease.Lessor, ci cindex.ConsistentIndexer, cfg StoreConfig) WatchableKV {
return newWatchableStore(lg, b, le, ci, cfg)
}

Expand Down

0 comments on commit 444ab94

Please sign in to comment.