Skip to content

Commit

Permalink
tunable CompactRaftLogEveryNApplies
Browse files Browse the repository at this point in the history
Signed-off-by: Clement <[email protected]>
  • Loading branch information
clement2026 committed Jul 28, 2024
1 parent 4de58cc commit 55fbbfa
Show file tree
Hide file tree
Showing 5 changed files with 36 additions and 14 deletions.
4 changes: 4 additions & 0 deletions server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ type ServerConfig struct {
// follower to catch up.
SnapshotCatchUpEntries uint64

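// CompactRaftLogEveryNApplies improves performance by compacting the raft log only once every N applies.
// Minimum value is 1, which means compacting the raft log on every apply.
// A value of 0 is treated as unset and is replaced with the default when the server starts.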
CompactRaftLogEveryNApplies uint64

MaxSnapFiles uint
MaxWALFiles uint

Expand Down
18 changes: 13 additions & 5 deletions server/etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@ const (
// follower to catch up.
DefaultSnapshotCatchUpEntries uint64 = 5000

// DefaultCompactRaftLogEveryNApplies is the default number of applies between raft log compactions.
// Compacting only once every N applies improves performance.
// Minimum value is 1, which means compacting the raft log on every apply.
DefaultCompactRaftLogEveryNApplies uint64 = 100

StoreClusterPrefix = "/0"
StoreKeysPrefix = "/1"

Expand All @@ -108,10 +112,6 @@ const (
readyPercentThreshold = 0.9

DowngradeEnabledPath = "/downgrade/enabled"

// CompactRaftLogEveryNApplies improves performance by compacting raft log once every N applies.
// Minimum value is 1, which means compacting raft log every apply.
CompactRaftLogEveryNApplies uint64 = 100
)

var (
Expand Down Expand Up @@ -572,6 +572,14 @@ func (s *EtcdServer) start() {
)
s.Cfg.SnapshotCatchUpEntries = DefaultSnapshotCatchUpEntries
}
if s.Cfg.CompactRaftLogEveryNApplies == 0 {
lg.Info(
"updating compact raft log every N applies to default",
zap.Uint64("given-compact-raft-log-every-n-applies", s.Cfg.CompactRaftLogEveryNApplies),
zap.Uint64("updated-compact-raft-log-every-n-applies", DefaultCompactRaftLogEveryNApplies),
)
s.Cfg.CompactRaftLogEveryNApplies = DefaultCompactRaftLogEveryNApplies
}

s.w = wait.New()
s.applyWait = wait.NewTimeList()
Expand Down Expand Up @@ -2169,7 +2177,7 @@ func (s *EtcdServer) compactRaftLog(appliedi uint64) {
}
compacti := appliedi - s.Cfg.SnapshotCatchUpEntries
// only compact raft log once every N applies
if compacti%CompactRaftLogEveryNApplies != 0 {
if compacti%s.Cfg.CompactRaftLogEveryNApplies != 0 {
return
}

Expand Down
6 changes: 3 additions & 3 deletions server/etcdserver/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -921,7 +921,7 @@ func TestCompactRaftLog(t *testing.T) {
}

currentIndex := uint64(0)
for currentIndex < snapshotCatchUpEntries+CompactRaftLogEveryNApplies-1 {
for currentIndex < snapshotCatchUpEntries+DefaultCompactRaftLogEveryNApplies-1 {
currentIndex++
putFooBar(currentIndex)
}
Expand All @@ -930,8 +930,8 @@ func TestCompactRaftLog(t *testing.T) {
// no compaction occurs before the number of entries reaches snapshotCatchUpEntries+DefaultCompactRaftLogEveryNApplies
fi, li, cnt := mustGetEntriesInfo()
assert.Equal(t, uint64(1), fi)
assert.Equal(t, snapshotCatchUpEntries+CompactRaftLogEveryNApplies-1, li)
assert.Equal(t, snapshotCatchUpEntries+CompactRaftLogEveryNApplies-1, cnt)
assert.Equal(t, snapshotCatchUpEntries+DefaultCompactRaftLogEveryNApplies-1, li)
assert.Equal(t, snapshotCatchUpEntries+DefaultCompactRaftLogEveryNApplies-1, cnt)

// trigger the first compaction
currentIndex++
Expand Down
11 changes: 9 additions & 2 deletions tests/framework/integration/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,8 +147,9 @@ type ClusterConfig struct {
MaxTxnOps uint
MaxRequestBytes uint

SnapshotCount uint64
SnapshotCatchUpEntries uint64
SnapshotCount uint64
SnapshotCatchUpEntries uint64
CompactRaftLogEveryNApplies uint64

GRPCKeepAliveMinTime time.Duration
GRPCKeepAliveInterval time.Duration
Expand Down Expand Up @@ -277,6 +278,7 @@ func (c *Cluster) mustNewMember(t testutil.TB) *Member {
MaxRequestBytes: c.Cfg.MaxRequestBytes,
SnapshotCount: c.Cfg.SnapshotCount,
SnapshotCatchUpEntries: c.Cfg.SnapshotCatchUpEntries,
CompactRaftLogEveryNApplies: c.Cfg.CompactRaftLogEveryNApplies,
GRPCKeepAliveMinTime: c.Cfg.GRPCKeepAliveMinTime,
GRPCKeepAliveInterval: c.Cfg.GRPCKeepAliveInterval,
GRPCKeepAliveTimeout: c.Cfg.GRPCKeepAliveTimeout,
Expand Down Expand Up @@ -603,6 +605,7 @@ type MemberConfig struct {
MaxRequestBytes uint
SnapshotCount uint64
SnapshotCatchUpEntries uint64
CompactRaftLogEveryNApplies uint64
GRPCKeepAliveMinTime time.Duration
GRPCKeepAliveInterval time.Duration
GRPCKeepAliveTimeout time.Duration
Expand Down Expand Up @@ -690,6 +693,10 @@ func MustNewMember(t testutil.TB, mcfg MemberConfig) *Member {
if mcfg.SnapshotCatchUpEntries != 0 {
m.SnapshotCatchUpEntries = mcfg.SnapshotCatchUpEntries
}
m.CompactRaftLogEveryNApplies = etcdserver.DefaultCompactRaftLogEveryNApplies
if mcfg.CompactRaftLogEveryNApplies != 0 {
m.CompactRaftLogEveryNApplies = mcfg.CompactRaftLogEveryNApplies
}

// for the purpose of integration testing, simple token is enough
m.AuthToken = "simple"
Expand Down
11 changes: 7 additions & 4 deletions tests/integration/v3_watch_restore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,10 @@ func TestV3WatchRestoreSnapshotUnsync(t *testing.T) {
integration.BeforeTest(t)

clus := integration.NewCluster(t, &integration.ClusterConfig{
Size: 3,
SnapshotCount: 10,
SnapshotCatchUpEntries: 5,
Size: 3,
SnapshotCount: 10,
SnapshotCatchUpEntries: 5,
CompactRaftLogEveryNApplies: 1,
})
defer clus.Terminate(t)

Expand Down Expand Up @@ -110,7 +111,9 @@ func TestV3WatchRestoreSnapshotUnsync(t *testing.T) {
//
// Since there is no direct way to confirm that the server has compacted
// the log, we use the log monitor to watch for the expected "compacted Raft logs" content.
expectMemberLog(t, clus.Members[initialLead], 5*time.Second, "compacted Raft logs", 2)
expectMemberLog(t, clus.Members[initialLead], 5*time.Second, "compacted Raft logs", 17)
expectMemberLog(t, clus.Members[initialLead], 5*time.Second, "\"compact-index\": 6", 1)
expectMemberLog(t, clus.Members[initialLead], 5*time.Second, "\"compact-index\": 17", 1)

// After RecoverPartition, leader L will send snapshot to slow F_m0
// follower, because F_m0(index:8) is 'out of date' compared to
Expand Down

0 comments on commit 55fbbfa

Please sign in to comment.