etcdserver: separate "raft log compact" from snapshot #18372

Closed
4 changes: 4 additions & 0 deletions server/config/config.go
@@ -58,6 +58,10 @@ type ServerConfig struct {
// follower to catch up.
SnapshotCatchUpEntries uint64

// CompactRaftLogEveryNApplies improves performance by compacting the raft log once every N applies.
// The minimum value is 1, which means compacting the raft log on every apply.
CompactRaftLogEveryNApplies uint64

MaxSnapFiles uint
MaxWALFiles uint

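Together with SnapshotCatchUpEntries, the new field controls when the in-memory raft log gets trimmed: compaction is considered on every apply, but only fires when the candidate compact index is a multiple of CompactRaftLogEveryNApplies. A minimal standalone sketch of that gating logic (the helper name is ours, not etcd's; the real check lives in the compactRaftLog method added below):

```go
// compactIndexFor returns the index up to which the raft log may be
// compacted, or 0 when this apply should not trigger a compaction.
func compactIndexFor(appliedIndex, catchUpEntries, everyNApplies uint64) uint64 {
	// Always retain catchUpEntries entries so slow followers can catch up.
	if appliedIndex <= catchUpEntries {
		return 0
	}
	compactIndex := appliedIndex - catchUpEntries
	// Only compact once every everyNApplies applies.
	if compactIndex%everyNApplies != 0 {
		return 0
	}
	return compactIndex
}
```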
35 changes: 28 additions & 7 deletions server/etcdserver/server.go
@@ -83,6 +83,10 @@ const (
// follower to catch up.
DefaultSnapshotCatchUpEntries uint64 = 5000

// DefaultCompactRaftLogEveryNApplies improves performance by compacting the raft log once every N applies.
// The minimum value is 1, which means compacting the raft log on every apply.
DefaultCompactRaftLogEveryNApplies uint64 = 100

StoreClusterPrefix = "/0"
StoreKeysPrefix = "/1"

@@ -568,6 +572,14 @@ func (s *EtcdServer) start() {
)
s.Cfg.SnapshotCatchUpEntries = DefaultSnapshotCatchUpEntries
}
if s.Cfg.CompactRaftLogEveryNApplies == 0 {
lg.Info(
"updating compact raft log every N applies to default",
zap.Uint64("given-compact-raft-log-every-n-applies", s.Cfg.CompactRaftLogEveryNApplies),
zap.Uint64("updated-compact-raft-log-every-n-applies", DefaultCompactRaftLogEveryNApplies),
)
s.Cfg.CompactRaftLogEveryNApplies = DefaultCompactRaftLogEveryNApplies
}

s.w = wait.New()
s.applyWait = wait.NewTimeList()
@@ -963,6 +975,7 @@ func (s *EtcdServer) applyAll(ep *etcdProgress, apply *toApply) {
<-apply.notifyc

s.triggerSnapshot(ep)
s.compactRaftLog(ep.appliedi)
select {
// snapshot requested via send()
case m := <-s.r.msgSnapC:
@@ -2153,6 +2166,20 @@ func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {
"saved snapshot",
zap.Uint64("snapshot-index", snap.Metadata.Index),
)
}

func (s *EtcdServer) compactRaftLog(appliedi uint64) {
lg := s.Logger()

// keep some in memory log entries for slow followers
if appliedi <= s.Cfg.SnapshotCatchUpEntries {
return
}
compacti := appliedi - s.Cfg.SnapshotCatchUpEntries
// only compact raft log once every N applies
if compacti%s.Cfg.CompactRaftLogEveryNApplies != 0 {
return
}

// When sending a snapshot, etcd will pause compaction.
// After receives a snapshot, the slow follower needs to get all the entries right after
@@ -2164,13 +2191,7 @@ func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {
return
}

-	// keep some in memory log entries for slow followers.
-	compacti := uint64(1)
-	if snapi > s.Cfg.SnapshotCatchUpEntries {
-		compacti = snapi - s.Cfg.SnapshotCatchUpEntries
-	}
-
-	err = s.r.raftStorage.Compact(compacti)
+	err := s.r.raftStorage.Compact(compacti)
if err != nil {
// the compaction was done asynchronously with the progress of raft.
// raft log might already been compact.
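With the defaults above (DefaultSnapshotCatchUpEntries = 5000, DefaultCompactRaftLogEveryNApplies = 100), the first compaction fires once the applied index reaches 5100 and then repeats every 100 applies, each time leaving exactly 5000 entries in memory. A small runnable illustration of that cadence (a sketch of the arithmetic, not etcd code):

```go
package main

import "fmt"

func main() {
	const catchUp, everyN uint64 = 5000, 100 // the defaults in this PR
	for _, applied := range []uint64{5000, 5050, 5100, 5150, 5200} {
		if applied <= catchUp {
			fmt.Printf("applied=%d: no compaction (retaining all entries)\n", applied)
			continue
		}
		compact := applied - catchUp
		if compact%everyN != 0 {
			fmt.Printf("applied=%d: skipped (compact index %d not a multiple of %d)\n", applied, compact, everyN)
			continue
		}
		fmt.Printf("applied=%d: compact up to %d, %d entries remain\n", applied, compact, catchUp)
	}
}
```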
92 changes: 92 additions & 0 deletions server/etcdserver/server_test.go
@@ -19,6 +19,7 @@ import (
"encoding/json"
"fmt"
"math"
"math/rand/v2"
"net/http"
"os"
"path/filepath"
@@ -853,6 +854,97 @@ func TestConcurrentApplyAndSnapshotV3(t *testing.T) {
}
}

// TestCompactRaftLog ensures that:
// 1. the raft log is compacted only when the number of entries exceeds SnapshotCatchUpEntries
// 2. the raft log is compacted once every CompactRaftLogEveryNApplies applies
// 3. after each compaction, the number of remaining entries equals SnapshotCatchUpEntries
func TestCompactRaftLog(t *testing.T) {
lg := zaptest.NewLogger(t)
n := newNodeConfChangeCommitterRecorder()
n.readyc <- raft.Ready{
SoftState: &raft.SoftState{RaftState: raft.StateLeader},
}
rs := raft.NewMemoryStorage()

cl := newTestCluster(t)
cl.SetStore(v2store.New())
be, _ := betesting.NewDefaultTmpBackend(t)
defer betesting.Close(t, be)
cl.SetBackend(schema.NewMembershipBackend(lg, be))
cl.AddMember(&membership.Member{ID: 1234}, true)

r := newRaftNode(raftNodeConfig{
lg: zaptest.NewLogger(t),
Node: n,
raftStorage: rs,
storage: mockstorage.NewStorageRecorder(""),
transport: newNopTransporter(),
})

snapshotCatchUpEntries := rand.Uint64N(10000) + 1
s := &EtcdServer{
lgMu: new(sync.RWMutex),
lg: zaptest.NewLogger(t),
Cfg: config.ServerConfig{Logger: lg, SnapshotCatchUpEntries: snapshotCatchUpEntries},
r: *r,
v2store: v2store.New(),
cluster: cl,
reqIDGen: idutil.NewGenerator(0, time.Time{}),
SyncTicker: &time.Ticker{},
consistIndex: cindex.NewFakeConsistentIndex(0),
uberApply: uberApplierMock{},
kv: mvcc.New(zaptest.NewLogger(t), be, &lease.FakeLessor{}, mvcc.StoreConfig{}),
}
s.start()

putFooBar := func(index uint64) {
req := &pb.InternalRaftRequest{
Header: &pb.RequestHeader{ID: index},
Put: &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")},
}
ents := []raftpb.Entry{{Index: index, Data: pbutil.MustMarshal(req)}}
n.readyc <- raft.Ready{Entries: ents}
n.readyc <- raft.Ready{CommittedEntries: ents}
}

// get first index, last index and count of entries
mustGetEntriesInfo := func() (fi, li, cnt uint64) {
fi, err := rs.FirstIndex()
if err != nil {
t.Fatalf("FirstIndex error: %v", err)
}
li, err = rs.LastIndex()
if err != nil {
t.Fatalf("LastIndex error: %v", err)
}
return fi, li, li - fi + 1
}

currentIndex := uint64(0)
for currentIndex < snapshotCatchUpEntries+DefaultCompactRaftLogEveryNApplies-1 {
currentIndex++
putFooBar(currentIndex)
}

time.Sleep(time.Second * 3)
// no compaction should have occurred before the number of entries reaches snapshotCatchUpEntries+CompactRaftLogEveryNApplies
fi, li, cnt := mustGetEntriesInfo()
assert.Equal(t, uint64(1), fi)
assert.Equal(t, snapshotCatchUpEntries+DefaultCompactRaftLogEveryNApplies-1, li)
assert.Equal(t, snapshotCatchUpEntries+DefaultCompactRaftLogEveryNApplies-1, cnt)

// trigger the first compaction
currentIndex++
putFooBar(currentIndex)
time.Sleep(time.Second * 3)
_, li, cnt = mustGetEntriesInfo()
assert.Equal(t, currentIndex, li)
// after each compaction, entries size should equal snapshotCatchUpEntries
assert.Equal(t, snapshotCatchUpEntries, cnt)

s.Stop()
}

// TestAddMember tests AddMember can propose and perform node addition.
func TestAddMember(t *testing.T) {
lg := zaptest.NewLogger(t)
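The entry-count assertions above rely on raft.MemoryStorage semantics: Compact(i) discards all entries up to and including index i, after which FirstIndex() returns i+1, so the live entry count is LastIndex() - FirstIndex() + 1. A quick self-contained demonstration (assuming the go.etcd.io/raft/v3 module used by etcd's main branch):

```go
package main

import (
	"fmt"

	"go.etcd.io/raft/v3"
	"go.etcd.io/raft/v3/raftpb"
)

func main() {
	rs := raft.NewMemoryStorage()
	ents := make([]raftpb.Entry, 0, 10)
	for i := uint64(1); i <= 10; i++ {
		ents = append(ents, raftpb.Entry{Index: i, Term: 1})
	}
	if err := rs.Append(ents); err != nil {
		panic(err)
	}
	if err := rs.Compact(4); err != nil { // discard entries 1..4
		panic(err)
	}
	fi, _ := rs.FirstIndex()     // 5
	li, _ := rs.LastIndex()      // 10
	fmt.Println(fi, li, li-fi+1) // prints: 5 10 6
}
```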
11 changes: 9 additions & 2 deletions tests/framework/integration/cluster.go
@@ -147,8 +147,9 @@ type ClusterConfig struct {
MaxTxnOps uint
MaxRequestBytes uint

-	SnapshotCount          uint64
-	SnapshotCatchUpEntries uint64
+	SnapshotCount               uint64
+	SnapshotCatchUpEntries      uint64
+	CompactRaftLogEveryNApplies uint64

GRPCKeepAliveMinTime time.Duration
GRPCKeepAliveInterval time.Duration
@@ -277,6 +278,7 @@ func (c *Cluster) mustNewMember(t testutil.TB) *Member {
MaxRequestBytes: c.Cfg.MaxRequestBytes,
SnapshotCount: c.Cfg.SnapshotCount,
SnapshotCatchUpEntries: c.Cfg.SnapshotCatchUpEntries,
CompactRaftLogEveryNApplies: c.Cfg.CompactRaftLogEveryNApplies,
GRPCKeepAliveMinTime: c.Cfg.GRPCKeepAliveMinTime,
GRPCKeepAliveInterval: c.Cfg.GRPCKeepAliveInterval,
GRPCKeepAliveTimeout: c.Cfg.GRPCKeepAliveTimeout,
@@ -603,6 +605,7 @@ type MemberConfig struct {
MaxRequestBytes uint
SnapshotCount uint64
SnapshotCatchUpEntries uint64
CompactRaftLogEveryNApplies uint64
GRPCKeepAliveMinTime time.Duration
GRPCKeepAliveInterval time.Duration
GRPCKeepAliveTimeout time.Duration
@@ -690,6 +693,10 @@ func MustNewMember(t testutil.TB, mcfg MemberConfig) *Member {
if mcfg.SnapshotCatchUpEntries != 0 {
m.SnapshotCatchUpEntries = mcfg.SnapshotCatchUpEntries
}
m.CompactRaftLogEveryNApplies = etcdserver.DefaultCompactRaftLogEveryNApplies
if mcfg.CompactRaftLogEveryNApplies != 0 {
m.CompactRaftLogEveryNApplies = mcfg.CompactRaftLogEveryNApplies
}

// for the purpose of integration testing, simple token is enough
m.AuthToken = "simple"
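The knob is plumbed from ClusterConfig through MemberConfig, and MustNewMember resolves a zero value to DefaultCompactRaftLogEveryNApplies (100), mirroring the SnapshotCatchUpEntries pattern. A hypothetical integration test opting in would look like the restore-test change in the next file (the test name and body here are ours):

```go
func TestCompactEveryApply(t *testing.T) { // hypothetical test name
	integration.BeforeTest(t)
	clus := integration.NewCluster(t, &integration.ClusterConfig{
		Size:                        3,
		SnapshotCount:               10,
		SnapshotCatchUpEntries:      5,
		CompactRaftLogEveryNApplies: 1, // leaving this zero falls back to the default of 100
	})
	defer clus.Terminate(t)
	// ... drive writes and assert on compaction log lines ...
}
```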
11 changes: 7 additions & 4 deletions tests/integration/v3_watch_restore_test.go
@@ -55,9 +55,10 @@ func TestV3WatchRestoreSnapshotUnsync(t *testing.T) {
integration.BeforeTest(t)

clus := integration.NewCluster(t, &integration.ClusterConfig{
-		Size:                   3,
-		SnapshotCount:          10,
-		SnapshotCatchUpEntries: 5,
+		Size:                        3,
+		SnapshotCount:               10,
+		SnapshotCatchUpEntries:      5,
+		CompactRaftLogEveryNApplies: 1,
})
defer clus.Terminate(t)

@@ -110,7 +111,9 @@
//
// Since there is no way to confirm server has compacted the log, we
// use log monitor to watch and expect "compacted Raft logs" content.
expectMemberLog(t, clus.Members[initialLead], 5*time.Second, "compacted Raft logs", 2)
expectMemberLog(t, clus.Members[initialLead], 5*time.Second, "compacted Raft logs", 17)
expectMemberLog(t, clus.Members[initialLead], 5*time.Second, "\"compact-index\": 6", 1)
expectMemberLog(t, clus.Members[initialLead], 5*time.Second, "\"compact-index\": 17", 1)

// After RecoverPartition, leader L will send snapshot to slow F_m0
// follower, because F_m0(index:8) is 'out of date' compared to
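For context on the updated expectations: with SnapshotCatchUpEntries: 5 and CompactRaftLogEveryNApplies: 1, every apply past index 5 compacts up to appliedIndex - 5. Assuming the leader ends at applied index 22 (inferred from the expected log lines, not stated in the test), compact-index runs from 1 through 17 and exactly 17 "compacted Raft logs" lines appear. A quick check of that arithmetic:

```go
package main

import "fmt"

func main() {
	const catchUp, everyN uint64 = 5, 1 // values used by TestV3WatchRestoreSnapshotUnsync
	compactions := 0
	for applied := uint64(1); applied <= 22; applied++ {
		if applied <= catchUp {
			continue // retain all entries until the catch-up window is full
		}
		if compact := applied - catchUp; compact%everyN == 0 {
			compactions++
			fmt.Printf("applied=%d -> compact-index %d\n", applied, compact)
		}
	}
	fmt.Println("total compactions:", compactions) // 17
}
```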