diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go
index 6708f71bf9af..5571b6d3862d 100644
--- a/server/etcdserver/server.go
+++ b/server/etcdserver/server.go
@@ -108,6 +108,10 @@ const (
 	readyPercentThreshold = 0.9
 
 	DowngradeEnabledPath = "/downgrade/enabled"
+
+	// CompactRaftLogEveryNApplies improves performance by compacting raft log once every N applies.
+	// Minimum value is 1, which means compacting raft log every apply.
+	CompactRaftLogEveryNApplies uint64 = 100
 )
 
 var (
@@ -963,6 +967,7 @@ func (s *EtcdServer) applyAll(ep *etcdProgress, apply *toApply) {
 	<-apply.notifyc
 
 	s.triggerSnapshot(ep)
+	s.compactRaftLog(ep.appliedi)
 	select {
 	// snapshot requested via send()
 	case m := <-s.r.msgSnapC:
@@ -2153,6 +2158,27 @@ func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {
 		"saved snapshot",
 		zap.Uint64("snapshot-index", snap.Metadata.Index),
 	)
+}
+
+// compactRaftLog compacts the raft log held in raft storage up to
+// appliedi - SnapshotCatchUpEntries. applyAll calls it after triggerSnapshot
+// on every pass; most calls return early because compaction is only attempted
+// when the compaction index is a multiple of CompactRaftLogEveryNApplies.
+func (s *EtcdServer) compactRaftLog(appliedi uint64) {
+	// keep some in memory log entries for slow followers
+	if appliedi <= s.Cfg.SnapshotCatchUpEntries {
+		return
+	}
+	compacti := appliedi - s.Cfg.SnapshotCatchUpEntries
+	// only compact raft log once every N applies
+	// NOTE(review): this fires only when compacti is an exact multiple of N;
+	// confirm appliedi cannot jump past a multiple within a single applyAll.
+	if compacti%CompactRaftLogEveryNApplies != 0 {
+		return
+	}
+
+	// Fetch the logger only now: the early returns above never log.
+	lg := s.Logger()
 
 	// When sending a snapshot, etcd will pause compaction.
 	// After receives a snapshot, the slow follower needs to get all the entries right after
@@ -2164,13 +2190,7 @@ func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {
 		return
 	}
 
-	// keep some in memory log entries for slow followers.
-	compacti := uint64(1)
-	if snapi > s.Cfg.SnapshotCatchUpEntries {
-		compacti = snapi - s.Cfg.SnapshotCatchUpEntries
-	}
-
-	err = s.r.raftStorage.Compact(compacti)
+	err := s.r.raftStorage.Compact(compacti)
 	if err != nil {
 		// the compaction was done asynchronously with the progress of raft.
 		// raft log might already been compact.
diff --git a/server/etcdserver/server_test.go b/server/etcdserver/server_test.go
index 215c86abbc9c..12d2f824f27f 100644
--- a/server/etcdserver/server_test.go
+++ b/server/etcdserver/server_test.go
@@ -19,6 +19,7 @@ import (
 	"encoding/json"
 	"fmt"
 	"math"
+	"math/rand/v2"
 	"net/http"
 	"os"
 	"path/filepath"
@@ -853,6 +854,99 @@ func TestConcurrentApplyAndSnapshotV3(t *testing.T) {
 	}
 }
 
+// TestCompact ensures that:
+// 1. raft log gets compacted only when entries size is greater than SnapshotCatchUpEntries
+// 2. raft log gets compacted only once every CompactRaftLogEveryNApplies applies
+// 3. after a compaction, entries size equals SnapshotCatchUpEntries
+func TestCompact(t *testing.T) {
+	lg := zaptest.NewLogger(t)
+	n := newNodeConfChangeCommitterRecorder()
+	n.readyc <- raft.Ready{
+		SoftState: &raft.SoftState{RaftState: raft.StateLeader},
+	}
+	rs := raft.NewMemoryStorage()
+
+	cl := newTestCluster(t)
+	cl.SetStore(v2store.New())
+	be, _ := betesting.NewDefaultTmpBackend(t)
+	defer betesting.Close(t, be)
+	cl.SetBackend(schema.NewMembershipBackend(lg, be))
+	cl.AddMember(&membership.Member{ID: 1234}, true)
+
+	r := newRaftNode(raftNodeConfig{
+		lg:          lg,
+		Node:        n,
+		raftStorage: rs,
+		storage:     mockstorage.NewStorageRecorder(""),
+		transport:   newNopTransporter(),
+	})
+
+	// the retention window is randomized; log it so a failing run can be reproduced
+	snapshotCatchUpEntries := rand.Uint64N(10000) + 1
+	t.Logf("snapshotCatchUpEntries: %d", snapshotCatchUpEntries)
+	s := &EtcdServer{
+		lgMu:         new(sync.RWMutex),
+		lg:           lg,
+		Cfg:          config.ServerConfig{Logger: lg, SnapshotCatchUpEntries: snapshotCatchUpEntries},
+		r:            *r,
+		v2store:      v2store.New(),
+		cluster:      cl,
+		reqIDGen:     idutil.NewGenerator(0, time.Time{}),
+		SyncTicker:   &time.Ticker{},
+		consistIndex: cindex.NewFakeConsistentIndex(0),
+		uberApply:    uberApplierMock{},
+		kv:           mvcc.New(lg, be, &lease.FakeLessor{}, mvcc.StoreConfig{}),
+	}
+	s.start()
+	// stop the server even when a t.Fatalf below aborts the test early
+	defer s.Stop()
+
+	putFooBar := func(index uint64) {
+		req := &pb.InternalRaftRequest{
+			Header: &pb.RequestHeader{ID: index},
+			Put:    &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")},
+		}
+		ents := []raftpb.Entry{{Index: index, Data: pbutil.MustMarshal(req)}}
+		n.readyc <- raft.Ready{Entries: ents}
+		n.readyc <- raft.Ready{CommittedEntries: ents}
+	}
+
+	// get first index, last index and count of entries
+	mustGetEntriesInfo := func() (fi, li, cnt uint64) {
+		fi, err := rs.FirstIndex()
+		if err != nil {
+			t.Fatalf("FirstIndex error: %v", err)
+		}
+		li, err = rs.LastIndex()
+		if err != nil {
+			t.Fatalf("LastIndex error: %v", err)
+		}
+		return fi, li, li - fi + 1
+	}
+
+	nextIndex := uint64(1)
+	for nextIndex < snapshotCatchUpEntries+CompactRaftLogEveryNApplies {
+		putFooBar(nextIndex)
+		nextIndex++
+	}
+	time.Sleep(time.Second)
+	// compact should not happen before entries size reaches snapshotCatchUpEntries+CompactRaftLogEveryNApplies
+	fi, li, cnt := mustGetEntriesInfo()
+	assert.Equal(t, uint64(1), fi)
+	assert.Equal(t, snapshotCatchUpEntries+CompactRaftLogEveryNApplies-1, li)
+	assert.Equal(t, snapshotCatchUpEntries+CompactRaftLogEveryNApplies-1, cnt)
+
+	// trigger the first compaction
+	putFooBar(nextIndex)
+	nextIndex++
+	time.Sleep(time.Second)
+	fi, li, cnt = mustGetEntriesInfo()
+	assert.Equal(t, nextIndex-1, li)
+	assert.Equal(t, li-snapshotCatchUpEntries+1, fi)
+	// after each compaction, entries size should equal snapshotCatchUpEntries
+	assert.Equal(t, snapshotCatchUpEntries, cnt)
+}
+
 // TestAddMember tests AddMember can propose and perform node addition.
 func TestAddMember(t *testing.T) {
 	lg := zaptest.NewLogger(t)