diff --git a/server/etcdserver/backend.go b/server/etcdserver/backend.go index 346ab43037a7..8303994cabeb 100644 --- a/server/etcdserver/backend.go +++ b/server/etcdserver/backend.go @@ -28,7 +28,7 @@ import ( "go.uber.org/zap" ) -func newBackend(cfg config.ServerConfig) backend.Backend { +func newBackend(cfg config.ServerConfig, hooks backend.Hooks) backend.Backend { bcfg := backend.DefaultBackendConfig() bcfg.Path = cfg.BackendPath() bcfg.UnsafeNoFsync = cfg.UnsafeNoFsync @@ -50,11 +50,12 @@ func newBackend(cfg config.ServerConfig) backend.Backend { // permit 10% excess over quota for disarm bcfg.MmapSize = uint64(cfg.QuotaBackendBytes + cfg.QuotaBackendBytes/10) } + bcfg.Hooks = hooks return backend.New(bcfg) } // openSnapshotBackend renames a snapshot db to the current etcd db and opens it. -func openSnapshotBackend(cfg config.ServerConfig, ss *snap.Snapshotter, snapshot raftpb.Snapshot) (backend.Backend, error) { +func openSnapshotBackend(cfg config.ServerConfig, ss *snap.Snapshotter, snapshot raftpb.Snapshot, hooks backend.Hooks) (backend.Backend, error) { snapPath, err := ss.DBFilePath(snapshot.Metadata.Index) if err != nil { return nil, fmt.Errorf("failed to find database snapshot file (%v)", err) @@ -62,16 +63,16 @@ func openSnapshotBackend(cfg config.ServerConfig, ss *snap.Snapshotter, snapshot if err := os.Rename(snapPath, cfg.BackendPath()); err != nil { return nil, fmt.Errorf("failed to rename database snapshot file (%v)", err) } - return openBackend(cfg), nil + return openBackend(cfg, hooks), nil } // openBackend returns a backend using the current etcd db. -func openBackend(cfg config.ServerConfig) backend.Backend { +func openBackend(cfg config.ServerConfig, hooks backend.Hooks) backend.Backend { fn := cfg.BackendPath() now, beOpened := time.Now(), make(chan backend.Backend) go func() { - beOpened <- newBackend(cfg) + beOpened <- newBackend(cfg, hooks) }() select { @@ -94,7 +95,7 @@ func openBackend(cfg config.ServerConfig) backend.Backend { // before updating the backend db after persisting raft snapshot to disk, // violating the invariant snapshot.Metadata.Index < db.consistentIndex. In this // case, replace the db with the snapshot db sent by the leader. -func recoverSnapshotBackend(cfg config.ServerConfig, oldbe backend.Backend, snapshot raftpb.Snapshot, beExist bool) (backend.Backend, error) { +func recoverSnapshotBackend(cfg config.ServerConfig, oldbe backend.Backend, snapshot raftpb.Snapshot, beExist bool, hooks backend.Hooks) (backend.Backend, error) { consistentIndex := uint64(0) if beExist { ci := cindex.NewConsistentIndex(oldbe.BatchTx()) @@ -104,5 +105,5 @@ func recoverSnapshotBackend(cfg config.ServerConfig, oldbe backend.Backend, snap return oldbe, nil } oldbe.Close() - return openSnapshotBackend(cfg, snap.New(cfg.Logger, cfg.SnapDir()), snapshot) + return openSnapshotBackend(cfg, snap.New(cfg.Logger, cfg.SnapDir()), snapshot, hooks) } diff --git a/server/etcdserver/cindex/cindex.go b/server/etcdserver/cindex/cindex.go index e47e186e4834..4e2f02140db7 100644 --- a/server/etcdserver/cindex/cindex.go +++ b/server/etcdserver/cindex/cindex.go @@ -52,14 +52,11 @@ type consistentIndex struct { // it caches the "consistent_index" key's value. Accessed // through atomics so must be 64-bit aligned. consistentIndex uint64 - // bytesBuf8 is a byte slice of length 8 - // to avoid a repetitive allocation in saveIndex. - bytesBuf8 []byte - mutex sync.Mutex + mutex sync.Mutex } func NewConsistentIndex(tx backend.BatchTx) ConsistentIndexer { - return &consistentIndex{tx: tx, bytesBuf8: make([]byte, 8)} + return &consistentIndex{tx: tx} } func (ci *consistentIndex) ConsistentIndex() uint64 { @@ -85,11 +82,14 @@ func (ci *consistentIndex) SetConsistentIndex(v uint64) { } func (ci *consistentIndex) UnsafeSave(tx backend.BatchTx) { - bs := ci.bytesBuf8 - binary.BigEndian.PutUint64(bs, ci.consistentIndex) - // put the index into the underlying backend - // tx has been locked in TxnBegin, so there is no need to lock it again - tx.UnsafePut(metaBucketName, consistentIndexKeyName, bs) + index := atomic.LoadUint64(&ci.consistentIndex) + if index > 0 { + bs := make([]byte, 8) // this is kept on stack (not heap) so its quick. + binary.BigEndian.PutUint64(bs, index) + // put the index into the underlying backend + // tx has been locked in TxnBegin, so there is no need to lock it again + tx.UnsafePut(metaBucketName, consistentIndexKeyName, bs) + } } func (ci *consistentIndex) SetBatchTx(tx backend.BatchTx) { diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index 8797ca6b286c..cda90c7b0d69 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -259,6 +259,7 @@ type EtcdServer struct { lessor lease.Lessor bemu sync.Mutex be backend.Backend + beHooks backend.Hooks authStore auth.AuthStore alarmStore *v3alarm.AlarmStore @@ -294,6 +295,17 @@ type EtcdServer struct { *AccessController } +type backendHooks struct { + indexer cindex.ConsistentIndexer + lg *zap.Logger +} + +func (bh *backendHooks) OnPreCommitUnsafe(tx backend.BatchTx) { + if bh.indexer != nil { + bh.indexer.UnsafeSave(tx) + } +} + // NewServer creates a new EtcdServer from the supplied configuration. The // configuration is considered static for the lifetime of the EtcdServer. func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { @@ -342,7 +354,9 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { bepath := cfg.BackendPath() beExist := fileutil.Exist(bepath) - be := openBackend(cfg) + + beHooks := &backendHooks{lg: cfg.Logger} + be := openBackend(cfg, beHooks) defer func() { if err != nil { @@ -460,7 +474,7 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { zap.String("snapshot-size", humanize.Bytes(uint64(snapshot.Size()))), ) - if be, err = recoverSnapshotBackend(cfg, be, *snapshot, beExist); err != nil { + if be, err = recoverSnapshotBackend(cfg, be, *snapshot, beExist, beHooks); err != nil { cfg.Logger.Panic("failed to recover v3 backend from snapshot", zap.Error(err)) } s1, s2 := be.Size(), be.SizeInUse() @@ -534,6 +548,7 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { srv.applyV2 = NewApplierV2(cfg.Logger, srv.v2store, srv.cluster) srv.be = be + srv.beHooks = beHooks minTTL := time.Duration((3*cfg.ElectionTicks)/2) * heartbeat // always recover lessor before kv. When we recover the mvcc.KV it will reattach keys to its leases. @@ -560,6 +575,10 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) { return nil, err } srv.kv = mvcc.New(srv.Logger(), srv.be, srv.lessor, srv.consistIndex, mvcc.StoreConfig{CompactionBatchLimit: cfg.CompactionBatchLimit}) + + // Backend should contain 'meta' bucket going forward. + beHooks.indexer = srv.consistIndex + kvindex := srv.consistIndex.ConsistentIndex() srv.lg.Debug("restore consistentIndex", zap.Uint64("index", kvindex)) if beExist { @@ -1161,7 +1180,7 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) { // wait for raftNode to persist snapshot onto the disk <-apply.notifyc - newbe, err := openSnapshotBackend(s.Cfg, s.snapshotter, apply.snapshot) + newbe, err := openSnapshotBackend(s.Cfg, s.snapshotter, apply.snapshot, s.beHooks) if err != nil { lg.Panic("failed to open snapshot backend", zap.Error(err)) } @@ -2096,6 +2115,7 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) { return } s.lg.Debug("applyEntryNormal", zap.Stringer("raftReq", &raftReq)) + if raftReq.V2 != nil { req := (*RequestV2)(raftReq.V2) s.w.Trigger(req.ID, s.applyV2Request(req))