From 45fc4c369848104862df20f0182e5e5d36a2b8d5 Mon Sep 17 00:00:00 2001
From: Marek Siarkowicz
Date: Fri, 1 Nov 2024 18:31:49 +0100
Subject: [PATCH] Run a separate in memory snapshot to reduce number of
 entries stored in raft memory storage

Signed-off-by: Marek Siarkowicz
---
 server/etcdserver/api/membership/cluster.go |  2 +-
 server/etcdserver/server.go                 | 90 ++++++++++++---------
 server/etcdserver/server_test.go            | 64 ++++++++++++++-
 tests/integration/v3_watch_restore_test.go  |  4 +-
 4 files changed, 116 insertions(+), 44 deletions(-)

diff --git a/server/etcdserver/api/membership/cluster.go b/server/etcdserver/api/membership/cluster.go
index 9d62db121541..6539b977d233 100644
--- a/server/etcdserver/api/membership/cluster.go
+++ b/server/etcdserver/api/membership/cluster.go
@@ -899,7 +899,7 @@ func (c *RaftCluster) Store(store v2store.Store) {
 		if m.ClientURLs != nil {
 			mustUpdateMemberAttrInStore(c.lg, store, m)
 		}
-		c.lg.Info(
+		c.lg.Debug(
 			"snapshot storing member",
 			zap.String("id", m.ID.String()),
 			zap.Strings("peer-urls", m.PeerURLs),
diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go
index 11d5ec69a258..446606431a43 100644
--- a/server/etcdserver/server.go
+++ b/server/etcdserver/server.go
@@ -109,6 +109,7 @@ const (
 	readyPercentThreshold = 0.9
 
 	DowngradeEnabledPath = "/downgrade/enabled"
+	memorySnapshotCount  = 100
 )
 
 var (
@@ -293,6 +294,7 @@ type EtcdServer struct {
 	*AccessController
 	// forceDiskSnapshot can force snapshot be triggered after apply, independent of the snapshotCount.
 	// Should only be set within apply code path. Used to force snapshot after cluster version downgrade.
+	// TODO: Replace with flush db in v3.7 assuming v3.6 bootstraps from db file.
 	forceDiskSnapshot bool
 	corruptionChecker CorruptionChecker
 }
@@ -1195,17 +1197,24 @@ func (s *EtcdServer) ForceSnapshot() {
 }
 
 func (s *EtcdServer) snapshotIfNeededAndCompactRaftLog(ep *etcdProgress) {
-	if !s.shouldSnapshot(ep) {
+	//TODO: Remove disk snapshot in v3.7
+	shouldSnapshotToDisk := s.shouldSnapshotToDisk(ep)
+	shouldSnapshotToMemory := s.shouldSnapshotToMemory(ep)
+	if !shouldSnapshotToDisk && !shouldSnapshotToMemory {
 		return
 	}
-	s.snapshot(ep)
+	s.snapshot(ep, shouldSnapshotToDisk)
 	s.compactRaftLog(ep.appliedi)
 }
 
-func (s *EtcdServer) shouldSnapshot(ep *etcdProgress) bool {
+func (s *EtcdServer) shouldSnapshotToDisk(ep *etcdProgress) bool {
 	return (s.forceDiskSnapshot && ep.appliedi != ep.diskSnapshotIndex) || (ep.appliedi-ep.diskSnapshotIndex > s.Cfg.SnapshotCount)
 }
 
+func (s *EtcdServer) shouldSnapshotToMemory(ep *etcdProgress) bool {
+	return ep.appliedi > ep.memorySnapshotIndex+memorySnapshotCount
+}
+
 func (s *EtcdServer) hasMultipleVotingMembers() bool {
 	return s.cluster != nil && len(s.cluster.VotingMemberIDs()) > 1
 }
@@ -2119,28 +2128,30 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con
 }
 
 // TODO: non-blocking snapshot
-func (s *EtcdServer) snapshot(ep *etcdProgress) {
+func (s *EtcdServer) snapshot(ep *etcdProgress, toDisk bool) {
 	lg := s.Logger()
-	lg.Info(
-		"triggering snapshot",
-		zap.String("local-member-id", s.MemberID().String()),
-		zap.Uint64("local-member-applied-index", ep.appliedi),
-		zap.Uint64("local-member-snapshot-index", ep.diskSnapshotIndex),
-		zap.Uint64("local-member-snapshot-count", s.Cfg.SnapshotCount),
-		zap.Bool("snapshot-forced", s.forceDiskSnapshot),
-	)
-	s.forceDiskSnapshot = false
-	d := GetMembershipInfoInV2Format(s.Logger(), s.cluster)
-	// commit kv to write metadata (for example: consistent index) to disk.
-	//
-	// This guarantees that Backend's consistent_index is >= index of last snapshot.
-	//
-	// KV().commit() updates the consistent index in backend.
-	// All operations that update consistent index must be called sequentially
-	// from applyAll function.
-	// So KV().Commit() cannot run in parallel with toApply. It has to be called outside
-	// the go routine created below.
-	s.KV().Commit()
+	d := GetMembershipInfoInV2Format(lg, s.cluster)
+	if toDisk {
+		s.Logger().Info(
+			"triggering snapshot",
+			zap.String("local-member-id", s.MemberID().String()),
+			zap.Uint64("local-member-applied-index", ep.appliedi),
+			zap.Uint64("local-member-snapshot-index", ep.diskSnapshotIndex),
+			zap.Uint64("local-member-snapshot-count", s.Cfg.SnapshotCount),
+			zap.Bool("snapshot-forced", s.forceDiskSnapshot),
+		)
+		s.forceDiskSnapshot = false
+		// commit kv to write metadata (for example: consistent index) to disk.
+		//
+		// This guarantees that Backend's consistent_index is >= index of last snapshot.
+		//
+		// KV().commit() updates the consistent index in backend.
+		// All operations that update consistent index must be called sequentially
+		// from applyAll function.
+		// So KV().Commit() cannot run in parallel with toApply. It has to be called outside
+		// the go routine created below.
+		s.KV().Commit()
+	}
 
 	// For backward compatibility, generate v2 snapshot from v3 state.
 	snap, err := s.r.raftStorage.CreateSnapshot(ep.appliedi, &ep.confState, d)
@@ -2152,23 +2163,25 @@ func (s *EtcdServer) snapshot(ep *etcdProgress) {
 		}
 		lg.Panic("failed to create snapshot", zap.Error(err))
 	}
+	ep.memorySnapshotIndex = ep.appliedi
 
 	verifyConsistentIndexIsLatest(lg, snap, s.consistIndex.ConsistentIndex())
 
-	// SaveSnap saves the snapshot to file and appends the corresponding WAL entry.
-	if err = s.r.storage.SaveSnap(snap); err != nil {
-		lg.Panic("failed to save snapshot", zap.Error(err))
-	}
-	ep.diskSnapshotIndex = ep.appliedi
-	ep.memorySnapshotIndex = ep.appliedi
-	if err = s.r.storage.Release(snap); err != nil {
-		lg.Panic("failed to release wal", zap.Error(err))
-	}
+	if toDisk {
+		// SaveSnap saves the snapshot to file and appends the corresponding WAL entry.
+		if err = s.r.storage.SaveSnap(snap); err != nil {
+			lg.Panic("failed to save snapshot", zap.Error(err))
+		}
+		ep.diskSnapshotIndex = ep.appliedi
+		if err = s.r.storage.Release(snap); err != nil {
+			lg.Panic("failed to release wal", zap.Error(err))
+		}
 
-	lg.Info(
-		"saved snapshot",
-		zap.Uint64("snapshot-index", snap.Metadata.Index),
-	)
+		lg.Info(
+			"saved snapshot to disk",
+			zap.Uint64("snapshot-index", snap.Metadata.Index),
+		)
+	}
 }
 
 func (s *EtcdServer) compactRaftLog(snapi uint64) {
@@ -2189,7 +2202,6 @@ func (s *EtcdServer) compactRaftLog(snapi uint64) {
 	if snapi > s.Cfg.SnapshotCatchUpEntries {
 		compacti = snapi - s.Cfg.SnapshotCatchUpEntries
 	}
-
 	err := s.r.raftStorage.Compact(compacti)
 	if err != nil {
 		// the compaction was done asynchronously with the progress of raft.
@@ -2199,7 +2211,7 @@ func (s *EtcdServer) compactRaftLog(snapi uint64) {
 		}
 		lg.Panic("failed to compact", zap.Error(err))
 	}
-	lg.Info(
+	lg.Debug(
 		"compacted Raft logs",
 		zap.Uint64("compact-index", compacti),
 	)
diff --git a/server/etcdserver/server_test.go b/server/etcdserver/server_test.go
index 8724cdbe2649..d2864fc03834 100644
--- a/server/etcdserver/server_test.go
+++ b/server/etcdserver/server_test.go
@@ -627,8 +627,8 @@ func TestApplyMultiConfChangeShouldStop(t *testing.T) {
 	}
 }
 
-// TestSnapshot should snapshot the store and cut the persistent
-func TestSnapshot(t *testing.T) {
+// TestSnapshotDisk should save the snapshot to disk and release old snapshots
+func TestSnapshotDisk(t *testing.T) {
 	revertFunc := verify.DisableVerifications()
 	defer revertFunc()
 
@@ -680,7 +680,7 @@ func TestSnapshot(t *testing.T) {
 		}
 	}()
 	ep := etcdProgress{appliedi: 1, confState: raftpb.ConfState{Voters: []uint64{1}}}
-	srv.snapshot(&ep)
+	srv.snapshot(&ep, true)
 	<-ch
 	if len(st.Action()) != 0 {
 		t.Errorf("no action expected on v2store. Got %d actions", len(st.Action()))
@@ -693,6 +693,64 @@ func TestSnapshot(t *testing.T) {
 	}
 }
 
+func TestSnapshotMemory(t *testing.T) {
+	revertFunc := verify.DisableVerifications()
+	defer revertFunc()
+
+	be, _ := betesting.NewDefaultTmpBackend(t)
+	defer betesting.Close(t, be)
+
+	s := raft.NewMemoryStorage()
+	s.Append([]raftpb.Entry{{Index: 1}})
+	st := mockstore.NewRecorderStream()
+	p := mockstorage.NewStorageRecorderStream("")
+	r := newRaftNode(raftNodeConfig{
+		lg:          zaptest.NewLogger(t),
+		Node:        newNodeNop(),
+		raftStorage: s,
+		storage:     p,
+	})
+	srv := &EtcdServer{
+		lgMu:         new(sync.RWMutex),
+		lg:           zaptest.NewLogger(t),
+		r:            *r,
+		v2store:      st,
+		consistIndex: cindex.NewConsistentIndex(be),
+	}
+	srv.kv = mvcc.New(zaptest.NewLogger(t), be, &lease.FakeLessor{}, mvcc.StoreConfig{})
+	defer func() {
+		assert.NoError(t, srv.kv.Close())
+	}()
+	srv.be = be
+
+	cl := membership.NewCluster(zaptest.NewLogger(t))
+	srv.cluster = cl
+
+	ch := make(chan struct{}, 1)
+
+	go func() {
+		gaction, _ := p.Wait(1)
+		defer func() { ch <- struct{}{} }()
+
+		if len(gaction) != 0 {
+			t.Errorf("len(action) = %d, want 0", len(gaction))
+			return
+		}
+	}()
+	ep := etcdProgress{appliedi: 1, confState: raftpb.ConfState{Voters: []uint64{1}}}
+	srv.snapshot(&ep, false)
+	<-ch
+	if len(st.Action()) != 0 {
+		t.Errorf("no action expected on v2store. Got %d actions", len(st.Action()))
+	}
+	if ep.diskSnapshotIndex != 0 {
+		t.Errorf("ep.diskSnapshotIndex = %d, want 0", ep.diskSnapshotIndex)
+	}
+	if ep.memorySnapshotIndex != 1 {
+		t.Errorf("ep.memorySnapshotIndex = %d, want 1", ep.memorySnapshotIndex)
+	}
+}
+
 // TestSnapshotOrdering ensures raft persists snapshot onto disk before
 // snapshot db is applied.
 func TestSnapshotOrdering(t *testing.T) {
diff --git a/tests/integration/v3_watch_restore_test.go b/tests/integration/v3_watch_restore_test.go
index b9b6afccb38b..b3fc8236f784 100644
--- a/tests/integration/v3_watch_restore_test.go
+++ b/tests/integration/v3_watch_restore_test.go
@@ -111,7 +111,9 @@ func TestV3WatchRestoreSnapshotUnsync(t *testing.T) {
 	//
 	// Since there is no way to confirm server has compacted the log, we
 	// use log monitor to watch and expect "compacted Raft logs" content.
-	expectMemberLog(t, clus.Members[initialLead], 5*time.Second, "compacted Raft logs", 2)
+	// In v3.6 we no longer generate the "compacted Raft logs" log line, as raft compaction happens independently of snapshots.
+	// For now let's use the snapshot log line, which should be equivalent to compaction.
+	expectMemberLog(t, clus.Members[initialLead], 5*time.Second, "saved snapshot to disk", 2)
 
 	// After RecoverPartition, leader L will send snapshot to slow F_m0
 	// follower, because F_m0(index:8) is 'out of date' compared to