Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

etcdserver: creates a non-empty raft log snapshot on server startup #18494

Closed
35 changes: 19 additions & 16 deletions server/etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1210,7 +1210,10 @@ func (s *EtcdServer) triggerSnapshot(ep *etcdProgress) {
}

func (s *EtcdServer) shouldSnapshot(ep *etcdProgress) bool {
return (s.forceSnapshot && ep.appliedi != ep.snapi) || (ep.appliedi-ep.snapi > s.Cfg.SnapshotCount)
return (s.forceSnapshot && ep.appliedi != ep.snapi) ||
(ep.appliedi-ep.snapi > s.Cfg.SnapshotCount) ||
// make sure a non-empty snapshot always exists
(ep.snapi == 0 && ep.appliedi > ep.snapi)
}

func (s *EtcdServer) hasMultipleVotingMembers() bool {
Expand Down Expand Up @@ -2181,24 +2184,24 @@ func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {
}

// keep some in memory log entries for slow followers.
compacti := uint64(1)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could cause a panic if the applied index is also 1

if snapi > s.Cfg.SnapshotCatchUpEntries {
compacti = snapi - s.Cfg.SnapshotCatchUpEntries
}

err = s.r.raftStorage.Compact(compacti)
if err != nil {
// the compaction was done asynchronously with the progress of raft.
// raft log might already been compact.
if err == raft.ErrCompacted {
return
compacti := snapi - s.Cfg.SnapshotCatchUpEntries
// if a compaction occurs, the index value of the first entry(dummy) in raft log
// will be `compacti`. So, `compacti` must be less than `appliedi` (`snapi`).
err = s.r.raftStorage.Compact(compacti)
if err != nil {
// the compaction was done asynchronously with the progress of raft.
// raft log might already been compact.
if err == raft.ErrCompacted {
return
}
lg.Panic("failed to compact", zap.Error(err))
}
lg.Panic("failed to compact", zap.Error(err))
lg.Info(
"compacted Raft logs",
zap.Uint64("compact-index", compacti),
)
}
lg.Info(
"compacted Raft logs",
zap.Uint64("compact-index", compacti),
)
}

// CutPeer drops messages to the specified peer.
Expand Down
32 changes: 31 additions & 1 deletion server/etcdserver/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,12 @@ func TestApplyRepeat(t *testing.T) {
SyncTicker: &time.Ticker{},
consistIndex: cindex.NewFakeConsistentIndex(0),
uberApply: uberApplierMock{},
kv: mvcc.New(zaptest.NewLogger(t), be, &lease.FakeLessor{}, mvcc.StoreConfig{}),
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Set kv to avoid a nil pointer panic:

func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {
d := GetMembershipInfoInV2Format(s.Logger(), s.cluster)
// commit kv to write metadata (for example: consistent index) to disk.
//
// This guarantees that Backend's consistent_index is >= index of last snapshot.
//
// KV().commit() updates the consistent index in backend.
// All operations that update consistent index must be called sequentially
// from applyAll function.
// So KV().Commit() cannot run in parallel with toApply. It has to be called outside
// the go routine created below.
s.KV().Commit()
lg := s.Logger()
// For backward compatibility, generate v2 snapshot from v3 state.
snap, err := s.r.raftStorage.CreateSnapshot(snapi, &confState, d)
if err != nil {

Copy link
Member

@serathius serathius Sep 9, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see that we might need to fix couple of things in tests before we can merge the logic change. How about creating a separate PR to fix the tests first? That would give us confidence that there is no interdependence.

If those tests fixes are really beneficial, we can merge them immediately so detailed review needed for raft logic change.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It’s good practice to avoid interdependence. Thanks for the insight! I’ll create new PRs to fix the tests whenever they can be separated.

}
s.start()

n.readyc <- newDummyPutReqReady()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test case wasn’t appending raft log entries correctly when the applied index increases, leading to an “slice bounds out of range” panic during snapshot creation.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you move it to a separate PR as proposed above?

req := &pb.InternalRaftRequest{
Header: &pb.RequestHeader{ID: 1},
Put: &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")},
Expand Down Expand Up @@ -134,7 +138,7 @@ func TestApplyRepeat(t *testing.T) {
t.Fatal(err)
}
if len(act) == 0 {
t.Fatalf("expected len(act)=0, got %d", len(act))
t.Fatalf("expected len(act)>0, got 0")
}

if err = <-stopc; err != nil {
Expand Down Expand Up @@ -885,8 +889,12 @@ func TestAddMember(t *testing.T) {
SyncTicker: &time.Ticker{},
consistIndex: cindex.NewFakeConsistentIndex(0),
beHooks: serverstorage.NewBackendHooks(lg, nil),
kv: mvcc.New(zaptest.NewLogger(t), be, &lease.FakeLessor{}, mvcc.StoreConfig{}),
}
s.start()

n.readyc <- newDummyPutReqReady()

m := membership.Member{ID: 1234, RaftAttributes: membership.RaftAttributes{PeerURLs: []string{"foo"}}}
_, err := s.AddMember(context.Background(), m)
gaction := n.Action()
Expand Down Expand Up @@ -992,8 +1000,12 @@ func TestRemoveMember(t *testing.T) {
SyncTicker: &time.Ticker{},
consistIndex: cindex.NewFakeConsistentIndex(0),
beHooks: serverstorage.NewBackendHooks(lg, nil),
kv: mvcc.New(zaptest.NewLogger(t), be, &lease.FakeLessor{}, mvcc.StoreConfig{}),
}
s.start()

n.readyc <- newDummyPutReqReady()

_, err := s.RemoveMember(context.Background(), 1234)
gaction := n.Action()
s.Stop()
Expand Down Expand Up @@ -1041,8 +1053,12 @@ func TestUpdateMember(t *testing.T) {
SyncTicker: &time.Ticker{},
consistIndex: cindex.NewFakeConsistentIndex(0),
beHooks: serverstorage.NewBackendHooks(lg, nil),
kv: mvcc.New(zaptest.NewLogger(t), be, &lease.FakeLessor{}, mvcc.StoreConfig{}),
}
s.start()

n.readyc <- newDummyPutReqReady()

wm := membership.Member{ID: 1234, RaftAttributes: membership.RaftAttributes{PeerURLs: []string{"http://127.0.0.1:1"}}}
_, err := s.UpdateMember(context.Background(), wm)
gaction := n.Action()
Expand Down Expand Up @@ -1581,3 +1597,17 @@ func TestIsActive(t *testing.T) {
require.Equal(t, tc.expectActive, s.isActive())
}
}

// newDummyPutReqReady is useful in unit tests with a partially functional raft.Node
// (nodeConfChangeCommitterRecorder) that doesn't always append raft log entries properly.
// When this happens, it can crash when creating a raft log snapshot due to missing entries.
// To prevent this crash, we can send put requests to raft.Node's readyc after the server starts.
func newDummyPutReqReady() raft.Ready {
req := &pb.InternalRaftRequest{
Header: &pb.RequestHeader{ID: 1},
Put: &pb.PutRequest{Key: []byte("newDummyPutReqReady"), Value: []byte("bar")},
}
ents := []raftpb.Entry{{Index: 1, Data: pbutil.MustMarshal(req)}}

return raft.Ready{Entries: ents}
}
40 changes: 28 additions & 12 deletions tests/e2e/v2store_deprecation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"go.uber.org/zap"
"go.uber.org/zap/zaptest"

"go.etcd.io/etcd/api/v3/version"
"go.etcd.io/etcd/client/pkg/v3/fileutil"
"go.etcd.io/etcd/pkg/v3/expect"
"go.etcd.io/etcd/server/v3/etcdserver"
Expand Down Expand Up @@ -138,7 +139,32 @@ func TestV2DeprecationSnapshotMatches(t *testing.T) {
members2 := addAndRemoveKeysAndMembers(ctx, t, cc2, snapshotCount)
assert.NoError(t, epc.Close())

assertSnapshotsMatch(t, oldMemberDataDir, newMemberDataDir, func(data []byte) []byte {
lastVer, err := e2e.GetVersionFromBinary(e2e.BinPath.EtcdLastRelease)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure why this is needed, please all test changes to separate PR.

if err != nil {
t.Fatal(err)
}
currVer, err := e2e.GetVersionFromBinary(e2e.BinPath.Etcd)
if err != nil {
t.Fatal(err)
}

firstFiles, err := fileutil.ListFiles(oldMemberDataDir, filterSnapshotFiles)
if err != nil {
t.Fatal(err)
}
secondFiles, err := fileutil.ListFiles(newMemberDataDir, filterSnapshotFiles)
if err != nil {
t.Fatal(err)
}
assert.NotEmpty(t, firstFiles)
assert.NotEmpty(t, secondFiles)

if lastVer.LessThan(version.V3_6) && (version.V3_6.Equal(*currVer) || version.V3_6.LessThan(*currVer)) {
assert.Equal(t, len(firstFiles)+1, len(secondFiles), "etcd v3.6 should create a snapshot of raft log on startup")
t.Skipf("raft log snapshots of %v are supposed to differ from of %v", currVer, lastVer)
}

assertSnapshotsMatch(t, firstFiles, secondFiles, func(data []byte) []byte {
// Patch members ids
for i, mid := range members1 {
data = bytes.Replace(data, []byte(fmt.Sprintf("%x", mid)), []byte(fmt.Sprintf("%d", i+1)), -1)
Expand Down Expand Up @@ -237,18 +263,8 @@ func filterSnapshotFiles(path string) bool {
return strings.HasSuffix(path, ".snap")
}

func assertSnapshotsMatch(t testing.TB, firstDataDir, secondDataDir string, patch func([]byte) []byte) {
func assertSnapshotsMatch(t testing.TB, firstFiles, secondFiles []string, patch func([]byte) []byte) {
lg := zaptest.NewLogger(t)
firstFiles, err := fileutil.ListFiles(firstDataDir, filterSnapshotFiles)
if err != nil {
t.Fatal(err)
}
secondFiles, err := fileutil.ListFiles(secondDataDir, filterSnapshotFiles)
if err != nil {
t.Fatal(err)
}
assert.NotEmpty(t, firstFiles)
assert.NotEmpty(t, secondFiles)
assert.Equal(t, len(firstFiles), len(secondFiles))
sort.Strings(firstFiles)
sort.Strings(secondFiles)
Expand Down
25 changes: 25 additions & 0 deletions tests/framework/integration/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -1693,5 +1693,30 @@ func (c *Cluster) MustNewMember(t testutil.TB, resp *clientv3.MemberAddResponse)
}
m.InitialPeerURLsMap[m.Name] = types.MustNewURLs(resp.Member.PeerURLs)
c.Members = append(c.Members, m)

// Override PeerURLs and PeerListeners set by mustNewMember
urls := types.MustNewURLs(resp.Member.PeerURLs)
m.PeerURLs = urls

var listeners []net.Listener
for _, url := range urls {
var l net.Listener
var err error
switch url.Scheme {
case "http", "https":
l, err = net.Listen("tcp", url.Host)
case "unix", "unixs":
l, err = net.Listen("unix", url.Host)
default:
err = fmt.Errorf("unsupported scheme: %s", url.Scheme)
}

if err != nil {
t.Fatal("failed to listen on %v: %v", url, err)
}
listeners = append(listeners, l)
}
m.PeerListeners = listeners
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test case added a learner member with peer URL http://127.0.0.1:1234 to the cluster, but the learner member didn't listen to the host and couldn’t receive a snapshot from the leader.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please move to a separate PR so we can merge this faster.


return m
}
117 changes: 117 additions & 0 deletions tests/integration/raft_log_snapshot_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
// Copyright 2024 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package integration

import (
"context"
"errors"
"fmt"
"testing"
"time"

"github.com/stretchr/testify/assert"

pb "go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/tests/v3/framework/integration"
)

func TestRaftLogSnapshotAlwaysExistsClusterOf1(t *testing.T) {
testRaftLogSnapshotExistsPostStartUp(t, 1)
}

func TestRaftLogSnapshotAlwaysExistsClusterOf3(t *testing.T) {
testRaftLogSnapshotExistsPostStartUp(t, 3)
}

// testRaftLogSnapshotExistsPostStartUp ensures
// - a non-empty raft log snapshot is present after the server starts up
// - the snapshot index is as expected
// - subsequent snapshots work as they used to
func testRaftLogSnapshotExistsPostStartUp(t *testing.T, size int) {
integration.BeforeTest(t)

clus := integration.NewCluster(t, &integration.ClusterConfig{
Size: size,
SnapshotCount: 100,
SnapshotCatchUpEntries: 10,
})
defer clus.Terminate(t)

// expect the first snapshot to appear
//
// NOTE: When starting a new cluster with N member, each member will
// apply N ConfChange directly at the beginning, setting the applied index to N.
expectedSnapIndex := size
ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second)
defer cancel()
for _, m := range clus.Members {
lines, err := m.LogObserver.Expect(ctx, "saved snapshot", 1)
for _, line := range lines {
t.Logf("[expected line]: %v", line)
}

if err != nil {
t.Fatalf("failed to expect (log:%s, count:%v): %v", "saved snapshot", 1, err)
}

assert.Contains(t, lines[0], fmt.Sprintf("{\"snapshot-index\": %d}", expectedSnapIndex))
}

// increase applied index from size to size + 101, to trigger the second snapshot
expectedSnapIndex = size + 101
kvc := integration.ToGRPC(clus.RandClient()).KV
for i := 0; i < expectedSnapIndex; i++ {
_, err := kvc.Put(context.TODO(), &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")})
if err != nil {
t.Fatalf("#%d: couldn't put key (%v)", i, err)
}
}

// expect the second snapshot to appear
ctx2, cancel2 := context.WithTimeout(context.TODO(), 5*time.Second)
defer cancel2()
for _, m := range clus.Members {
lines, err := m.LogObserver.Expect(ctx2, "saved snapshot", 2)
for _, line := range lines {
t.Logf("[expected line]: %v", line)
}

if err != nil {
t.Fatalf("failed to expect (log:%s, count:%v): %v", "saved snapshot", 2, err)
}

assert.Contains(t, lines[1], fmt.Sprintf("{\"snapshot-index\": %d}", expectedSnapIndex))
}

// expect the third snapshot doesn't appear
errC := make(chan error)
ctx3, cancel3 := context.WithTimeout(context.TODO(), 5*time.Second)
defer cancel3()
for _, m := range clus.Members {
go func() {
// m.LogObserver.Expect should return a DeadlineExceeded error to confirm there are no more snapshots
_, err := m.LogObserver.Expect(ctx3, "saved snapshot", 3)
if !errors.Is(err, context.DeadlineExceeded) {
errC <- fmt.Errorf("expected a DeadlineExceeded error, got %v, max snapshots allowed is %d", err, 2)
}
}()
}

select {
case err := <-errC:
t.Fatal(err)
case <-ctx3.Done():
}
}
15 changes: 8 additions & 7 deletions tests/integration/v3_watch_restore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func TestV3WatchRestoreSnapshotUnsync(t *testing.T) {
kvc := integration.ToGRPC(clus.Client(1)).KV

// to trigger snapshot from the leader to the stopped follower
for i := 0; i < 15; i++ {
for i := 0; i < 17; i++ {
_, err := kvc.Put(context.TODO(), &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")})
if err != nil {
t.Errorf("#%d: couldn't put key (%v)", i, err)
Expand All @@ -99,14 +99,15 @@ func TestV3WatchRestoreSnapshotUnsync(t *testing.T) {

// NOTE: When starting a new cluster with 3 members, each member will
// apply 3 ConfChange directly at the beginning before a leader is
// elected. Leader will apply 3 MemberAttrSet and 1 ClusterVersionSet
// elected. A snapshot of raft log is created, setting the snap
// index to 3. Leader will apply 3 MemberAttrSet and 1 ClusterVersionSet
// changes. So member 0 has index 8 in raft log before network
// partition. We need to trigger EtcdServer.snapshot() at least twice.
//
// SnapshotCount: 10, SnapshotCatchUpEntries: 5
//
// T1: L(snapshot-index: 11, compacted-index: 6), F_m0(index:8)
// T2: L(snapshot-index: 22, compacted-index: 17), F_m0(index:8, out of date)
// T1: L(snapshot-index: 14, compacted-index: 9), F_m0(index:8, out of date)
// T2: L(snapshot-index: 25, compacted-index: 20), F_m0(index:8, out of date)
//
// Since there is no way to confirm server has compacted the log, we
// use log monitor to watch and expect "compacted Raft logs" content.
Expand Down Expand Up @@ -156,9 +157,9 @@ func TestV3WatchRestoreSnapshotUnsync(t *testing.T) {
errc <- cerr
return
}
// from start revision 5 to latest revision 16
if len(cresp.Events) != 12 {
errc <- fmt.Errorf("expected 12 events, got %+v", cresp.Events)
// from start revision 5 to latest revision 18
if len(cresp.Events) != 14 {
errc <- fmt.Errorf("expected 14 events, got %+v", cresp.Events)
return
}
errc <- nil
Expand Down
Loading