Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

etcdserver: separate raft log compact from snapshot #18459

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,11 @@ type ServerConfig struct {
// follower to catch up.
SnapshotCatchUpEntries uint64

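// RaftLogCompactionStep makes the server attempt a raft log compaction whenever
// the applied index is a multiple of RaftLogCompactionStep. The most recent
// SnapshotCatchUpEntries entries are always kept for slow followers.
// The minimum value is 1, meaning a compaction is attempted each time the applied index increases.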
RaftLogCompactionStep uint64

MaxSnapFiles uint
MaxWALFiles uint

Expand Down
37 changes: 30 additions & 7 deletions server/etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,11 @@ const (
// follower to catch up.
DefaultSnapshotCatchUpEntries uint64 = 5000

// DefaultRaftLogCompactionStep is the RaftLogCompactionStep used when none is configured:
// a raft log compaction is attempted whenever the applied index is a multiple of it.
// The minimum value is 1, meaning a compaction is attempted each time the applied index increases.
DefaultRaftLogCompactionStep uint64 = 10

StoreClusterPrefix = "/0"
StoreKeysPrefix = "/1"

Expand Down Expand Up @@ -568,6 +573,14 @@ func (s *EtcdServer) start() {
)
s.Cfg.SnapshotCatchUpEntries = DefaultSnapshotCatchUpEntries
}
if s.Cfg.RaftLogCompactionStep == 0 {
lg.Info(
"updating raft-log-compaction-step to default",
zap.Uint64("given-raft-log-compaction-step", s.Cfg.RaftLogCompactionStep),
zap.Uint64("updated-raft-log-compaction-step", DefaultRaftLogCompactionStep),
)
s.Cfg.RaftLogCompactionStep = DefaultRaftLogCompactionStep
}

s.w = wait.New()
s.applyWait = wait.NewTimeList()
Expand Down Expand Up @@ -979,6 +992,7 @@ func (s *EtcdServer) applyAll(ep *etcdProgress, apply *toApply) {
<-apply.notifyc

s.triggerSnapshot(ep)
s.compactRaftLog(ep.appliedi)
select {
// snapshot requested via send()
case m := <-s.r.msgSnapC:
Expand Down Expand Up @@ -2169,6 +2183,21 @@ func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {
"saved snapshot",
zap.Uint64("snapshot-index", snap.Metadata.Index),
)
}

func (s *EtcdServer) compactRaftLog(appliedi uint64) {
lg := s.Logger()

// only compact raft log when applied index is a multiple of RaftLogCompactionStep
if appliedi%s.Cfg.RaftLogCompactionStep != 0 {
return
}

// keep some in memory log entries for slow followers
if appliedi <= s.Cfg.SnapshotCatchUpEntries {
return
}
compacti := appliedi - s.Cfg.SnapshotCatchUpEntries

// When sending a snapshot, etcd will pause compaction.
// After receiving a snapshot, the slow follower needs to get all the entries right after
Expand All @@ -2180,13 +2209,7 @@ func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {
return
}

// keep some in memory log entries for slow followers.
compacti := uint64(1)
if snapi > s.Cfg.SnapshotCatchUpEntries {
compacti = snapi - s.Cfg.SnapshotCatchUpEntries
}

err = s.r.raftStorage.Compact(compacti)
err := s.r.raftStorage.Compact(compacti)
if err != nil {
// the compaction was done asynchronously with the progress of raft.
// raft log might already have been compacted.
Expand Down
93 changes: 93 additions & 0 deletions server/etcdserver/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -854,6 +854,99 @@ func TestConcurrentApplyAndSnapshotV3(t *testing.T) {
}
}

// TestCompactRaftLog ensures that:
//  1. the raft log is not compacted while it holds no more than
//     SnapshotCatchUpEntries entries
//  2. after a compaction, the raft log holds exactly SnapshotCatchUpEntries entries
//
// RaftLogCompactionStep is pinned to 1 so that every applied index is a
// compaction candidate. With the default step, the applied index that first
// exceeds SnapshotCatchUpEntries is not a multiple of the step, no compaction
// would run, and the size assertions would hold trivially on an uncompacted log.
//
// More test cases are covered in integration/TestCompactRaftLog_CompactionTimes
func TestCompactRaftLog(t *testing.T) {
	lg := zaptest.NewLogger(t)
	n := newNodeConfChangeCommitterRecorder()
	n.readyc <- raft.Ready{
		SoftState: &raft.SoftState{RaftState: raft.StateLeader},
	}
	rs := raft.NewMemoryStorage()

	cl := newTestCluster(t)
	cl.SetStore(v2store.New())
	be, _ := betesting.NewDefaultTmpBackend(t)
	defer betesting.Close(t, be)
	cl.SetBackend(schema.NewMembershipBackend(lg, be))
	cl.AddMember(&membership.Member{ID: 1234}, true)

	r := newRaftNode(raftNodeConfig{
		lg:          zaptest.NewLogger(t),
		Node:        n,
		raftStorage: rs,
		storage:     mockstorage.NewStorageRecorder(""),
		transport:   newNopTransporter(),
	})

	snapshotCatchUpEntries := uint64(1333)
	s := &EtcdServer{
		lgMu: new(sync.RWMutex),
		lg:   zaptest.NewLogger(t),
		Cfg: config.ServerConfig{
			Logger:                 lg,
			SnapshotCatchUpEntries: snapshotCatchUpEntries,
			// Consider every applied index for compaction; see the test comment.
			RaftLogCompactionStep: 1,
		},
		r:            *r,
		v2store:      v2store.New(),
		cluster:      cl,
		reqIDGen:     idutil.NewGenerator(0, time.Time{}),
		SyncTicker:   &time.Ticker{},
		consistIndex: cindex.NewFakeConsistentIndex(0),
		uberApply:    uberApplierMock{},
		kv:           mvcc.New(zaptest.NewLogger(t), be, &lease.FakeLessor{}, mvcc.StoreConfig{}),
	}
	s.start()
	defer s.Stop()

	// putFooBar feeds one put request at the given raft index through the
	// fake node: first as an unstable entry, then as a committed entry.
	putFooBar := func(index uint64) {
		req := &pb.InternalRaftRequest{
			Header: &pb.RequestHeader{ID: index},
			Put:    &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")},
		}
		ents := []raftpb.Entry{{Index: index, Data: pbutil.MustMarshal(req)}}
		n.readyc <- raft.Ready{Entries: ents}
		n.readyc <- raft.Ready{CommittedEntries: ents}
	}

	// mustGetEntriesInfo returns the first index, the last index and the
	// number of entries currently held by the raft storage.
	mustGetEntriesInfo := func() (fi, li, cnt uint64) {
		var err error
		fi, err = rs.FirstIndex()
		if err != nil {
			t.Fatalf("FirstIndex error: %v", err)
		}
		li, err = rs.LastIndex()
		if err != nil {
			t.Fatalf("LastIndex error: %v", err)
		}
		return fi, li, li - fi + 1
	}

	// Fill the raft log up to, but not beyond, snapshotCatchUpEntries entries.
	currentIndex := uint64(0)
	for currentIndex < snapshotCatchUpEntries {
		currentIndex++
		putFooBar(currentIndex)
	}

	// wait for put requests to complete
	time.Sleep(time.Second * 5)
	// no compaction occurs while the raft log holds at most snapshotCatchUpEntries entries
	fi, li, cnt := mustGetEntriesInfo()
	assert.Equal(t, uint64(1), fi)
	assert.Equal(t, snapshotCatchUpEntries, li)
	assert.Equal(t, snapshotCatchUpEntries, cnt)

	// trigger the first compaction: the applied index now exceeds snapshotCatchUpEntries
	currentIndex++
	putFooBar(currentIndex)
	// wait for put request to complete
	time.Sleep(time.Second * 5)
	fi, li, cnt = mustGetEntriesInfo()
	// the oldest entry must have been compacted away
	assert.Equal(t, uint64(2), fi)
	assert.Equal(t, currentIndex, li)
	// after each compaction, raft log entries size should equal snapshotCatchUpEntries
	assert.Equal(t, snapshotCatchUpEntries, cnt)
}

// TestAddMember tests AddMember can propose and perform node addition.
func TestAddMember(t *testing.T) {
lg := zaptest.NewLogger(t)
Expand Down
9 changes: 5 additions & 4 deletions tests/e2e/etcd_mix_versions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/stretchr/testify/require"

"go.etcd.io/etcd/client/pkg/v3/fileutil"
"go.etcd.io/etcd/server/v3/etcdserver"
"go.etcd.io/etcd/tests/v3/framework/config"
"go.etcd.io/etcd/tests/v3/framework/e2e"
)
Expand Down Expand Up @@ -132,8 +133,8 @@ func mixVersionsSnapshotTestByMockPartition(t *testing.T, cfg *e2e.EtcdProcessCl

clusterOptions := []e2e.EPClusterOption{
e2e.WithConfig(cfg),
e2e.WithSnapshotCount(10),
e2e.WithSnapshotCatchUpEntries(10),
e2e.WithSnapshotCount(etcdserver.DefaultRaftLogCompactionStep),
e2e.WithSnapshotCatchUpEntries(etcdserver.DefaultRaftLogCompactionStep),
}
t.Logf("Create an etcd cluster with %d member", cfg.ClusterSize)
epc, err := e2e.NewEtcdProcessCluster(context.TODO(), t, clusterOptions...)
Expand All @@ -148,8 +149,8 @@ func mixVersionsSnapshotTestByMockPartition(t *testing.T, cfg *e2e.EtcdProcessCl
err = toPartitionedMember.Stop()
require.NoError(t, err)

t.Log("Writing 20 keys to the cluster (more than SnapshotCount entries to trigger at least a snapshot)")
writeKVs(t, epc.Etcdctl(), 0, 20)
t.Log("Writing 2*DefaultRaftLogCompactionStep keys to the cluster (more than SnapshotCount entries to trigger at least a snapshot)")
writeKVs(t, epc.Etcdctl(), 0, int(etcdserver.DefaultRaftLogCompactionStep*2))

t.Log("Verify logs to check leader has saved snapshot")
leaderEPC := epc.Procs[epc.WaitLeader(t)]
Expand Down
5 changes: 3 additions & 2 deletions tests/e2e/leader_snapshot_no_proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (

clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/pkg/v3/expect"
"go.etcd.io/etcd/server/v3/etcdserver"
"go.etcd.io/etcd/tests/v3/framework/config"
"go.etcd.io/etcd/tests/v3/framework/e2e"
"go.etcd.io/etcd/tests/v3/robustness/failpoint"
Expand All @@ -42,8 +43,8 @@ func TestRecoverSnapshotBackend(t *testing.T) {
e2e.WithClusterSize(3),
e2e.WithKeepDataDir(true),
e2e.WithPeerProxy(true),
e2e.WithSnapshotCatchUpEntries(50),
e2e.WithSnapshotCount(50),
e2e.WithSnapshotCatchUpEntries(etcdserver.DefaultRaftLogCompactionStep),
e2e.WithSnapshotCount(etcdserver.DefaultRaftLogCompactionStep),
e2e.WithGoFailEnabled(true),
e2e.WithIsPeerTLS(true),
)
Expand Down
7 changes: 7 additions & 0 deletions tests/framework/integration/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ type ClusterConfig struct {

SnapshotCount uint64
SnapshotCatchUpEntries uint64
RaftLogCompactionStep uint64

GRPCKeepAliveMinTime time.Duration
GRPCKeepAliveInterval time.Duration
Expand Down Expand Up @@ -276,6 +277,7 @@ func (c *Cluster) mustNewMember(t testutil.TB) *Member {
MaxRequestBytes: c.Cfg.MaxRequestBytes,
SnapshotCount: c.Cfg.SnapshotCount,
SnapshotCatchUpEntries: c.Cfg.SnapshotCatchUpEntries,
RaftLogCompactionStep: c.Cfg.RaftLogCompactionStep,
GRPCKeepAliveMinTime: c.Cfg.GRPCKeepAliveMinTime,
GRPCKeepAliveInterval: c.Cfg.GRPCKeepAliveInterval,
GRPCKeepAliveTimeout: c.Cfg.GRPCKeepAliveTimeout,
Expand Down Expand Up @@ -601,6 +603,7 @@ type MemberConfig struct {
MaxRequestBytes uint
SnapshotCount uint64
SnapshotCatchUpEntries uint64
RaftLogCompactionStep uint64
GRPCKeepAliveMinTime time.Duration
GRPCKeepAliveInterval time.Duration
GRPCKeepAliveTimeout time.Duration
Expand Down Expand Up @@ -686,6 +689,10 @@ func MustNewMember(t testutil.TB, mcfg MemberConfig) *Member {
if mcfg.SnapshotCatchUpEntries != 0 {
m.SnapshotCatchUpEntries = mcfg.SnapshotCatchUpEntries
}
m.RaftLogCompactionStep = etcdserver.DefaultRaftLogCompactionStep
if mcfg.RaftLogCompactionStep != 0 {
m.RaftLogCompactionStep = mcfg.RaftLogCompactionStep
}

// for the purpose of integration testing, simple token is enough
m.AuthToken = "simple"
Expand Down
Loading
Loading