Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

etcdserver: separate maybeCompactRaftLog function to compact raft log independently #18635

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
67 changes: 67 additions & 0 deletions server/etcdserver/memory_storage_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
// Copyright 2024 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package etcdserver

import (
"testing"

"github.com/stretchr/testify/assert"

"go.etcd.io/raft/v3"
"go.etcd.io/raft/v3/raftpb"
)

// TestMemoryStorageCompaction tests that after calling raftStorage.Compact(compacti)
// without errors, the dummy entry becomes {Index: compacti} and
// raftStorage.FirstIndex() returns (compacti+1, nil).
func TestMemoryStorageCompaction(t *testing.T) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the test, it's really great to protect our assumption via test.

clement2026 marked this conversation as resolved.
Show resolved Hide resolved
// entries: [ {Index: 0} ]
raftStorage := raft.NewMemoryStorage()

firstIndex, err := raftStorage.FirstIndex()
assert.NoError(t, err)
assert.Equal(t, uint64(1), firstIndex)

// after appending, entries should be:
// [ {Index: 0}, {Index: 1}, {Index: 2}, {Index: 3}, {Index: 4}, {Index: 5} ]
appliedIndex := uint64(1)
for ; appliedIndex <= 5; appliedIndex++ {
e := raftpb.Entry{
Type: raftpb.EntryNormal,
Term: 1,
Index: appliedIndex,
}
err := raftStorage.Append([]raftpb.Entry{e})
assert.NoError(t, err)
}

firstIndex, err = raftStorage.FirstIndex()
assert.NoError(t, err)
assert.Equal(t, uint64(1), firstIndex)

lastIndex, err := raftStorage.LastIndex()
assert.NoError(t, err)
assert.Equal(t, uint64(5), lastIndex)

// after compacting, entries should be:
// [ {Index: 3}, {Index: 4}, {Index: 5} ]
err = raftStorage.Compact(3)
assert.NoError(t, err)


firstIndex, err = raftStorage.FirstIndex()
assert.NoError(t, err)
assert.Equal(t, uint64(3+1), firstIndex)
clement2026 marked this conversation as resolved.
Show resolved Hide resolved
}
38 changes: 31 additions & 7 deletions server/etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -745,6 +745,7 @@ type etcdProgress struct {
snapi uint64
appliedt uint64
appliedi uint64
compacti uint64
}

// raftReadyHandler contains a set of EtcdServer operations to be called by raftNode,
Expand All @@ -764,6 +765,10 @@ func (s *EtcdServer) run() {
if err != nil {
lg.Panic("failed to get snapshot from Raft storage", zap.Error(err))
}
firstIndex, err := s.r.raftStorage.FirstIndex()
if err != nil {
lg.Panic("failed to get first index from Raft storage", zap.Error(err))
}

// asynchronously accept toApply packets, dispatch progress in-order
sched := schedule.NewFIFOScheduler(lg)
Expand Down Expand Up @@ -813,6 +818,13 @@ func (s *EtcdServer) run() {
snapi: sn.Metadata.Index,
appliedt: sn.Metadata.Term,
appliedi: sn.Metadata.Index,
// compacti is the index from the last time raftStorage.Compact was called
// without errors.
//
// After calling raftStorage.Compact(compacti) without errors, the dummy entry of
// raftStorage becomes {Index: compacti}, and raftStorage.FirstIndex() returns
// (compacti+1, nil). This is validated by TestMemoryStorageCompaction.
clement2026 marked this conversation as resolved.
Show resolved Hide resolved
compacti: firstIndex - 1,
}

defer func() {
Expand Down Expand Up @@ -980,6 +992,7 @@ func (s *EtcdServer) applyAll(ep *etcdProgress, apply *toApply) {
<-apply.notifyc

s.triggerSnapshot(ep)
s.maybeCompactRaftLog(ep)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you really want to get this small merged firstly, then please ensure it's as independent as possible. Currently the s.snapshot performs both snapshot and compaction operations. It makes sense to extract the compaction operation as an independent function/method, but let's call the method inside s.triggerSnapshot,

func (s *EtcdServer) triggerSnapshot(ep *etcdProgress) {
	if !s.shouldSnapshot(ep) {
		return
	}
	lg := s.Logger()
	lg.Info(
		"triggering snapshot",
		zap.String("local-member-id", s.MemberID().String()),
		zap.Uint64("local-member-applied-index", ep.appliedi),
		zap.Uint64("local-member-snapshot-index", ep.snapi),
		zap.Uint64("local-member-snapshot-count", s.Cfg.SnapshotCount),
		zap.Bool("snapshot-forced", s.forceSnapshot),
	)
	s.forceSnapshot = false

	s.snapshot(ep.appliedi, ep.confState)
	ep.snapi = ep.appliedi
	s.compact(xxxxx)   // call the new method here, so we still do it each time after creating a snapshot.

}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No rush on merging this PR. If we do merge it, we need to ensure etcd actually benefits from it. Let's resolve #17098 (comment) first.

Copy link
Member

@serathius serathius Sep 26, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The goal of the PR is to make compaction independent from snapshot. Not just refactoring it to function.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not just refactor the function.

Just refactoring the function (extract the compact into a separate method) is an independent and safe change, accordingly can be merged soon.

The goal of the PR is to make compaction independent from snapshot

It modifies the logic/semantics, so it's no longer an independent change.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"Refactoring it to function" is a subset of "refactoring".

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To be clearer about #18635 (comment), I am proposing an independent & safe minor refactoring below as the very first step

ahrtr@efae0d2

select {
// snapshot requested via send()
case m := <-s.r.msgSnapC:
Expand Down Expand Up @@ -2170,6 +2183,22 @@ func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {
"saved snapshot",
zap.Uint64("snapshot-index", snap.Metadata.Index),
)
}

func (s *EtcdServer) maybeCompactRaftLog(ep *etcdProgress) {
// Retain all log entries up to the latest snapshot index to ensure any member can recover from that snapshot.
// Beyond the snapshot index, preserve the most recent s.Cfg.SnapshotCatchUpEntries entries in memory.
// This allows slow followers to catch up by synchronizing entries instead of requiring a full snapshot transfer.
if ep.snapi <= s.Cfg.SnapshotCatchUpEntries {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if ep.snapi <= s.Cfg.SnapshotCatchUpEntries {
if ep.appliedi <= s.Cfg.SnapshotCatchUpEntries {

Copy link
Member

@serathius serathius Sep 26, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

snapi is more correct here, the reason is described in the comment.

return
}

compacti := ep.snapi - s.Cfg.SnapshotCatchUpEntries
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
compacti := ep.snapi - s.Cfg.SnapshotCatchUpEntries
compacti := ep.appliedi - s.Cfg.SnapshotCatchUpEntries

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the purpose of the change?
If you use ep.snapi, then the behaviour is exactly the same as existing behaviour, because ep.snapi only gets updated each time after creating the (v2) snapshot.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ahrtr #18622 made a lot of changes, but Serathius and I agreed to keep PRs small. So, in this PR, we just separated the compaction from the snapshot while keeping the existing behavior.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please see #17098 (comment). Can we get that confirmed firstly?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change doesn't change when compaction is run or how many times it's executed. I was aware of https://github.com/etcd-io/raft/blob/5d6eb55c4e6929e461997c9113aba99a5148e921/storage.go#L266-L269 code, that's why I was proposing compacting only ever X applies.

if compacti <= ep.compacti {
return
}

lg := s.Logger()

// When sending a snapshot, etcd will pause compaction.
// After receives a snapshot, the slow follower needs to get all the entries right after
Expand All @@ -2181,13 +2210,7 @@ func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {
return
}

// keep some in memory log entries for slow followers.
compacti := uint64(1)
if snapi > s.Cfg.SnapshotCatchUpEntries {
compacti = snapi - s.Cfg.SnapshotCatchUpEntries
}

err = s.r.raftStorage.Compact(compacti)
err := s.r.raftStorage.Compact(compacti)
serathius marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
// the compaction was done asynchronously with the progress of raft.
// raft log might already been compact.
Expand All @@ -2196,6 +2219,7 @@ func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {
}
lg.Panic("failed to compact", zap.Error(err))
}
ep.compacti = compacti
lg.Info(
"compacted Raft logs",
zap.Uint64("compact-index", compacti),
Expand Down
95 changes: 95 additions & 0 deletions tests/integration/raft_log_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
// Copyright 2024 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package integration

import (
"context"
"errors"
"testing"
"time"

pb "go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/tests/v3/framework/integration"
)

// TestRaftLogCompaction tests whether raft log snapshot and compaction work correctly.
func TestRaftLogCompaction(t *testing.T) {
integration.BeforeTest(t)

clus := integration.NewCluster(t, &integration.ClusterConfig{
Size: 1,
SnapshotCount: 10,
SnapshotCatchUpEntries: 5,
})
defer clus.Terminate(t)

mem := clus.Members[0]
kvc := integration.ToGRPC(mem.Client).KV

// When starting a new cluster with 1 member, the member will have an index of 4.
serathius marked this conversation as resolved.
Show resolved Hide resolved
// TODO: Can someone explain this?
// Currently, if `ep.appliedi-ep.snapi > s.Cfg.SnapshotCount`,
// a raft log snapshot is created, and raft log entries are compacted.
// In this case, it triggers when the index is a multiple of 11.
appliedi := 4
serathius marked this conversation as resolved.
Show resolved Hide resolved
for ; appliedi <= 10; appliedi++ {
_, err := kvc.Put(context.TODO(), &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")})
if err != nil {
t.Errorf("#%d: couldn't put key (%v)", appliedi, err)
}
}
// The first snapshot and compaction shouldn't happen because the index is less than 11
expectMemberLogTimeout(t, mem, 5*time.Second, "saved snapshot", 1)
serathius marked this conversation as resolved.
Show resolved Hide resolved
expectMemberLogTimeout(t, mem, time.Second, "compacted Raft logs", 1)

for ; appliedi <= 11; appliedi++ {
_, err := kvc.Put(context.TODO(), &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")})
if err != nil {
t.Errorf("#%d: couldn't put key (%v)", appliedi, err)
}
}
// The first snapshot and compaction should happen because the index is 11
expectMemberLog(t, mem, 5*time.Second, "saved snapshot", 1)
serathius marked this conversation as resolved.
Show resolved Hide resolved
expectMemberLog(t, mem, time.Second, "compacted Raft logs", 1)
expectMemberLog(t, mem, time.Second, "\"compact-index\": 6", 1)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we really need to check if compacted Raft log occured at most N times? This is hard to check, why checking if "compact-index": X is not enough ?


for ; appliedi <= 1100; appliedi++ {
_, err := kvc.Put(context.TODO(), &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")})
if err != nil {
t.Errorf("#%d: couldn't put key (%v)", appliedi, err)
}
}
// With the index at 1100, snapshot and compaction should happen 100 times.
expectMemberLog(t, mem, 5*time.Second, "saved snapshot", 100)
expectMemberLog(t, mem, time.Second, "compacted Raft logs", 100)
expectMemberLog(t, mem, time.Second, "\"compact-index\": 1095", 1)

// No more snapshot and compaction should happen.
expectMemberLogTimeout(t, mem, 5*time.Second, "saved snapshot", 101)
expectMemberLogTimeout(t, mem, time.Second, "compacted Raft logs", 101)
}

// expectMemberLogTimeout ensures that the log has fewer than `count` occurrences of `s` before timing out
func expectMemberLogTimeout(t *testing.T, m *integration.Member, timeout time.Duration, s string, count int) {
serathius marked this conversation as resolved.
Show resolved Hide resolved
ctx, cancel := context.WithTimeout(context.TODO(), timeout)
defer cancel()

_, err := m.LogObserver.Expect(ctx, s, count)
if !errors.Is(err, context.DeadlineExceeded) {
if err != nil {
t.Fatalf("failed to expect (log:%s, count:%v): %v", s, count, err)
}
}
}
Loading