-
Notifications
You must be signed in to change notification settings - Fork 9.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
etcdserver: creates a non-empty raft log snapshot on server startup #18494
Changes from 11 commits
fa0fb6f
87faf55
103398a
926df1a
7f42b01
5e8a2f3
654aee8
49c5a42
3c14a8f
6136355
1955f1e
e40267f
ccbec07
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -99,8 +99,12 @@ func TestApplyRepeat(t *testing.T) { | |||||||||||||||||||||||||||||||||||||
SyncTicker: &time.Ticker{}, | ||||||||||||||||||||||||||||||||||||||
consistIndex: cindex.NewFakeConsistentIndex(0), | ||||||||||||||||||||||||||||||||||||||
uberApply: uberApplierMock{}, | ||||||||||||||||||||||||||||||||||||||
kv: mvcc.New(zaptest.NewLogger(t), be, &lease.FakeLessor{}, mvcc.StoreConfig{}), | ||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Set etcd/server/etcdserver/server.go Lines 2132 to 2149 in 4bb9392
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I see that we might need to fix couple of things in tests before we can merge the logic change. How about creating a separate PR to fix the tests first? That would give us confidence that there is no interdependence. If those tests fixes are really beneficial, we can merge them immediately so detailed review needed for raft logic change. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It’s good practice to avoid interdependence. Thanks for the insight! I’ll create new PRs to fix the tests whenever they can be separated. |
||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||
s.start() | ||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||
n.readyc <- newDummyPutReqReady() | ||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The test case wasn’t appending raft log entries correctly when the applied index increases, leading to an “slice bounds out of range” panic during snapshot creation. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you move it to a separate PR as proposed above? |
||||||||||||||||||||||||||||||||||||||
req := &pb.InternalRaftRequest{ | ||||||||||||||||||||||||||||||||||||||
Header: &pb.RequestHeader{ID: 1}, | ||||||||||||||||||||||||||||||||||||||
Put: &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}, | ||||||||||||||||||||||||||||||||||||||
|
@@ -134,7 +138,7 @@ func TestApplyRepeat(t *testing.T) { | |||||||||||||||||||||||||||||||||||||
t.Fatal(err) | ||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||
if len(act) == 0 { | ||||||||||||||||||||||||||||||||||||||
t.Fatalf("expected len(act)=0, got %d", len(act)) | ||||||||||||||||||||||||||||||||||||||
t.Fatalf("expected len(act)>0, got 0") | ||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||
if err = <-stopc; err != nil { | ||||||||||||||||||||||||||||||||||||||
|
@@ -885,8 +889,12 @@ func TestAddMember(t *testing.T) { | |||||||||||||||||||||||||||||||||||||
SyncTicker: &time.Ticker{}, | ||||||||||||||||||||||||||||||||||||||
consistIndex: cindex.NewFakeConsistentIndex(0), | ||||||||||||||||||||||||||||||||||||||
beHooks: serverstorage.NewBackendHooks(lg, nil), | ||||||||||||||||||||||||||||||||||||||
kv: mvcc.New(zaptest.NewLogger(t), be, &lease.FakeLessor{}, mvcc.StoreConfig{}), | ||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||
s.start() | ||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||
n.readyc <- newDummyPutReqReady() | ||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||
m := membership.Member{ID: 1234, RaftAttributes: membership.RaftAttributes{PeerURLs: []string{"foo"}}} | ||||||||||||||||||||||||||||||||||||||
_, err := s.AddMember(context.Background(), m) | ||||||||||||||||||||||||||||||||||||||
gaction := n.Action() | ||||||||||||||||||||||||||||||||||||||
|
@@ -992,8 +1000,12 @@ func TestRemoveMember(t *testing.T) { | |||||||||||||||||||||||||||||||||||||
SyncTicker: &time.Ticker{}, | ||||||||||||||||||||||||||||||||||||||
consistIndex: cindex.NewFakeConsistentIndex(0), | ||||||||||||||||||||||||||||||||||||||
beHooks: serverstorage.NewBackendHooks(lg, nil), | ||||||||||||||||||||||||||||||||||||||
kv: mvcc.New(zaptest.NewLogger(t), be, &lease.FakeLessor{}, mvcc.StoreConfig{}), | ||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||
s.start() | ||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||
n.readyc <- newDummyPutReqReady() | ||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||
_, err := s.RemoveMember(context.Background(), 1234) | ||||||||||||||||||||||||||||||||||||||
gaction := n.Action() | ||||||||||||||||||||||||||||||||||||||
s.Stop() | ||||||||||||||||||||||||||||||||||||||
|
@@ -1041,8 +1053,12 @@ func TestUpdateMember(t *testing.T) { | |||||||||||||||||||||||||||||||||||||
SyncTicker: &time.Ticker{}, | ||||||||||||||||||||||||||||||||||||||
consistIndex: cindex.NewFakeConsistentIndex(0), | ||||||||||||||||||||||||||||||||||||||
beHooks: serverstorage.NewBackendHooks(lg, nil), | ||||||||||||||||||||||||||||||||||||||
kv: mvcc.New(zaptest.NewLogger(t), be, &lease.FakeLessor{}, mvcc.StoreConfig{}), | ||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||
s.start() | ||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||
n.readyc <- newDummyPutReqReady() | ||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||
wm := membership.Member{ID: 1234, RaftAttributes: membership.RaftAttributes{PeerURLs: []string{"http://127.0.0.1:1"}}} | ||||||||||||||||||||||||||||||||||||||
_, err := s.UpdateMember(context.Background(), wm) | ||||||||||||||||||||||||||||||||||||||
gaction := n.Action() | ||||||||||||||||||||||||||||||||||||||
|
@@ -1581,3 +1597,17 @@ func TestIsActive(t *testing.T) { | |||||||||||||||||||||||||||||||||||||
require.Equal(t, tc.expectActive, s.isActive()) | ||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||
// newDummyPutReqReady is useful in unit tests with a partially functional raft.Node | ||||||||||||||||||||||||||||||||||||||
// (nodeConfChangeCommitterRecorder) that doesn't always append raft log entries properly. | ||||||||||||||||||||||||||||||||||||||
// When this happens, it can crash when creating a raft log snapshot due to missing entries. | ||||||||||||||||||||||||||||||||||||||
// To prevent this crash, you can send put requests to raft.Node's readyc after the server starts. | ||||||||||||||||||||||||||||||||||||||
func newDummyPutReqReady() raft.Ready { | ||||||||||||||||||||||||||||||||||||||
req := &pb.InternalRaftRequest{ | ||||||||||||||||||||||||||||||||||||||
Header: &pb.RequestHeader{ID: 1}, | ||||||||||||||||||||||||||||||||||||||
Put: &pb.PutRequest{Key: []byte("newDummyPutReqReady"), Value: []byte("bar")}, | ||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||
ents := []raftpb.Entry{{Index: 1, Data: pbutil.MustMarshal(req)}} | ||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||
return raft.Ready{Entries: ents} | ||||||||||||||||||||||||||||||||||||||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -28,6 +28,7 @@ import ( | |
"go.uber.org/zap" | ||
"go.uber.org/zap/zaptest" | ||
|
||
"go.etcd.io/etcd/api/v3/version" | ||
"go.etcd.io/etcd/client/pkg/v3/fileutil" | ||
"go.etcd.io/etcd/pkg/v3/expect" | ||
"go.etcd.io/etcd/server/v3/etcdserver" | ||
|
@@ -138,7 +139,32 @@ func TestV2DeprecationSnapshotMatches(t *testing.T) { | |
members2 := addAndRemoveKeysAndMembers(ctx, t, cc2, snapshotCount) | ||
assert.NoError(t, epc.Close()) | ||
|
||
assertSnapshotsMatch(t, oldMemberDataDir, newMemberDataDir, func(data []byte) []byte { | ||
lastVer, err := e2e.GetVersionFromBinary(e2e.BinPath.EtcdLastRelease) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not sure why this is needed, please all test changes to separate PR. |
||
if err != nil { | ||
t.Fatal(err) | ||
} | ||
currVer, err := e2e.GetVersionFromBinary(e2e.BinPath.Etcd) | ||
if err != nil { | ||
t.Fatal(err) | ||
} | ||
|
||
firstFiles, err := fileutil.ListFiles(oldMemberDataDir, filterSnapshotFiles) | ||
if err != nil { | ||
t.Fatal(err) | ||
} | ||
secondFiles, err := fileutil.ListFiles(newMemberDataDir, filterSnapshotFiles) | ||
if err != nil { | ||
t.Fatal(err) | ||
} | ||
assert.NotEmpty(t, firstFiles) | ||
assert.NotEmpty(t, secondFiles) | ||
|
||
if lastVer.LessThan(version.V3_6) && (version.V3_6.Equal(*currVer) || version.V3_6.LessThan(*currVer)) { | ||
assert.Equal(t, len(firstFiles)+1, len(secondFiles), "etcd v3.6 should create a snapshot of raft log snapshot on startup") | ||
t.Skipf("raft log snapshots of %v are supposed to differ from of %v", currVer, lastVer) | ||
} | ||
|
||
assertSnapshotsMatch(t, firstFiles, secondFiles, func(data []byte) []byte { | ||
// Patch members ids | ||
for i, mid := range members1 { | ||
data = bytes.Replace(data, []byte(fmt.Sprintf("%x", mid)), []byte(fmt.Sprintf("%d", i+1)), -1) | ||
|
@@ -237,18 +263,8 @@ func filterSnapshotFiles(path string) bool { | |
return strings.HasSuffix(path, ".snap") | ||
} | ||
|
||
func assertSnapshotsMatch(t testing.TB, firstDataDir, secondDataDir string, patch func([]byte) []byte) { | ||
func assertSnapshotsMatch(t testing.TB, firstFiles, secondFiles []string, patch func([]byte) []byte) { | ||
lg := zaptest.NewLogger(t) | ||
firstFiles, err := fileutil.ListFiles(firstDataDir, filterSnapshotFiles) | ||
if err != nil { | ||
t.Fatal(err) | ||
} | ||
secondFiles, err := fileutil.ListFiles(secondDataDir, filterSnapshotFiles) | ||
if err != nil { | ||
t.Fatal(err) | ||
} | ||
assert.NotEmpty(t, firstFiles) | ||
assert.NotEmpty(t, secondFiles) | ||
assert.Equal(t, len(firstFiles), len(secondFiles)) | ||
sort.Strings(firstFiles) | ||
sort.Strings(secondFiles) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -1693,5 +1693,30 @@ func (c *Cluster) MustNewMember(t testutil.TB, resp *clientv3.MemberAddResponse) | |
} | ||
m.InitialPeerURLsMap[m.Name] = types.MustNewURLs(resp.Member.PeerURLs) | ||
c.Members = append(c.Members, m) | ||
|
||
// Override PeerURLs and PeerListeners set by mustNewMember | ||
urls := types.MustNewURLs(resp.Member.PeerURLs) | ||
m.PeerURLs = urls | ||
|
||
var listeners []net.Listener | ||
for _, url := range urls { | ||
var l net.Listener | ||
var err error | ||
switch url.Scheme { | ||
case "http", "https": | ||
l, err = net.Listen("tcp", url.Host) | ||
case "unix", "unixs": | ||
l, err = net.Listen("unix", url.Host) | ||
default: | ||
err = fmt.Errorf("unsupported scheme: %s", url.Scheme) | ||
} | ||
|
||
if err != nil { | ||
t.Fatal("failed to listen on %v: %v", url, err) | ||
} | ||
listeners = append(listeners, l) | ||
} | ||
m.PeerListeners = listeners | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The test case added a learner member with peer URL There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please move to a separate PR so we can merge this faster. |
||
|
||
return m | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,117 @@ | ||
// Copyright 2024 The etcd Authors | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
package integration | ||
|
||
import ( | ||
"context" | ||
"errors" | ||
"fmt" | ||
"testing" | ||
"time" | ||
|
||
"github.com/stretchr/testify/assert" | ||
|
||
pb "go.etcd.io/etcd/api/v3/etcdserverpb" | ||
"go.etcd.io/etcd/tests/v3/framework/integration" | ||
) | ||
|
||
func TestRaftLogSnapshotAlwaysExistsClusterOf1(t *testing.T) { | ||
testRaftLogSnapshotExistsPostStartUp(t, 1) | ||
} | ||
|
||
func TestRaftLogSnapshotAlwaysExistsClusterOf3(t *testing.T) { | ||
testRaftLogSnapshotExistsPostStartUp(t, 3) | ||
} | ||
|
||
// testRaftLogSnapshotExistsPostStartUp ensures | ||
// - a non-empty raft log snapshot is present after the server starts up | ||
// - the snapshot index is as expected | ||
// - subsequent snapshots work as they used to | ||
func testRaftLogSnapshotExistsPostStartUp(t *testing.T, size int) { | ||
integration.BeforeTest(t) | ||
|
||
clus := integration.NewCluster(t, &integration.ClusterConfig{ | ||
Size: size, | ||
SnapshotCount: 100, | ||
SnapshotCatchUpEntries: 10, | ||
}) | ||
defer clus.Terminate(t) | ||
|
||
// expect the first snapshot to appear | ||
// | ||
// NOTE: When starting a new cluster with N member, each member will | ||
// apply N ConfChange directly at the beginning, setting the applied index to N. | ||
expectedSnapIndex := size | ||
ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second) | ||
defer cancel() | ||
for _, m := range clus.Members { | ||
lines, err := m.LogObserver.Expect(ctx, "saved snapshot", 1) | ||
for _, line := range lines { | ||
t.Logf("[expected line]: %v", line) | ||
} | ||
|
||
if err != nil { | ||
t.Fatalf("failed to expect (log:%s, count:%v): %v", "saved snapshot", 1, err) | ||
} | ||
|
||
assert.Contains(t, lines[0], fmt.Sprintf("{\"snapshot-index\": %d}", expectedSnapIndex)) | ||
} | ||
|
||
// increase applied index from size to size + 101, to trigger the second snapshot | ||
expectedSnapIndex = size + 101 | ||
kvc := integration.ToGRPC(clus.RandClient()).KV | ||
for i := 0; i < expectedSnapIndex; i++ { | ||
_, err := kvc.Put(context.TODO(), &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}) | ||
if err != nil { | ||
t.Fatalf("#%d: couldn't put key (%v)", i, err) | ||
} | ||
} | ||
|
||
// expect the second snapshot to appear | ||
ctx2, cancel2 := context.WithTimeout(context.TODO(), 5*time.Second) | ||
defer cancel2() | ||
for _, m := range clus.Members { | ||
lines, err := m.LogObserver.Expect(ctx2, "saved snapshot", 2) | ||
for _, line := range lines { | ||
t.Logf("[expected line]: %v", line) | ||
} | ||
|
||
if err != nil { | ||
t.Fatalf("failed to expect (log:%s, count:%v): %v", "saved snapshot", 2, err) | ||
} | ||
|
||
assert.Contains(t, lines[1], fmt.Sprintf("{\"snapshot-index\": %d}", expectedSnapIndex)) | ||
} | ||
|
||
// expect the third snapshot doesn't appear | ||
errC := make(chan error) | ||
ctx3, cancel3 := context.WithTimeout(context.TODO(), 5*time.Second) | ||
defer cancel3() | ||
for _, m := range clus.Members { | ||
go func() { | ||
// m.LogObserver.Expect should return a DeadlineExceeded error to confirm there are no more snapshots | ||
_, err := m.LogObserver.Expect(ctx3, "saved snapshot", 3) | ||
if !errors.Is(err, context.DeadlineExceeded) { | ||
errC <- fmt.Errorf("expected a DeadlineExceeded error, got %v, max snapshots allowed is %d", err, 2) | ||
} | ||
}() | ||
} | ||
|
||
select { | ||
case err := <-errC: | ||
t.Fatal(err) | ||
case <-ctx3.Done(): | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This could cause a panic if the applied index is also 1