
Commit

Run a separate in memory snapshot to reduce number of entries stored in raft memory storage

Signed-off-by: Marek Siarkowicz <[email protected]>
serathius committed Nov 29, 2024
1 parent aecf3fd commit 3b59eea
Showing 3 changed files with 49 additions and 37 deletions.
2 changes: 1 addition & 1 deletion server/etcdserver/api/membership/cluster.go
@@ -899,7 +899,7 @@ func (c *RaftCluster) Store(store v2store.Store) {
if m.ClientURLs != nil {
mustUpdateMemberAttrInStore(c.lg, store, m)
}
c.lg.Info(
c.lg.Debug(
"snapshot storing member",
zap.String("id", m.ID.String()),
zap.Strings("peer-urls", m.PeerURLs),
80 changes: 45 additions & 35 deletions server/etcdserver/server.go
@@ -109,6 +109,7 @@ const (
readyPercentThreshold = 0.9

DowngradeEnabledPath = "/downgrade/enabled"
memorySnapshotCount = 100
)

var (
@@ -291,8 +292,9 @@ type EtcdServer struct {
clusterVersionChanged *notify.Notifier

*AccessController
// forceSnapshot can force snapshot be triggered after apply, independent of the snapshotCount.
// forceDiskSnapshot can force snapshot be triggered after apply, independent of the snapshotCount.
// Should only be set within apply code path. Used to force snapshot after cluster version downgrade.
// TODO: Replace with flush db in v3.7 assuming v3.6 bootstraps from db file.
forceDiskSnapshot bool
corruptionChecker CorruptionChecker
}
@@ -1195,9 +1197,15 @@ func (s *EtcdServer) ForceSnapshot() {
}

func (s *EtcdServer) triggerSnapshot(ep *etcdProgress) {
if !s.shouldSnapshot(ep) {
if !s.shouldSnapshotToDisk(ep) {
if ep.appliedi > ep.memorySnapshotIndex+memorySnapshotCount {
s.snapshot(ep, false)
s.compactRaftLog(ep.appliedi)
ep.memorySnapshotIndex = ep.appliedi
}
return
}
//TODO: Remove disk snapshot in v3.7
lg := s.Logger()
lg.Info(
"triggering snapshot",
@@ -1209,11 +1217,11 @@ func (s *EtcdServer) triggerSnapshot(ep *etcdProgress) {
)
s.forceDiskSnapshot = false

s.snapshot(ep)
s.snapshot(ep, true)
s.compactRaftLog(ep.appliedi)
}

func (s *EtcdServer) shouldSnapshot(ep *etcdProgress) bool {
func (s *EtcdServer) shouldSnapshotToDisk(ep *etcdProgress) bool {
return (s.forceDiskSnapshot && ep.appliedi != ep.diskSnapshotIndex) || (ep.appliedi-ep.diskSnapshotIndex > s.Cfg.SnapshotCount)
}

@@ -2134,20 +2142,21 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con
}

// TODO: non-blocking snapshot
func (s *EtcdServer) snapshot(ep *etcdProgress) {
d := GetMembershipInfoInV2Format(s.Logger(), s.cluster)
// commit kv to write metadata (for example: consistent index) to disk.
//
// This guarantees that Backend's consistent_index is >= index of last snapshot.
//
// KV().commit() updates the consistent index in backend.
// All operations that update consistent index must be called sequentially
// from applyAll function.
// So KV().Commit() cannot run in parallel with toApply. It has to be called outside
// the go routine created below.
s.KV().Commit()

func (s *EtcdServer) snapshot(ep *etcdProgress, toDisk bool) {
lg := s.Logger()
d := GetMembershipInfoInV2Format(lg, s.cluster)
if toDisk {
// commit kv to write metadata (for example: consistent index) to disk.
//
// This guarantees that Backend's consistent_index is >= index of last snapshot.
//
// KV().commit() updates the consistent index in backend.
// All operations that update consistent index must be called sequentially
// from applyAll function.
// So KV().Commit() cannot run in parallel with toApply. It has to be called outside
// the go routine created below.
s.KV().Commit()
}

// For backward compatibility, generate v2 snapshot from v3 state.
snap, err := s.r.raftStorage.CreateSnapshot(ep.appliedi, &ep.confState, d)

@@ -2159,23 +2168,25 @@ func (s *EtcdServer) snapshot(ep *etcdProgress) {
}
lg.Panic("failed to create snapshot", zap.Error(err))
}
ep.memorySnapshotIndex = ep.appliedi

verifyConsistentIndexIsLatest(lg, snap, s.consistIndex.ConsistentIndex())

// SaveSnap saves the snapshot to file and appends the corresponding WAL entry.
if err = s.r.storage.SaveSnap(snap); err != nil {
lg.Panic("failed to save snapshot", zap.Error(err))
}
ep.diskSnapshotIndex = ep.appliedi
ep.memorySnapshotIndex = ep.appliedi
if err = s.r.storage.Release(snap); err != nil {
lg.Panic("failed to release wal", zap.Error(err))
}
if toDisk {
// SaveSnap saves the snapshot to file and appends the corresponding WAL entry.
if err = s.r.storage.SaveSnap(snap); err != nil {
lg.Panic("failed to save snapshot", zap.Error(err))

}
ep.diskSnapshotIndex = ep.appliedi
if err = s.r.storage.Release(snap); err != nil {
lg.Panic("failed to release wal", zap.Error(err))
}

lg.Info(
"saved snapshot",
zap.Uint64("snapshot-index", snap.Metadata.Index),
)
lg.Info(
"saved snapshot to disk",
zap.Uint64("snapshot-index", snap.Metadata.Index),
)
}
}

func (s *EtcdServer) compactRaftLog(snapi uint64) {
@@ -2192,11 +2203,10 @@ func (s *EtcdServer) compactRaftLog(snapi uint64) {
}

// keep some in memory log entries for slow followers.
compacti := uint64(1)
if snapi > s.Cfg.SnapshotCatchUpEntries {
compacti = snapi - s.Cfg.SnapshotCatchUpEntries
if snapi <= s.Cfg.SnapshotCatchUpEntries {
return
}

compacti := snapi - s.Cfg.SnapshotCatchUpEntries
err := s.r.raftStorage.Compact(compacti)
if err != nil {
// the compaction was done asynchronously with the progress of raft.
@@ -2206,7 +2216,7 @@ func (s *EtcdServer) compactRaftLog(snapi uint64) {
}
lg.Panic("failed to compact", zap.Error(err))
}
lg.Info(
lg.Debug(
"compacted Raft logs",
zap.Uint64("compact-index", compacti),
)
4 changes: 3 additions & 1 deletion tests/integration/v3_watch_restore_test.go
@@ -110,7 +110,9 @@ func TestV3WatchRestoreSnapshotUnsync(t *testing.T) {
//
// Since there is no way to confirm server has compacted the log, we
// use log monitor to watch and expect "compacted Raft logs" content.
expectMemberLog(t, clus.Members[initialLead], 5*time.Second, "compacted Raft logs", 2)
// In v3.6 we no longer generate the "compacted Raft logs" log as raft compaction happens independently of snapshots.
// For now let's use the snapshot log, which should be equivalent to compaction.
expectMemberLog(t, clus.Members[initialLead], 5*time.Second, "saved snapshot to disk", 2)

// After RecoverPartition, leader L will send snapshot to slow F_m0
// follower, because F_m0(index:8) is 'out of date' compared to
