Skip to content

Commit

Permalink
etcdserver: separate raft log compact from snapshot
Browse files Browse the repository at this point in the history
  • Loading branch information
clement2026 committed Jul 26, 2024
1 parent 9a6c9ae commit b758e6b
Show file tree
Hide file tree
Showing 2 changed files with 112 additions and 7 deletions.
27 changes: 20 additions & 7 deletions server/etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,10 @@ const (
readyPercentThreshold = 0.9

DowngradeEnabledPath = "/downgrade/enabled"

// CompactRaftLogEveryNApplies improves performance by compacting raft log once every N applies.
// Minimum value is 1, which means compacting raft log every apply.
CompactRaftLogEveryNApplies uint64 = 100
)

var (
Expand Down Expand Up @@ -963,6 +967,7 @@ func (s *EtcdServer) applyAll(ep *etcdProgress, apply *toApply) {
<-apply.notifyc

s.triggerSnapshot(ep)
s.compactRaftLog(ep.appliedi)
select {
// snapshot requested via send()
case m := <-s.r.msgSnapC:
Expand Down Expand Up @@ -2153,6 +2158,20 @@ func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {
"saved snapshot",
zap.Uint64("snapshot-index", snap.Metadata.Index),
)
}

func (s *EtcdServer) compactRaftLog(appliedi uint64) {
lg := s.Logger()

// keep some in memory log entries for slow followers
if appliedi <= s.Cfg.SnapshotCatchUpEntries {
return
}
compacti := appliedi - s.Cfg.SnapshotCatchUpEntries
// only compact raft log once every N applies
if compacti%CompactRaftLogEveryNApplies != 0 {
return
}

// When sending a snapshot, etcd will pause compaction.
// After receives a snapshot, the slow follower needs to get all the entries right after
Expand All @@ -2164,13 +2183,7 @@ func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {
return
}

// keep some in memory log entries for slow followers.
compacti := uint64(1)
if snapi > s.Cfg.SnapshotCatchUpEntries {
compacti = snapi - s.Cfg.SnapshotCatchUpEntries
}

err = s.r.raftStorage.Compact(compacti)
err := s.r.raftStorage.Compact(compacti)
if err != nil {
// the compaction was done asynchronously with the progress of raft.
// raft log might already been compact.
Expand Down
92 changes: 92 additions & 0 deletions server/etcdserver/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"encoding/json"
"fmt"
"math"
"math/rand/v2"
"net/http"
"os"
"path/filepath"
Expand Down Expand Up @@ -853,6 +854,97 @@ func TestConcurrentApplyAndSnapshotV3(t *testing.T) {
}
}

// TestCompact ensures that:
// 1. raft log gets compacted only when entries size is greater than SnapshotCatchUpEntries
// 2. raft log gets compacted only once every CompactRaftLogEveryNApplies applies
// 3. after a compaction, entries size equals SnapshotCatchUpEntries
func TestCompact(t *testing.T) {
lg := zaptest.NewLogger(t)
n := newNodeConfChangeCommitterRecorder()
n.readyc <- raft.Ready{
SoftState: &raft.SoftState{RaftState: raft.StateLeader},
}
rs := raft.NewMemoryStorage()

cl := newTestCluster(t)
cl.SetStore(v2store.New())
be, _ := betesting.NewDefaultTmpBackend(t)
defer betesting.Close(t, be)
cl.SetBackend(schema.NewMembershipBackend(lg, be))
cl.AddMember(&membership.Member{ID: 1234}, true)

r := newRaftNode(raftNodeConfig{
lg: zaptest.NewLogger(t),
Node: n,
raftStorage: rs,
storage: mockstorage.NewStorageRecorder(""),
transport: newNopTransporter(),
})

snapshotCatchUpEntries := rand.Uint64N(10000) + 1
s := &EtcdServer{
lgMu: new(sync.RWMutex),
lg: zaptest.NewLogger(t),
Cfg: config.ServerConfig{Logger: lg, SnapshotCatchUpEntries: snapshotCatchUpEntries},
r: *r,
v2store: v2store.New(),
cluster: cl,
reqIDGen: idutil.NewGenerator(0, time.Time{}),
SyncTicker: &time.Ticker{},
consistIndex: cindex.NewFakeConsistentIndex(0),
uberApply: uberApplierMock{},
kv: mvcc.New(zaptest.NewLogger(t), be, &lease.FakeLessor{}, mvcc.StoreConfig{}),
}
s.start()

putFooBar := func(index uint64) {
req := &pb.InternalRaftRequest{
Header: &pb.RequestHeader{ID: index},
Put: &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")},
}
ents := []raftpb.Entry{{Index: index, Data: pbutil.MustMarshal(req)}}
n.readyc <- raft.Ready{Entries: ents}
n.readyc <- raft.Ready{CommittedEntries: ents}
}

// get first index, last index and count of entries
mustGetEntriesInfo := func() (fi, li, cnt uint64) {
fi, err := rs.FirstIndex()
if err != nil {
t.Fatalf("FirstIndex error: %v", err)
}
li, err = rs.LastIndex()
if err != nil {
t.Fatalf("LastIndex error: %v", err)
}
return fi, li, li - fi + 1
}

nextIndex := uint64(1)
for nextIndex < snapshotCatchUpEntries+CompactRaftLogEveryNApplies {
putFooBar(nextIndex)
nextIndex++
}
time.Sleep(time.Second)
// compact should not happen before entries size reaches snapshotCatchUpEntries+CompactRaftLogEveryNApplies
fi, li, cnt := mustGetEntriesInfo()
assert.Equal(t, uint64(1), fi)
assert.Equal(t, snapshotCatchUpEntries+CompactRaftLogEveryNApplies-1, li)
assert.Equal(t, snapshotCatchUpEntries+CompactRaftLogEveryNApplies-1, cnt)

// trigger the first compaction
putFooBar(nextIndex)
nextIndex++
time.Sleep(time.Second)
fi, li, cnt = mustGetEntriesInfo()
assert.Equal(t, nextIndex-1, li)
assert.Equal(t, li-snapshotCatchUpEntries+1, fi)
// after each compaction, entries size should equal snapshotCatchUpEntries
assert.Equal(t, snapshotCatchUpEntries, cnt)

s.Stop()
}

// TestAddMember tests AddMember can propose and perform node addition.
func TestAddMember(t *testing.T) {
lg := zaptest.NewLogger(t)
Expand Down

0 comments on commit b758e6b

Please sign in to comment.