diff --git a/server/etcdserver/api/membership/cluster.go b/server/etcdserver/api/membership/cluster.go
index e35375a22c6a..0104b801511b 100644
--- a/server/etcdserver/api/membership/cluster.go
+++ b/server/etcdserver/api/membership/cluster.go
@@ -899,7 +899,7 @@ func (c *RaftCluster) Store(store v2store.Store) {
 		if m.ClientURLs != nil {
 			mustUpdateMemberAttrInStore(c.lg, store, m)
 		}
-		c.lg.Info(
+		c.lg.Debug(
 			"snapshot storing member",
 			zap.String("id", m.ID.String()),
 			zap.Strings("peer-urls", m.PeerURLs),
diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go
index fed5f75b1ab8..41ce5dd66703 100644
--- a/server/etcdserver/server.go
+++ b/server/etcdserver/server.go
@@ -109,6 +109,7 @@ const (
 	readyPercentThreshold = 0.9
 
 	DowngradeEnabledPath = "/downgrade/enabled"
+	memorySnapshotCount  = 100
 )
 
 var (
@@ -291,8 +292,9 @@ type EtcdServer struct {
 	clusterVersionChanged *notify.Notifier
 
 	*AccessController
-	// forceSnapshot can force snapshot be triggered after apply, independent of the snapshotCount.
+	// forceDiskSnapshot can force a snapshot to be triggered after apply, independent of the snapshotCount.
 	// Should only be set within apply code path. Used to force snapshot after cluster version downgrade.
+	// TODO: Replace with flush db in v3.7, assuming v3.6 bootstraps from the db file.
 	forceDiskSnapshot bool
 	corruptionChecker CorruptionChecker
 }
@@ -1195,9 +1197,15 @@ func (s *EtcdServer) ForceSnapshot() {
 }
 
 func (s *EtcdServer) triggerSnapshot(ep *etcdProgress) {
-	if !s.shouldSnapshot(ep) {
+	if !s.shouldSnapshotToDisk(ep) {
+		if ep.appliedi > ep.memorySnapshotIndex+memorySnapshotCount {
+			s.snapshot(ep, false)
+			s.compactRaftLog(ep.appliedi)
+			ep.memorySnapshotIndex = ep.appliedi
+		}
 		return
 	}
+	// TODO: Remove disk snapshot in v3.7.
 	lg := s.Logger()
 	lg.Info(
 		"triggering snapshot",
@@ -1209,11 +1217,11 @@ func (s *EtcdServer) triggerSnapshot(ep *etcdProgress) {
 	)
 	s.forceDiskSnapshot = false
 
-	s.snapshot(ep)
+	s.snapshot(ep, true)
 	s.compactRaftLog(ep.appliedi)
 }
 
-func (s *EtcdServer) shouldSnapshot(ep *etcdProgress) bool {
+func (s *EtcdServer) shouldSnapshotToDisk(ep *etcdProgress) bool {
 	return (s.forceDiskSnapshot && ep.appliedi != ep.diskSnapshotIndex) || (ep.appliedi-ep.diskSnapshotIndex > s.Cfg.SnapshotCount)
 }
 
@@ -2134,20 +2142,21 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con
 }
 
 // TODO: non-blocking snapshot
-func (s *EtcdServer) snapshot(ep *etcdProgress) {
-	d := GetMembershipInfoInV2Format(s.Logger(), s.cluster)
-	// commit kv to write metadata (for example: consistent index) to disk.
-	//
-	// This guarantees that Backend's consistent_index is >= index of last snapshot.
-	//
-	// KV().commit() updates the consistent index in backend.
-	// All operations that update consistent index must be called sequentially
-	// from applyAll function.
-	// So KV().Commit() cannot run in parallel with toApply. It has to be called outside
-	// the go routine created below.
-	s.KV().Commit()
-
+func (s *EtcdServer) snapshot(ep *etcdProgress, toDisk bool) {
 	lg := s.Logger()
+	d := GetMembershipInfoInV2Format(lg, s.cluster)
+	if toDisk {
+		// commit kv to write metadata (for example: consistent index) to disk.
+		//
+		// This guarantees that Backend's consistent_index is >= index of last snapshot.
+		//
+		// KV().commit() updates the consistent index in backend.
+		// All operations that update consistent index must be called sequentially
+		// from applyAll function.
+		// So KV().Commit() cannot run in parallel with toApply. It has to be called outside
+		// the go routine created below.
+		s.KV().Commit()
+	}
 
 	// For backward compatibility, generate v2 snapshot from v3 state.
 	snap, err := s.r.raftStorage.CreateSnapshot(ep.appliedi, &ep.confState, d)
@@ -2159,23 +2168,25 @@ func (s *EtcdServer) snapshot(ep *etcdProgress) {
 		}
 		lg.Panic("failed to create snapshot", zap.Error(err))
 	}
+	ep.memorySnapshotIndex = ep.appliedi
 
 	verifyConsistentIndexIsLatest(lg, snap, s.consistIndex.ConsistentIndex())
 
-	// SaveSnap saves the snapshot to file and appends the corresponding WAL entry.
-	if err = s.r.storage.SaveSnap(snap); err != nil {
-		lg.Panic("failed to save snapshot", zap.Error(err))
-	}
-	ep.diskSnapshotIndex = ep.appliedi
-	ep.memorySnapshotIndex = ep.appliedi
-	if err = s.r.storage.Release(snap); err != nil {
-		lg.Panic("failed to release wal", zap.Error(err))
-	}
+	if toDisk {
+		// SaveSnap saves the snapshot to file and appends the corresponding WAL entry.
+		if err = s.r.storage.SaveSnap(snap); err != nil {
+			lg.Panic("failed to save snapshot", zap.Error(err))
+		}
+		ep.diskSnapshotIndex = ep.appliedi
+		if err = s.r.storage.Release(snap); err != nil {
+			lg.Panic("failed to release wal", zap.Error(err))
+		}
 
-	lg.Info(
-		"saved snapshot",
-		zap.Uint64("snapshot-index", snap.Metadata.Index),
-	)
+		lg.Info(
+			"saved snapshot to disk",
+			zap.Uint64("snapshot-index", snap.Metadata.Index),
+		)
+	}
 }
 
 func (s *EtcdServer) compactRaftLog(snapi uint64) {
@@ -2192,11 +2203,10 @@ func (s *EtcdServer) compactRaftLog(snapi uint64) {
 	}
 
 	// keep some in memory log entries for slow followers.
-	compacti := uint64(1)
-	if snapi > s.Cfg.SnapshotCatchUpEntries {
-		compacti = snapi - s.Cfg.SnapshotCatchUpEntries
+	if snapi <= s.Cfg.SnapshotCatchUpEntries {
+		return
 	}
-
+	compacti := snapi - s.Cfg.SnapshotCatchUpEntries
 	err := s.r.raftStorage.Compact(compacti)
 	if err != nil {
 		// the compaction was done asynchronously with the progress of raft.
@@ -2206,7 +2216,7 @@
 		}
 		lg.Panic("failed to compact", zap.Error(err))
 	}
-	lg.Info(
+	lg.Debug(
 		"compacted Raft logs",
 		zap.Uint64("compact-index", compacti),
 	)
diff --git a/tests/integration/v3_watch_restore_test.go b/tests/integration/v3_watch_restore_test.go
index f7e2e4b4730b..56b84f0bd9a5 100644
--- a/tests/integration/v3_watch_restore_test.go
+++ b/tests/integration/v3_watch_restore_test.go
@@ -110,7 +110,9 @@ func TestV3WatchRestoreSnapshotUnsync(t *testing.T) {
 	//
 	// Since there is no way to confirm server has compacted the log, we
 	// use log monitor to watch and expect "compacted Raft logs" content.
-	expectMemberLog(t, clus.Members[initialLead], 5*time.Second, "compacted Raft logs", 2)
+	// In v3.6 we no longer generate the "compacted Raft logs" log, as raft compaction happens independently of snapshots.
+	// For now, let's use the snapshot log, which should be equivalent to compaction.
+	expectMemberLog(t, clus.Members[initialLead], 5*time.Second, "saved snapshot to disk", 2)
 
 	// After RecoverPartition, leader L will send snapshot to slow F_m0
 	// follower, because F_m0(index:8) is 'out of date' compared to