From 2dd72a26dcaa0d760759cc8dcf08a2d5dc029d03 Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Fri, 1 Nov 2024 18:31:49 +0100 Subject: [PATCH] Run a separate in memory snapshot to reduce number of entries stored in raft memory storage Signed-off-by: Marek Siarkowicz --- server/etcdserver/api/membership/cluster.go | 6 -- server/etcdserver/server.go | 90 +++++++++++++-------- server/etcdserver/server_test.go | 2 +- tests/integration/v3_watch_restore_test.go | 4 +- 4 files changed, 62 insertions(+), 40 deletions(-) diff --git a/server/etcdserver/api/membership/cluster.go b/server/etcdserver/api/membership/cluster.go index e35375a22c6a..e81908d0e74c 100644 --- a/server/etcdserver/api/membership/cluster.go +++ b/server/etcdserver/api/membership/cluster.go @@ -899,12 +899,6 @@ func (c *RaftCluster) Store(store v2store.Store) { if m.ClientURLs != nil { mustUpdateMemberAttrInStore(c.lg, store, m) } - c.lg.Info( - "snapshot storing member", - zap.String("id", m.ID.String()), - zap.Strings("peer-urls", m.PeerURLs), - zap.Bool("is-learner", m.IsLearner), - ) } for id := range c.removed { //We do not need to delete the member since the store is empty. diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index 3e5c6b619d2f..7ed6c84751e9 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -109,6 +109,7 @@ const ( readyPercentThreshold = 0.9 DowngradeEnabledPath = "/downgrade/enabled" + memorySnapshotCount = 10 ) var ( @@ -291,9 +292,10 @@ type EtcdServer struct { clusterVersionChanged *notify.Notifier *AccessController - // forceSnapshot can force snapshot be triggered after apply, independent of the snapshotCount. + // forceDiskSnapshot can force snapshot be triggered after apply, independent of the snapshotCount. // Should only be set within apply code path. Used to force snapshot after cluster version downgrade. - forceSnapshot bool + // TODO: Replace with flush db in v3.7 assuming v3.6 bootstraps from db file. 
+ forceDiskSnapshot bool corruptionChecker CorruptionChecker } @@ -741,10 +743,11 @@ func (s *EtcdServer) ReportSnapshot(id uint64, status raft.SnapshotStatus) { } type etcdProgress struct { - confState raftpb.ConfState - snapi uint64 - appliedt uint64 - appliedi uint64 + confState raftpb.ConfState + diskSnapshotIndex uint64 + memorySnapshotIndex uint64 + appliedt uint64 + appliedi uint64 } // raftReadyHandler contains a set of EtcdServer operations to be called by raftNode, @@ -809,10 +812,11 @@ func (s *EtcdServer) run() { s.r.start(rh) ep := etcdProgress{ - confState: sn.Metadata.ConfState, - snapi: sn.Metadata.Index, - appliedt: sn.Metadata.Term, - appliedi: sn.Metadata.Index, + confState: sn.Metadata.ConfState, + diskSnapshotIndex: sn.Metadata.Index, + memorySnapshotIndex: sn.Metadata.Index, + appliedt: sn.Metadata.Term, + appliedi: sn.Metadata.Index, } defer func() { @@ -998,7 +1002,7 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, toApply *toApply) { lg := s.Logger() lg.Info( "applying snapshot", - zap.Uint64("current-snapshot-index", ep.snapi), + zap.Uint64("current-snapshot-index", ep.diskSnapshotIndex), zap.Uint64("current-applied-index", ep.appliedi), zap.Uint64("incoming-leader-snapshot-index", toApply.snapshot.Metadata.Index), zap.Uint64("incoming-leader-snapshot-term", toApply.snapshot.Metadata.Term), @@ -1006,7 +1010,7 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, toApply *toApply) { defer func() { lg.Info( "applied snapshot", - zap.Uint64("current-snapshot-index", ep.snapi), + zap.Uint64("current-snapshot-index", ep.diskSnapshotIndex), zap.Uint64("current-applied-index", ep.appliedi), zap.Uint64("incoming-leader-snapshot-index", toApply.snapshot.Metadata.Index), zap.Uint64("incoming-leader-snapshot-term", toApply.snapshot.Metadata.Term), @@ -1017,7 +1021,7 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, toApply *toApply) { if toApply.snapshot.Metadata.Index <= ep.appliedi { lg.Panic( "unexpected leader snapshot from 
outdated index", - zap.Uint64("current-snapshot-index", ep.snapi), + zap.Uint64("current-snapshot-index", ep.diskSnapshotIndex), zap.Uint64("current-applied-index", ep.appliedi), zap.Uint64("incoming-leader-snapshot-index", toApply.snapshot.Metadata.Index), zap.Uint64("incoming-leader-snapshot-term", toApply.snapshot.Metadata.Term), @@ -1132,7 +1136,8 @@ func (s *EtcdServer) applySnapshot(ep *etcdProgress, toApply *toApply) { ep.appliedt = toApply.snapshot.Metadata.Term ep.appliedi = toApply.snapshot.Metadata.Index - ep.snapi = ep.appliedi + ep.diskSnapshotIndex = ep.appliedi + ep.memorySnapshotIndex = ep.appliedi ep.confState = toApply.snapshot.Metadata.ConfState // As backends and implementations like alarmsStore changed, we need @@ -1188,31 +1193,37 @@ func (s *EtcdServer) applyEntries(ep *etcdProgress, apply *toApply) { } func (s *EtcdServer) ForceSnapshot() { - s.forceSnapshot = true + s.forceDiskSnapshot = true } func (s *EtcdServer) triggerSnapshot(ep *etcdProgress) { - if !s.shouldSnapshot(ep) { + if !s.shouldSnapshotToDisk(ep) { + if ep.appliedi > ep.memorySnapshotIndex+memorySnapshotCount { + s.snapshotToMemory(ep.appliedi, ep.confState) + s.compactRaftLog(ep.appliedi) + ep.memorySnapshotIndex = ep.appliedi + } return } + //TODO: Remove disk snapshot in v3.7 lg := s.Logger() lg.Info( "triggering snapshot", zap.String("local-member-id", s.MemberID().String()), zap.Uint64("local-member-applied-index", ep.appliedi), - zap.Uint64("local-member-snapshot-index", ep.snapi), + zap.Uint64("local-member-snapshot-index", ep.diskSnapshotIndex), zap.Uint64("local-member-snapshot-count", s.Cfg.SnapshotCount), - zap.Bool("snapshot-forced", s.forceSnapshot), + zap.Bool("snapshot-forced", s.forceDiskSnapshot), ) - s.forceSnapshot = false + s.forceDiskSnapshot = false - s.snapshot(ep.appliedi, ep.confState) + s.snapshotToDisk(ep.appliedi, ep.confState) s.compactRaftLog(ep.appliedi) - ep.snapi = ep.appliedi + ep.diskSnapshotIndex = ep.appliedi } -func (s *EtcdServer) 
shouldSnapshot(ep *etcdProgress) bool { - return (s.forceSnapshot && ep.appliedi != ep.snapi) || (ep.appliedi-ep.snapi > s.Cfg.SnapshotCount) +func (s *EtcdServer) shouldSnapshotToDisk(ep *etcdProgress) bool { + return (s.forceDiskSnapshot && ep.appliedi != ep.diskSnapshotIndex) || (ep.appliedi-ep.diskSnapshotIndex > s.Cfg.SnapshotCount) } func (s *EtcdServer) hasMultipleVotingMembers() bool { @@ -2132,7 +2143,7 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con } // TODO: non-blocking snapshot -func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) { +func (s *EtcdServer) snapshotToDisk(snapi uint64, confState raftpb.ConfState) { d := GetMembershipInfoInV2Format(s.Logger(), s.cluster) // commit kv to write metadata (for example: consistent index) to disk. // @@ -2169,11 +2180,30 @@ func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) { } lg.Info( - "saved snapshot", + "saved snapshot to disk", zap.Uint64("snapshot-index", snap.Metadata.Index), ) } +func (s *EtcdServer) snapshotToMemory(snapi uint64, confState raftpb.ConfState) { + d := GetMembershipInfoInV2Format(s.Logger(), s.cluster) + + lg := s.Logger() + + // For backward compatibility, generate v2 snapshot from v3 state. + snap, err := s.r.raftStorage.CreateSnapshot(snapi, &confState, d) + if err != nil { + // the snapshot was done asynchronously with the progress of raft. + // raft might have already got a newer snapshot. + if errorspkg.Is(err, raft.ErrSnapOutOfDate) { + return + } + lg.Panic("failed to create snapshot", zap.Error(err)) + } + + verifyConsistentIndexIsLatest(lg, snap, s.consistIndex.ConsistentIndex()) +} + func (s *EtcdServer) compactRaftLog(snapi uint64) { lg := s.Logger() @@ -2189,10 +2219,10 @@ func (s *EtcdServer) compactRaftLog(snapi uint64) { // keep some in memory log entries for slow followers. 
compacti := uint64(1) - if snapi > s.Cfg.SnapshotCatchUpEntries { - compacti = snapi - s.Cfg.SnapshotCatchUpEntries + if snapi <= s.Cfg.SnapshotCatchUpEntries { + return } - + compacti = snapi - s.Cfg.SnapshotCatchUpEntries err := s.r.raftStorage.Compact(compacti) if err != nil { // the compaction was done asynchronously with the progress of raft. @@ -2202,10 +2232,6 @@ func (s *EtcdServer) compactRaftLog(snapi uint64) { } lg.Panic("failed to compact", zap.Error(err)) } - lg.Info( - "compacted Raft logs", - zap.Uint64("compact-index", compacti), - ) } // CutPeer drops messages to the specified peer. diff --git a/server/etcdserver/server_test.go b/server/etcdserver/server_test.go index 688d71be264f..7ee3b4b8a162 100644 --- a/server/etcdserver/server_test.go +++ b/server/etcdserver/server_test.go @@ -649,7 +649,7 @@ func TestSnapshot(t *testing.T) { } }() - srv.snapshot(1, raftpb.ConfState{Voters: []uint64{1}}) + srv.snapshotToDisk(1, raftpb.ConfState{Voters: []uint64{1}}) <-ch if len(st.Action()) != 0 { t.Errorf("no action expected on v2store. Got %d actions", len(st.Action())) diff --git a/tests/integration/v3_watch_restore_test.go b/tests/integration/v3_watch_restore_test.go index f7e2e4b4730b..56b84f0bd9a5 100644 --- a/tests/integration/v3_watch_restore_test.go +++ b/tests/integration/v3_watch_restore_test.go @@ -110,7 +110,9 @@ func TestV3WatchRestoreSnapshotUnsync(t *testing.T) { // // Since there is no way to confirm server has compacted the log, we // use log monitor to watch and expect "compacted Raft logs" content. - expectMemberLog(t, clus.Members[initialLead], 5*time.Second, "compacted Raft logs", 2) + // In v3.6 we no longer generate the "compacted Raft logs" log, as raft compaction happens independently of snapshotting. + // For now let's use snapshot log which should be equivalent to compaction.
+ expectMemberLog(t, clus.Members[initialLead], 5*time.Second, "saved snapshot to disk", 2) // After RecoverPartition, leader L will send snapshot to slow F_m0 // follower, because F_m0(index:8) is 'out of date' compared to