diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index 5571b6d3862d..211785348cb4 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -966,8 +966,9 @@ func (s *EtcdServer) applyAll(ep *etcdProgress, apply *toApply) { // storage, since the raft routine might be slower than toApply routine. <-apply.notifyc - s.triggerSnapshot(ep) - s.compactRaftLog(ep.appliedi) + tr := s.triggerSnapshot(ep) + // force compaction after each snapshot as some tests assume it happens right away + s.compactRaftLog(ep.appliedi, tr) select { // snapshot requested via send() case m := <-s.r.msgSnapC: @@ -1179,9 +1180,10 @@ func (s *EtcdServer) ForceSnapshot() { s.forceSnapshot = true } -func (s *EtcdServer) triggerSnapshot(ep *etcdProgress) { +// triggerSnapshot returns whether the snapshot has been triggered +func (s *EtcdServer) triggerSnapshot(ep *etcdProgress) bool { if !s.shouldSnapshot(ep) { - return + return false } lg := s.Logger() lg.Info( @@ -1196,6 +1198,7 @@ func (s *EtcdServer) triggerSnapshot(ep *etcdProgress) { s.snapshot(ep.appliedi, ep.confState) ep.snapi = ep.appliedi + return true } func (s *EtcdServer) shouldSnapshot(ep *etcdProgress) bool { @@ -2160,7 +2163,7 @@ func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) { ) } -func (s *EtcdServer) compactRaftLog(appliedi uint64) { +func (s *EtcdServer) compactRaftLog(appliedi uint64, force bool) { lg := s.Logger() // keep some in memory log entries for slow followers @@ -2168,8 +2171,8 @@ func (s *EtcdServer) compactRaftLog(appliedi uint64) { return } compacti := appliedi - s.Cfg.SnapshotCatchUpEntries - // only compact raft log once every N applies - if compacti%CompactRaftLogEveryNApplies != 0 { + // if not force, only compact raft log once every N applies + if !force && compacti%CompactRaftLogEveryNApplies != 0 { return } diff --git a/server/etcdserver/server_test.go b/server/etcdserver/server_test.go index 12d2f824f27f..3f423593fd01 100644 --- a/server/etcdserver/server_test.go +++ b/server/etcdserver/server_test.go @@ -854,11 +854,11 @@ func TestConcurrentApplyAndSnapshotV3(t *testing.T) { } } -// TestCompact ensures that: +// TestCompactRaftLog ensures that: // 1. raft log gets compacted only when entries size is greater than SnapshotCatchUpEntries -// 2. raft log gets compacted only once every CompactRaftLogEveryNApplies applies +// 2. raft log gets compacted once every CompactRaftLogEveryNApplies applies // 3. after a compaction, entries size equals SnapshotCatchUpEntries -func TestCompact(t *testing.T) { +func TestCompactRaftLog(t *testing.T) { lg := zaptest.NewLogger(t) n := newNodeConfChangeCommitterRecorder() n.readyc <- raft.Ready{ @@ -925,7 +925,7 @@ func TestCompact(t *testing.T) { putFooBar(nextIndex) nextIndex++ } - time.Sleep(time.Second) + time.Sleep(time.Second * 3) // compact should not happen before entries size reaches snapshotCatchUpEntries+CompactRaftLogEveryNApplies fi, li, cnt := mustGetEntriesInfo() assert.Equal(t, uint64(1), fi) @@ -935,7 +935,7 @@ func TestCompact(t *testing.T) { // trigger the first compaction putFooBar(nextIndex) nextIndex++ - time.Sleep(time.Second) + time.Sleep(time.Second * 3) fi, li, cnt = mustGetEntriesInfo() assert.Equal(t, nextIndex-1, li) assert.Equal(t, li-snapshotCatchUpEntries+1, fi)