etcdserver: compact raft log entries more frequently #18622

Draft · wants to merge 1 commit into base: main
4 changes: 4 additions & 0 deletions server/config/config.go
@@ -58,6 +58,10 @@ type ServerConfig struct {
	// follower to catch up.
	SnapshotCatchUpEntries uint64

	// CompactRaftLogEveryNApplies compacts the raft log once every N applies.
	// The minimum value is 1, which means compacting the raft log on every apply.
	CompactRaftLogEveryNApplies uint64

	MaxSnapFiles uint
	MaxWALFiles  uint

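For context, a minimal sketch of how the new field sits alongside the existing snapshot knob; the import path assumes the repository's server module layout, and the values are illustrative only, not recommendations from this PR:

```go
package main

import (
	"fmt"

	"go.etcd.io/etcd/server/v3/config"
)

func main() {
	// Illustrative values; the defaults introduced in this PR are 5000 and 10.
	cfg := config.ServerConfig{
		SnapshotCatchUpEntries:      5000, // in-memory entries kept for slow followers
		CompactRaftLogEveryNApplies: 10,   // compact at most once per 10 applies; 1 = every apply
	}
	fmt.Printf("catch-up entries: %d, compact every N applies: %d\n",
		cfg.SnapshotCatchUpEntries, cfg.CompactRaftLogEveryNApplies)
}
```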
33 changes: 26 additions & 7 deletions server/etcdserver/server.go
@@ -84,6 +84,10 @@ const (
	// follower to catch up.
	DefaultSnapshotCatchUpEntries uint64 = 5000

	// DefaultCompactRaftLogEveryNApplies compacts the raft log once every N applies.
	// The minimum value is 1, which means compacting the raft log on every apply.
	DefaultCompactRaftLogEveryNApplies uint64 = 10

	StoreClusterPrefix = "/0"
	StoreKeysPrefix    = "/1"

@@ -569,6 +573,14 @@ func (s *EtcdServer) start() {
		)
		s.Cfg.SnapshotCatchUpEntries = DefaultSnapshotCatchUpEntries
	}
	if s.Cfg.CompactRaftLogEveryNApplies == 0 {
		lg.Info(
			"updating compact raft log every N applies to default",
			zap.Uint64("given-compact-raft-log-every-n-applies", s.Cfg.CompactRaftLogEveryNApplies),
			zap.Uint64("updated-compact-raft-log-every-n-applies", DefaultCompactRaftLogEveryNApplies),
		)
		s.Cfg.CompactRaftLogEveryNApplies = DefaultCompactRaftLogEveryNApplies
	}

	s.w = wait.New()
	s.applyWait = wait.NewTimeList()
@@ -980,6 +992,7 @@ func (s *EtcdServer) applyAll(ep *etcdProgress, apply *toApply) {
	<-apply.notifyc

	s.triggerSnapshot(ep)
	s.maybeCompactRaftLog(ep)
	select {
	// snapshot requested via send()
	case m := <-s.r.msgSnapC:
@@ -2170,6 +2183,18 @@ func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {
		"saved snapshot",
		zap.Uint64("snapshot-index", snap.Metadata.Index),
	)
}

func (s *EtcdServer) maybeCompactRaftLog(ep *etcdProgress) {
	lg := s.Logger()

	// Keep some in-memory log entries for slow followers, while retaining the entries up to the snapshot index.
	// Only compact the raft log once every N applies.
	if ep.appliedi <= ep.snapi+s.Cfg.SnapshotCatchUpEntries || ep.appliedi%s.Cfg.CompactRaftLogEveryNApplies != 0 {
		return
	}

	compacti := ep.appliedi - s.Cfg.SnapshotCatchUpEntries
Member commented on lines +2191 to +2197:
Suggested change:

-	// Keep some in memory log entries for slow followers, while keeping the entries up to snapshot index.
-	// Only compact raft log once every N applies
-	if ep.appliedi <= ep.snapi+s.Cfg.SnapshotCatchUpEntries || ep.appliedi%s.Cfg.CompactRaftLogEveryNApplies != 0 {
-		return
-	}
-	compacti := ep.appliedi - s.Cfg.SnapshotCatchUpEntries
+	// Retain all log entries up to the latest snapshot index to ensure any member can recover from that snapshot.
+	// Beyond the snapshot index, preserve the most recent s.Cfg.SnapshotCatchUpEntries entries in memory.
+	// This allows slow followers to catch up by synchronizing entries instead of requiring a full snapshot transfer.
+	// Only compact raft log once every N applies
+	if ep.snapi <= s.Cfg.SnapshotCatchUpEntries {
+		return
+	}
+	compacti := ep.snapi - s.Cfg.SnapshotCatchUpEntries

In the first PR, let's keep the same number of entries, just make the compaction independent from snapshot.

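To make the two variants concrete (illustrative numbers, not taken from the review thread): with SnapshotCatchUpEntries = 5, a latest snapshot at index 11, and an applied index of 20, the PR as written compacts up to appliedi − 5 = 15, anchoring retention to the applied index, whereas the suggested change compacts up to snapi − 5 = 6, anchoring retention to the snapshot index exactly as the pre-PR code did. Under the suggestion only the call site moves out of snapshot(), so the compaction cadence becomes independent of snapshot creation while the retained entry count stays the same.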

	// When sending a snapshot, etcd will pause compaction.
	// After receiving a snapshot, the slow follower needs to get all the entries right after
@@ -2181,13 +2206,7 @@ func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {
		return
	}

-	// keep some in memory log entries for slow followers.
-	compacti := uint64(1)
-	if snapi > s.Cfg.SnapshotCatchUpEntries {
-		compacti = snapi - s.Cfg.SnapshotCatchUpEntries
-	}
-
-	err = s.r.raftStorage.Compact(compacti)
+	err := s.r.raftStorage.Compact(compacti)
	if err != nil {
		// the compaction was done asynchronously with the progress of raft.
		// the raft log might already have been compacted.
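Taken together, the new maybeCompactRaftLog gate fires only on every Nth applied entry, and only once the applied index is more than SnapshotCatchUpEntries ahead of the last snapshot. A simplified standalone sketch of that arithmetic (wouldCompact is an illustrative name, not from the PR; small numbers chosen for readability, the real defaults are 5000 and 10):

```go
package main

import "fmt"

// wouldCompact mirrors the gate in maybeCompactRaftLog: skip unless the
// applied index is past the retention window and lands on the N-apply cadence.
func wouldCompact(applied, snap, catchUp, everyN uint64) (uint64, bool) {
	if applied <= snap+catchUp || applied%everyN != 0 {
		return 0, false
	}
	return applied - catchUp, true // compact everything up to this index
}

func main() {
	const snap, catchUp, everyN = 0, 5, 10
	for applied := uint64(1); applied <= 30; applied++ {
		if compacti, ok := wouldCompact(applied, snap, catchUp, everyN); ok {
			fmt.Printf("applied=%d -> compact raft log up to index %d\n", applied, compacti)
		}
	}
	// Output:
	// applied=10 -> compact raft log up to index 5
	// applied=20 -> compact raft log up to index 15
	// applied=30 -> compact raft log up to index 25
	//
	// With everyN = 1, compaction would instead run on every apply past index 5.
}
```

Note how applied indexes 10 and 20 reproduce the compacted-index values 5 and 15 that the updated test expectations below rely on.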
11 changes: 9 additions & 2 deletions tests/framework/integration/cluster.go
@@ -148,8 +148,9 @@ type ClusterConfig struct {
	MaxTxnOps       uint
	MaxRequestBytes uint

-	SnapshotCount          uint64
-	SnapshotCatchUpEntries uint64
+	SnapshotCount               uint64
+	SnapshotCatchUpEntries      uint64
+	CompactRaftLogEveryNApplies uint64

	GRPCKeepAliveMinTime  time.Duration
	GRPCKeepAliveInterval time.Duration
@@ -276,6 +277,7 @@ func (c *Cluster) mustNewMember(t testutil.TB) *Member {
		MaxRequestBytes:             c.Cfg.MaxRequestBytes,
		SnapshotCount:               c.Cfg.SnapshotCount,
		SnapshotCatchUpEntries:      c.Cfg.SnapshotCatchUpEntries,
		CompactRaftLogEveryNApplies: c.Cfg.CompactRaftLogEveryNApplies,
		GRPCKeepAliveMinTime:        c.Cfg.GRPCKeepAliveMinTime,
		GRPCKeepAliveInterval:       c.Cfg.GRPCKeepAliveInterval,
		GRPCKeepAliveTimeout:        c.Cfg.GRPCKeepAliveTimeout,
@@ -601,6 +603,7 @@ type MemberConfig struct {
	MaxRequestBytes             uint
	SnapshotCount               uint64
	SnapshotCatchUpEntries      uint64
	CompactRaftLogEveryNApplies uint64
	GRPCKeepAliveMinTime        time.Duration
	GRPCKeepAliveInterval       time.Duration
	GRPCKeepAliveTimeout        time.Duration
@@ -686,6 +689,10 @@ func MustNewMember(t testutil.TB, mcfg MemberConfig) *Member {
	if mcfg.SnapshotCatchUpEntries != 0 {
		m.SnapshotCatchUpEntries = mcfg.SnapshotCatchUpEntries
	}
	m.CompactRaftLogEveryNApplies = etcdserver.DefaultCompactRaftLogEveryNApplies
	if mcfg.CompactRaftLogEveryNApplies != 0 {
		m.CompactRaftLogEveryNApplies = mcfg.CompactRaftLogEveryNApplies
	}

	// for the purpose of integration testing, simple token is enough
	m.AuthToken = "simple"
14 changes: 8 additions & 6 deletions tests/integration/v3_watch_restore_test.go
@@ -55,9 +55,10 @@ func TestV3WatchRestoreSnapshotUnsync(t *testing.T) {
	integration.BeforeTest(t)

	clus := integration.NewCluster(t, &integration.ClusterConfig{
-		Size:                   3,
-		SnapshotCount:          10,
-		SnapshotCatchUpEntries: 5,
+		Size:                        3,
+		SnapshotCount:               10,
+		SnapshotCatchUpEntries:      5,
+		CompactRaftLogEveryNApplies: 10,
	})
	defer clus.Terminate(t)

@@ -102,11 +103,12 @@ func TestV3WatchRestoreSnapshotUnsync(t *testing.T) {
	// elected. Leader will apply 3 MemberAttrSet and 1 ClusterVersionSet
	// changes. So member 0 has index 8 in raft log before network
	// partition. We need to trigger EtcdServer.snapshot() at least twice.
+	// Raft log is only compacted when appliedi%CompactRaftLogEveryNApplies==0
	//
-	// SnapshotCount: 10, SnapshotCatchUpEntries: 5
+	// SnapshotCount: 10, SnapshotCatchUpEntries: 5, CompactRaftLogEveryNApplies: 10
	//
-	// T1: L(snapshot-index: 11, compacted-index: 6), F_m0(index:8)
-	// T2: L(snapshot-index: 22, compacted-index: 17), F_m0(index:8, out of date)
+	// T1: L(snapshot-index: 11, compacted-index: 5), F_m0(index:8)
+	// T2: L(snapshot-index: 22, compacted-index: 15), F_m0(index:8, out of date)
	//
	// Since there is no way to confirm server has compacted the log, we
	// use log monitor to watch and expect "compacted Raft logs" content.
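On one reading of the diff, the updated expectations follow directly from the new gate: the first compaction can only fire at applied index 10 (the first multiple of CompactRaftLogEveryNApplies that is more than SnapshotCatchUpEntries past the snapshot index), giving compacted-index 10 − 5 = 5 at T1, and the second fires at applied index 20, giving 20 − 5 = 15 at T2. The snapshot indexes 11 and 22 are unchanged because snapshot triggering still depends only on SnapshotCount.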