Skip to content

Commit

Permalink
partially rollback the PR etcd-io#12855
Browse files Browse the repository at this point in the history
  • Loading branch information
ahrtr committed Apr 1, 2022
1 parent e4d34f2 commit 42c4392
Show file tree
Hide file tree
Showing 6 changed files with 37 additions and 7 deletions.
2 changes: 1 addition & 1 deletion server/etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,7 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
CompactionBatchLimit: cfg.CompactionBatchLimit,
CompactionSleepInterval: cfg.CompactionSleepInterval,
}
srv.kv = mvcc.New(srv.Logger(), srv.be, srv.lessor, mvccStoreConfig)
srv.kv = mvcc.New(srv.Logger(), srv.be, srv.lessor, srv.consistIndex, mvccStoreConfig)

srv.authStore = auth.NewAuthStore(srv.Logger(), schema.NewAuthBackend(srv.Logger(), srv.be), tp, int(cfg.BcryptCost))

Expand Down
1 change: 1 addition & 0 deletions server/storage/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ type backend struct {
stopc chan struct{}
donec chan struct{}

// todo: remove te hooks
hooks Hooks

lg *zap.Logger
Expand Down
1 change: 1 addition & 0 deletions server/storage/backend/batch_tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,7 @@ func (t *batchTxBuffered) CommitAndStop() {
}

func (t *batchTxBuffered) commit(stop bool) {
// (todo): remove the OnPreCommitUnsafe
if t.backend.hooks != nil {
t.backend.hooks.OnPreCommitUnsafe(t)
}
Expand Down
30 changes: 28 additions & 2 deletions server/storage/mvcc/kvstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"go.etcd.io/etcd/api/v3/mvccpb"
"go.etcd.io/etcd/pkg/v3/schedule"
"go.etcd.io/etcd/pkg/v3/traceutil"
"go.etcd.io/etcd/server/v3/etcdserver/cindex"
"go.etcd.io/etcd/server/v3/lease"
"go.etcd.io/etcd/server/v3/storage/backend"
"go.etcd.io/etcd/server/v3/storage/schema"
Expand Down Expand Up @@ -65,6 +66,8 @@ type store struct {
// mu read locks for txns and write locks for non-txn store changes.
mu sync.RWMutex

ci cindex.ConsistentIndexer

b backend.Backend
kvindex index

Expand All @@ -88,7 +91,7 @@ type store struct {

// NewStore returns a new store. It is useful to create a store inside
// mvcc pkg. It should only be used for testing externally.
func NewStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg StoreConfig) *store {
func NewStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, ci cindex.ConsistentIndexer, cfg StoreConfig) *store {
if lg == nil {
lg = zap.NewNop()
}
Expand All @@ -101,6 +104,7 @@ func NewStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg StoreConfi
s := &store{
cfg: cfg,
b: b,
ci: ci,
kvindex: newTreeIndex(lg),

le: le,
Expand Down Expand Up @@ -293,6 +297,12 @@ func (s *store) Compact(trace *traceutil.Trace, rev int64) (<-chan struct{}, err
func (s *store) Commit() {
s.mu.Lock()
defer s.mu.Unlock()

tx := s.b.BatchTx()
tx.Lock()
s.saveIndex(tx)
tx.Unlock()

s.b.ForceCommit()
}

Expand All @@ -316,6 +326,7 @@ func (s *store) Restore(b backend.Backend) error {

s.fifoSched = schedule.NewFIFOScheduler()
s.stopc = make(chan struct{})
s.ci.SetBackend(b)

return s.restore()
}
Expand Down Expand Up @@ -402,7 +413,9 @@ func (s *store) restore() error {

tx.Unlock()

s.lg.Info("kvstore restored", zap.Int64("current-rev", s.currentRev))
s.lg.Info("kvstore restored",
zap.Uint64("consistent-index", s.ConsistentIndex()),
zap.Int64("current-rev", s.currentRev))

if scheduledCompact != 0 {
if _, err := s.compactLockfree(scheduledCompact); err != nil {
Expand Down Expand Up @@ -495,6 +508,19 @@ func (s *store) Close() error {
return nil
}

func (s *store) saveIndex(tx backend.BatchTx) {
if s.ci != nil {
s.ci.UnsafeSave(tx)
}
}

func (s *store) ConsistentIndex() uint64 {
if s.ci != nil {
return s.ci.ConsistentIndex()
}
return 0
}

func (s *store) setupMetricsReporter() {
b := s.b
reportDbTotalSizeInBytesMu.Lock()
Expand Down
1 change: 1 addition & 0 deletions server/storage/mvcc/kvstore_txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ func (tw *storeTxnWrite) End() {
// only update index if the txn modifies the mvcc state.
if len(tw.changes) != 0 {
// hold revMu lock to prevent new read txns from opening until writeback.
tw.s.saveIndex(tw.tx)
tw.s.revMu.Lock()
tw.s.currentRev++
}
Expand Down
9 changes: 5 additions & 4 deletions server/storage/mvcc/watchable_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

"go.etcd.io/etcd/api/v3/mvccpb"
"go.etcd.io/etcd/pkg/v3/traceutil"
"go.etcd.io/etcd/server/v3/etcdserver/cindex"
"go.etcd.io/etcd/server/v3/lease"
"go.etcd.io/etcd/server/v3/storage/backend"
"go.etcd.io/etcd/server/v3/storage/schema"
Expand Down Expand Up @@ -70,16 +71,16 @@ type watchableStore struct {
// cancel operations.
type cancelFunc func()

func New(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg StoreConfig) WatchableKV {
return newWatchableStore(lg, b, le, cfg)
func New(lg *zap.Logger, b backend.Backend, le lease.Lessor, ci cindex.ConsistentIndexer, cfg StoreConfig) WatchableKV {
return newWatchableStore(lg, b, le, ci, cfg)
}

func newWatchableStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg StoreConfig) *watchableStore {
func newWatchableStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, ci cindex.ConsistentIndexer, cfg StoreConfig) *watchableStore {
if lg == nil {
lg = zap.NewNop()
}
s := &watchableStore{
store: NewStore(lg, b, le, cfg),
store: NewStore(lg, b, le, ci, cfg),
victimc: make(chan struct{}, 1),
unsynced: newWatcherGroup(),
synced: newWatcherGroup(),
Expand Down

0 comments on commit 42c4392

Please sign in to comment.