From fa0fb6fa9b98f8ca8b89f8742dcfcb373687850e Mon Sep 17 00:00:00 2001 From: Clement Date: Mon, 26 Aug 2024 02:45:41 +0800 Subject: [PATCH 01/13] etcdserver: a non-empty raft log snapshot should always be available Signed-off-by: Clement --- server/etcdserver/server.go | 5 +- tests/integration/raft_log_snapshot_test.go | 78 +++++++++++++++++++++ 2 files changed, 82 insertions(+), 1 deletion(-) create mode 100644 tests/integration/raft_log_snapshot_test.go diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index 0600a31b896..ac9e00d9b60 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -1210,7 +1210,10 @@ func (s *EtcdServer) triggerSnapshot(ep *etcdProgress) { } func (s *EtcdServer) shouldSnapshot(ep *etcdProgress) bool { - return (s.forceSnapshot && ep.appliedi != ep.snapi) || (ep.appliedi-ep.snapi > s.Cfg.SnapshotCount) + return (s.forceSnapshot && ep.appliedi != ep.snapi) || + (ep.appliedi-ep.snapi > s.Cfg.SnapshotCount) || + // ensures a non-empty snapshot always exists + (ep.snapi == 0 && ep.appliedi > ep.snapi) } func (s *EtcdServer) hasMultipleVotingMembers() bool { diff --git a/tests/integration/raft_log_snapshot_test.go b/tests/integration/raft_log_snapshot_test.go new file mode 100644 index 00000000000..d048dee277d --- /dev/null +++ b/tests/integration/raft_log_snapshot_test.go @@ -0,0 +1,78 @@ +// Copyright 2024 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package integration + +import ( + "context" + "errors" + "testing" + "time" + + pb "go.etcd.io/etcd/api/v3/etcdserverpb" + "go.etcd.io/etcd/tests/v3/framework/integration" +) + +// TestRaftLogSnapshotExistsPostStartUp ensures a non-empty raft log snapshot exists after startup +func TestRaftLogSnapshotExistsPostStartUp(t *testing.T) { + integration.BeforeTest(t) + + clus := integration.NewCluster(t, &integration.ClusterConfig{ + Size: 1, + SnapshotCount: 100, + SnapshotCatchUpEntries: 10, + }) + defer clus.Terminate(t) + + m := clus.Members[0] + + ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second) + defer cancel() + + _, err := m.LogObserver.Expect(ctx, "saved snapshot", 1) + if err != nil { + t.Fatalf("failed to expect (log:%s, count:%v): %v", "saved snapshot", 1, err) + } + + kvc := integration.ToGRPC(clus.RandClient()).KV + + // In order to trigger another snapshot, we should increase applied index from 1 to 102. + // + // NOTE: When starting a new cluster with 1 member, the member will + // apply 3 ConfChange directly at the beginning, meaning its applied + // index is 4. + for i := 0; i < 102-4; i++ { + _, err := kvc.Put(context.TODO(), &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}) + if err != nil { + t.Fatalf("#%d: couldn't put key (%v)", i, err) + } + } + + ctx2, cancel2 := context.WithTimeout(context.TODO(), 5*time.Second) + defer cancel2() + + _, err = m.LogObserver.Expect(ctx2, "saved snapshot", 2) + if err != nil { + t.Fatalf("failed to expect (log:%s, count:%v): %v", "saved snapshot", 1, err) + } + + ctx3, cancel3 := context.WithTimeout(context.TODO(), 5*time.Second) + defer cancel3() + + // Expect function should return a DeadlineExceeded error to ensure no more snapshots are present + _, err = m.LogObserver.Expect(ctx3, "saved snapshot", 3) + if !errors.Is(err, context.DeadlineExceeded) { + t.Fatalf("unexpected error, max snapshots allowed is %d: %v", 2, err) + } +} From 87faf556d1caccad32b9480b7d288fff991ec5e2 Mon Sep 17 00:00:00 2001 From: Clement Date: Thu, 29 Aug 2024 02:43:00 +0800 Subject: [PATCH 02/13] fix: TestMemberPromote should override PeerURLS Signed-off-by: Clement --- tests/framework/integration/cluster.go | 15 +++++++++++++++ tests/framework/testutils/helpters.go | 14 ++++++++++++++ 2 files changed, 29 insertions(+) diff --git a/tests/framework/integration/cluster.go b/tests/framework/integration/cluster.go index 95b5c88d9f8..b6a31374559 100644 --- a/tests/framework/integration/cluster.go +++ b/tests/framework/integration/cluster.go @@ -1693,5 +1693,20 @@ func (c *Cluster) MustNewMember(t testutil.TB, resp *clientv3.MemberAddResponse) } m.InitialPeerURLsMap[m.Name] = types.MustNewURLs(resp.Member.PeerURLs) c.Members = append(c.Members, m) + + // Override PeerURLs and PeerListeners set by mustNewMember + urls := types.MustNewURLs(resp.Member.PeerURLs) + m.PeerURLs = urls + + var listeners []net.Listener + for _, url := range urls { + l, err := testutils.ListenURL(&url) + if err != nil { + t.Fatal("failed to listen on %v: %v", url, err) + } + listeners = append(listeners, l) + } + m.PeerListeners = listeners + return m } diff --git a/tests/framework/testutils/helpters.go b/tests/framework/testutils/helpters.go index 91363176c2f..00e209a320d 100644 --- a/tests/framework/testutils/helpters.go +++ b/tests/framework/testutils/helpters.go @@ -16,6 +16,9 @@ package testutils import ( "errors" + "fmt" + "net" + "net/url" "time" clientv3 "go.etcd.io/etcd/client/v3" @@ -69,3 +72,14 @@ func MustClient(c intf.Client, err error) intf.Client { } return c } + +func ListenURL(addr *url.URL) (net.Listener, error) { + switch addr.Scheme { + case "http", "https": + return net.Listen("tcp", addr.Host) + case "unix", "unixs": + return net.Listen("unix", addr.Path) + default: + return nil, fmt.Errorf("unsupported scheme: %s", addr.Scheme) + } +} From 103398a036afa1b218943b7dbfdc82bbee0acc69 Mon Sep 17 00:00:00 2001 From: Clement Date: Thu, 29 Aug 2024 02:53:29 +0800 Subject: [PATCH 03/13] fix: TestGrpcProxyAutoSync should wait a bit longer at CloseProc Signed-off-by: Clement --- tests/framework/e2e/cluster.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/framework/e2e/cluster.go b/tests/framework/e2e/cluster.go index 23d422aa313..97040515807 100644 --- a/tests/framework/e2e/cluster.go +++ b/tests/framework/e2e/cluster.go @@ -786,7 +786,7 @@ func (epc *EtcdProcessCluster) CloseProc(ctx context.Context, finder func(EtcdPr } memberRemoved := false - for i := 0; i < 10; i++ { + for i := 0; i < 15; i++ { _, err := memberCtl.MemberRemove(ctx, memberID) if err != nil && strings.Contains(err.Error(), "member not found") { memberRemoved = true From 926df1a2bc71043290e9851b08a1e4646c27e1e7 Mon Sep 17 00:00:00 2001 From: Clement Date: Thu, 29 Aug 2024 20:18:56 +0800 Subject: [PATCH 04/13] fix: compacti must be less than appliedi Signed-off-by: Clement --- server/etcdserver/server.go | 30 +++++++++++++++--------------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index ac9e00d9b60..407d740e579 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -2184,24 +2184,24 @@ func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) { } // keep some in memory log entries for slow followers. - compacti := uint64(1) if snapi > s.Cfg.SnapshotCatchUpEntries { - compacti = snapi - s.Cfg.SnapshotCatchUpEntries - } - - err = s.r.raftStorage.Compact(compacti) - if err != nil { - // the compaction was done asynchronously with the progress of raft. - // raft log might already been compact. - if err == raft.ErrCompacted { - return + compacti := snapi - s.Cfg.SnapshotCatchUpEntries + // After calling s.r.raftStorage.Compact, compacti becomes the first (dummy) entry. + // Therefore, compacti must be less than appliedi (snapi). + err = s.r.raftStorage.Compact(compacti) + if err != nil { + // the compaction was done asynchronously with the progress of raft. + // raft log might already been compact. + if err == raft.ErrCompacted { + return + } + lg.Panic("failed to compact", zap.Error(err)) } - lg.Panic("failed to compact", zap.Error(err)) + lg.Info( + "compacted Raft logs", + zap.Uint64("compact-index", compacti), + ) } - lg.Info( - "compacted Raft logs", - zap.Uint64("compact-index", compacti), - ) } // CutPeer drops messages to the specified peer. From 7f42b01b8ca545e75b4ffe7dcbb2accc2cdbdf09 Mon Sep 17 00:00:00 2001 From: Clement Date: Thu, 29 Aug 2024 21:50:30 +0800 Subject: [PATCH 05/13] fix: adjust TestV3WatchRestoreSnapshotUnsync to handle a different initial snap index Signed-off-by: Clement --- tests/integration/v3_watch_restore_test.go | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/tests/integration/v3_watch_restore_test.go b/tests/integration/v3_watch_restore_test.go index f7e2e4b4730..3ad46ba0a44 100644 --- a/tests/integration/v3_watch_restore_test.go +++ b/tests/integration/v3_watch_restore_test.go @@ -90,7 +90,7 @@ func TestV3WatchRestoreSnapshotUnsync(t *testing.T) { kvc := integration.ToGRPC(clus.Client(1)).KV // to trigger snapshot from the leader to the stopped follower - for i := 0; i < 15; i++ { + for i := 0; i < 17; i++ { _, err := kvc.Put(context.TODO(), &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}) if err != nil { t.Errorf("#%d: couldn't put key (%v)", i, err) @@ -99,14 +99,15 @@ func TestV3WatchRestoreSnapshotUnsync(t *testing.T) { // NOTE: When starting a new cluster with 3 members, each member will // apply 3 ConfChange directly at the beginning before a leader is - // elected. Leader will apply 3 MemberAttrSet and 1 ClusterVersionSet + // elected. Also, a snapshot of raft log is created, setting the snap + // index to 3. Leader will apply 3 MemberAttrSet and 1 ClusterVersionSet // changes. So member 0 has index 8 in raft log before network // partition. We need to trigger EtcdServer.snapshot() at least twice. // // SnapshotCount: 10, SnapshotCatchUpEntries: 5 // - // T1: L(snapshot-index: 11, compacted-index: 6), F_m0(index:8) - // T2: L(snapshot-index: 22, compacted-index: 17), F_m0(index:8, out of date) + // T1: L(snapshot-index: 14, compacted-index: 9), F_m0(index:8, out of date) + // T2: L(snapshot-index: 25, compacted-index: 20), F_m0(index:8, out of date) // // Since there is no way to confirm server has compacted the log, we // use log monitor to watch and expect "compacted Raft logs" content. @@ -156,9 +157,9 @@ func TestV3WatchRestoreSnapshotUnsync(t *testing.T) { errc <- cerr return } - // from start revision 5 to latest revision 16 - if len(cresp.Events) != 12 { - errc <- fmt.Errorf("expected 12 events, got %+v", cresp.Events) + // from start revision 5 to latest revision 18 + if len(cresp.Events) != 14 { + errc <- fmt.Errorf("expected 14 events, got %+v", cresp.Events) return } errc <- nil From 5e8a2f3b51cdb32d8227fc523fee00f7e1d53c4b Mon Sep 17 00:00:00 2001 From: Clement Date: Thu, 29 Aug 2024 21:58:15 +0800 Subject: [PATCH 06/13] fix: TestV2SetMemberAttributes should have a kv Signed-off-by: Clement --- server/etcdserver/server_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/server/etcdserver/server_test.go b/server/etcdserver/server_test.go index 858aa32a076..d9b120e8f04 100644 --- a/server/etcdserver/server_test.go +++ b/server/etcdserver/server_test.go @@ -99,6 +99,7 @@ func TestApplyRepeat(t *testing.T) { SyncTicker: &time.Ticker{}, consistIndex: cindex.NewFakeConsistentIndex(0), uberApply: uberApplierMock{}, + kv: mvcc.New(zaptest.NewLogger(t), be, &lease.FakeLessor{}, mvcc.StoreConfig{}), } s.start() req := &pb.InternalRaftRequest{ From 654aee86fa4274d0f6faeec559bb47cbebe49b47 Mon Sep 17 00:00:00 2001 From: Clement Date: Fri, 30 Aug 2024 01:11:10 +0800 Subject: [PATCH 07/13] fix: TestApplyRepeat Signed-off-by: Clement --- server/etcdserver/server_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/server/etcdserver/server_test.go b/server/etcdserver/server_test.go index d9b120e8f04..8d2f87fe203 100644 --- a/server/etcdserver/server_test.go +++ b/server/etcdserver/server_test.go @@ -107,9 +107,9 @@ func TestApplyRepeat(t *testing.T) { Put: &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}, } ents := []raftpb.Entry{{Index: 1, Data: pbutil.MustMarshal(req)}} - n.readyc <- raft.Ready{CommittedEntries: ents} + n.readyc <- raft.Ready{Entries: ents, CommittedEntries: ents} // dup msg - n.readyc <- raft.Ready{CommittedEntries: ents} + n.readyc <- raft.Ready{Entries: ents, CommittedEntries: ents} // use a conf change to block until dup msgs are all processed cc := &raftpb.ConfChange{Type: raftpb.ConfChangeRemoveNode, NodeID: 2} @@ -135,7 +135,7 @@ func TestApplyRepeat(t *testing.T) { t.Fatal(err) } if len(act) == 0 { - t.Fatalf("expected len(act)=0, got %d", len(act)) + t.Fatalf("expected len(act)>0, got 0") } if err = <-stopc; err != nil { From 49c5a42c2c868b7aa3702799c909320493ba6d62 Mon Sep 17 00:00:00 2001 From: Clement Date: Fri, 30 Aug 2024 05:22:48 +0800 Subject: [PATCH 08/13] fix: unit tests Signed-off-by: Clement --- server/etcdserver/server_test.go | 23 +++++++++++++++++++++ tests/integration/raft_log_snapshot_test.go | 3 +-- 2 files changed, 24 insertions(+), 2 deletions(-) diff --git a/server/etcdserver/server_test.go b/server/etcdserver/server_test.go index 8d2f87fe203..65ebee05013 100644 --- a/server/etcdserver/server_test.go +++ b/server/etcdserver/server_test.go @@ -886,8 +886,12 @@ func TestAddMember(t *testing.T) { SyncTicker: &time.Ticker{}, consistIndex: cindex.NewFakeConsistentIndex(0), beHooks: serverstorage.NewBackendHooks(lg, nil), + kv: mvcc.New(zaptest.NewLogger(t), be, &lease.FakeLessor{}, mvcc.StoreConfig{}), } s.start() + + n.readyc <- createDummyReady() + m := membership.Member{ID: 1234, RaftAttributes: membership.RaftAttributes{PeerURLs: []string{"foo"}}} _, err := s.AddMember(context.Background(), m) gaction := n.Action() @@ -993,8 +997,12 @@ func TestRemoveMember(t *testing.T) { SyncTicker: &time.Ticker{}, consistIndex: cindex.NewFakeConsistentIndex(0), beHooks: serverstorage.NewBackendHooks(lg, nil), + kv: mvcc.New(zaptest.NewLogger(t), be, &lease.FakeLessor{}, mvcc.StoreConfig{}), } s.start() + + n.readyc <- createDummyReady() + _, err := s.RemoveMember(context.Background(), 1234) gaction := n.Action() s.Stop() @@ -1042,8 +1050,12 @@ func TestUpdateMember(t *testing.T) { SyncTicker: &time.Ticker{}, consistIndex: cindex.NewFakeConsistentIndex(0), beHooks: serverstorage.NewBackendHooks(lg, nil), + kv: mvcc.New(zaptest.NewLogger(t), be, &lease.FakeLessor{}, mvcc.StoreConfig{}), } s.start() + + n.readyc <- createDummyReady() + wm := membership.Member{ID: 1234, RaftAttributes: membership.RaftAttributes{PeerURLs: []string{"http://127.0.0.1:1"}}} _, err := s.UpdateMember(context.Background(), wm) gaction := n.Action() @@ -1582,3 +1594,14 @@ func TestIsActive(t *testing.T) { require.Equal(t, tc.expectActive, s.isActive()) } } + +// createDummyReady creates a raft Ready that can be sent to readyc to prevent crashes during snapshots. +func createDummyReady() raft.Ready { + req := &pb.InternalRaftRequest{ + Header: &pb.RequestHeader{ID: 1}, + Put: &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}, + } + ents := []raftpb.Entry{{Index: 1, Data: pbutil.MustMarshal(req)}} + + return raft.Ready{Entries: ents} +} diff --git a/tests/integration/raft_log_snapshot_test.go b/tests/integration/raft_log_snapshot_test.go index d048dee277d..b6e5790a8b1 100644 --- a/tests/integration/raft_log_snapshot_test.go +++ b/tests/integration/raft_log_snapshot_test.go @@ -50,8 +50,7 @@ func TestRaftLogSnapshotExistsPostStartUp(t *testing.T) { // In order to trigger another snapshot, we should increase applied index from 1 to 102. // // NOTE: When starting a new cluster with 1 member, the member will - // apply 3 ConfChange directly at the beginning, meaning its applied - // index is 4. + // apply 3 ConfChange directly at the beginning, setting the applied index to 4. for i := 0; i < 102-4; i++ { _, err := kvc.Put(context.TODO(), &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}) if err != nil { From 3c14a8f19a24862e47b8d30f19ddecd8fbe4a20c Mon Sep 17 00:00:00 2001 From: Clement Date: Sat, 31 Aug 2024 20:12:12 +0800 Subject: [PATCH 09/13] fix: TestV2DeprecationSnapshotMatches Signed-off-by: Clement --- tests/e2e/v2store_deprecation_test.go | 36 +++++++++++++++------ tests/integration/raft_log_snapshot_test.go | 2 +- 2 files changed, 27 insertions(+), 11 deletions(-) diff --git a/tests/e2e/v2store_deprecation_test.go b/tests/e2e/v2store_deprecation_test.go index 99628cd54e3..2ab87a91ecf 100644 --- a/tests/e2e/v2store_deprecation_test.go +++ b/tests/e2e/v2store_deprecation_test.go @@ -28,6 +28,7 @@ import ( "go.uber.org/zap" "go.uber.org/zap/zaptest" + "go.etcd.io/etcd/api/v3/version" "go.etcd.io/etcd/client/pkg/v3/fileutil" "go.etcd.io/etcd/pkg/v3/expect" "go.etcd.io/etcd/server/v3/etcdserver" @@ -138,7 +139,30 @@ func TestV2DeprecationSnapshotMatches(t *testing.T) { members2 := addAndRemoveKeysAndMembers(ctx, t, cc2, snapshotCount) assert.NoError(t, epc.Close()) - assertSnapshotsMatch(t, oldMemberDataDir, newMemberDataDir, func(data []byte) []byte { + lastVer, err := e2e.GetVersionFromBinary(e2e.BinPath.EtcdLastRelease) + if err != nil { + t.Fatal(err) + } + currVer, err := e2e.GetVersionFromBinary(e2e.BinPath.Etcd) + if err != nil { + t.Fatal(err) + } + + firstFiles, err := fileutil.ListFiles(oldMemberDataDir, filterSnapshotFiles) + if err != nil { + t.Fatal(err) + } + secondFiles, err := fileutil.ListFiles(newMemberDataDir, filterSnapshotFiles) + if err != nil { + t.Fatal(err) + } + + if lastVer.LessThan(version.V3_6) && (version.V3_6.Equal(*currVer) || version.V3_6.LessThan(*currVer)) { + assert.Equal(t, len(firstFiles)+1, len(secondFiles), "etcd v3.6 should creates an additional raft log snapshot on startup") + t.Skipf("raft log snapshots of %v are supposed to differ from of %v", currVer, lastVer) + } + + assertSnapshotsMatch(t, firstFiles, secondFiles, func(data []byte) []byte { // Patch members ids for i, mid := range members1 { data = bytes.Replace(data, []byte(fmt.Sprintf("%x", mid)), []byte(fmt.Sprintf("%d", i+1)), -1) @@ -237,16 +261,8 @@ func filterSnapshotFiles(path string) bool { return strings.HasSuffix(path, ".snap") } -func assertSnapshotsMatch(t testing.TB, firstDataDir, secondDataDir string, patch func([]byte) []byte) { +func assertSnapshotsMatch(t testing.TB, firstFiles, secondFiles []string, patch func([]byte) []byte) { lg := zaptest.NewLogger(t) - firstFiles, err := fileutil.ListFiles(firstDataDir, filterSnapshotFiles) - if err != nil { - t.Fatal(err) - } - secondFiles, err := fileutil.ListFiles(secondDataDir, filterSnapshotFiles) - if err != nil { - t.Fatal(err) - } assert.NotEmpty(t, firstFiles) assert.NotEmpty(t, secondFiles) assert.Equal(t, len(firstFiles), len(secondFiles)) diff --git a/tests/integration/raft_log_snapshot_test.go b/tests/integration/raft_log_snapshot_test.go index b6e5790a8b1..198926193a2 100644 --- a/tests/integration/raft_log_snapshot_test.go +++ b/tests/integration/raft_log_snapshot_test.go @@ -52,7 +52,7 @@ func TestRaftLogSnapshotExistsPostStartUp(t *testing.T) { // NOTE: When starting a new cluster with 1 member, the member will // apply 3 ConfChange directly at the beginning, setting the applied index to 4. for i := 0; i < 102-4; i++ { - _, err := kvc.Put(context.TODO(), &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}) + _, err = kvc.Put(context.TODO(), &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}) if err != nil { t.Fatalf("#%d: couldn't put key (%v)", i, err) } From 6136355dcf8dbc6c921ebb17ee7cd338749eb0f3 Mon Sep 17 00:00:00 2001 From: Clement Date: Sat, 31 Aug 2024 23:19:00 +0800 Subject: [PATCH 10/13] fix: improve readability Signed-off-by: Clement --- server/etcdserver/server.go | 2 +- server/etcdserver/server_test.go | 22 +++++---- tests/e2e/v2store_deprecation_test.go | 6 +-- tests/framework/e2e/cluster.go | 2 +- tests/framework/integration/cluster.go | 12 ++++- tests/framework/testutils/helpters.go | 14 ------ tests/integration/raft_log_snapshot_test.go | 52 ++++++++++++++++++--- tests/integration/v3_watch_restore_test.go | 2 +- 8 files changed, 76 insertions(+), 36 deletions(-) diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index 407d740e579..15beac3328c 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -1212,7 +1212,7 @@ func (s *EtcdServer) triggerSnapshot(ep *etcdProgress) { func (s *EtcdServer) shouldSnapshot(ep *etcdProgress) bool { return (s.forceSnapshot && ep.appliedi != ep.snapi) || (ep.appliedi-ep.snapi > s.Cfg.SnapshotCount) || - // ensures a non-empty snapshot always exists + // make sure a non-empty snapshot always exists (ep.snapi == 0 && ep.appliedi > ep.snapi) } diff --git a/server/etcdserver/server_test.go b/server/etcdserver/server_test.go index 65ebee05013..33e6481e547 100644 --- a/server/etcdserver/server_test.go +++ b/server/etcdserver/server_test.go @@ -102,14 +102,17 @@ func TestApplyRepeat(t *testing.T) { kv: mvcc.New(zaptest.NewLogger(t), be, &lease.FakeLessor{}, mvcc.StoreConfig{}), } s.start() + + n.readyc <- newDummyPutReqReady() + req := &pb.InternalRaftRequest{ Header: &pb.RequestHeader{ID: 1}, Put: &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}, } ents := []raftpb.Entry{{Index: 1, Data: pbutil.MustMarshal(req)}} - n.readyc <- raft.Ready{Entries: ents, CommittedEntries: ents} + n.readyc <- raft.Ready{CommittedEntries: ents} // dup msg - n.readyc <- raft.Ready{Entries: ents, CommittedEntries: ents} + n.readyc <- raft.Ready{CommittedEntries: ents} // use a conf change to block until dup msgs are all processed cc := &raftpb.ConfChange{Type: raftpb.ConfChangeRemoveNode, NodeID: 2} @@ -890,7 +893,7 @@ func TestAddMember(t *testing.T) { } s.start() - n.readyc <- createDummyReady() + n.readyc <- newDummyPutReqReady() m := membership.Member{ID: 1234, RaftAttributes: membership.RaftAttributes{PeerURLs: []string{"foo"}}} _, err := s.AddMember(context.Background(), m) @@ -1001,7 +1004,7 @@ func TestRemoveMember(t *testing.T) { } s.start() - n.readyc <- createDummyReady() + n.readyc <- newDummyPutReqReady() _, err := s.RemoveMember(context.Background(), 1234) gaction := n.Action() @@ -1054,7 +1057,7 @@ func TestUpdateMember(t *testing.T) { } s.start() - n.readyc <- createDummyReady() + n.readyc <- newDummyPutReqReady() wm := membership.Member{ID: 1234, RaftAttributes: membership.RaftAttributes{PeerURLs: []string{"http://127.0.0.1:1"}}} _, err := s.UpdateMember(context.Background(), wm) @@ -1595,11 +1598,14 @@ func TestIsActive(t *testing.T) { } } -// createDummyReady creates a raft Ready that can be sent to readyc to prevent crashes during snapshots. -func createDummyReady() raft.Ready { +// newDummyPutReqReady is useful in unit tests with a partially functional raft.Node +// (nodeConfChangeCommitterRecorder) that doesn't always append raft log entries properly. +// When this happens, it can crash when creating a raft log snapshot due to missing entries. +// To prevent this crash, you can send put requests to raft.Node's readyc after the server starts. +func newDummyPutReqReady() raft.Ready { req := &pb.InternalRaftRequest{ Header: &pb.RequestHeader{ID: 1}, - Put: &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}, + Put: &pb.PutRequest{Key: []byte("newDummyPutReqReady"), Value: []byte("bar")}, } ents := []raftpb.Entry{{Index: 1, Data: pbutil.MustMarshal(req)}} diff --git a/tests/e2e/v2store_deprecation_test.go b/tests/e2e/v2store_deprecation_test.go index 2ab87a91ecf..3384f8f2682 100644 --- a/tests/e2e/v2store_deprecation_test.go +++ b/tests/e2e/v2store_deprecation_test.go @@ -156,9 +156,11 @@ func TestV2DeprecationSnapshotMatches(t *testing.T) { if err != nil { t.Fatal(err) } + assert.NotEmpty(t, firstFiles) + assert.NotEmpty(t, secondFiles) if lastVer.LessThan(version.V3_6) && (version.V3_6.Equal(*currVer) || version.V3_6.LessThan(*currVer)) { - assert.Equal(t, len(firstFiles)+1, len(secondFiles), "etcd v3.6 should creates an additional raft log snapshot on startup") + assert.Equal(t, len(firstFiles)+1, len(secondFiles), "etcd v3.6 should create a snapshot of raft log snapshot on startup") t.Skipf("raft log snapshots of %v are supposed to differ from of %v", currVer, lastVer) } @@ -263,8 +265,6 @@ func filterSnapshotFiles(path string) bool { func assertSnapshotsMatch(t testing.TB, firstFiles, secondFiles []string, patch func([]byte) []byte) { lg := zaptest.NewLogger(t) - assert.NotEmpty(t, firstFiles) - assert.NotEmpty(t, secondFiles) assert.Equal(t, len(firstFiles), len(secondFiles)) sort.Strings(firstFiles) sort.Strings(secondFiles) diff --git a/tests/framework/e2e/cluster.go b/tests/framework/e2e/cluster.go index 97040515807..23d422aa313 100644 --- a/tests/framework/e2e/cluster.go +++ b/tests/framework/e2e/cluster.go @@ -786,7 +786,7 @@ func (epc *EtcdProcessCluster) CloseProc(ctx context.Context, finder func(EtcdPr } memberRemoved := false - for i := 0; i < 15; i++ { + for i := 0; i < 10; i++ { _, err := memberCtl.MemberRemove(ctx, memberID) if err != nil && strings.Contains(err.Error(), "member not found") { memberRemoved = true diff --git a/tests/framework/integration/cluster.go b/tests/framework/integration/cluster.go index b6a31374559..f2f8682545d 100644 --- a/tests/framework/integration/cluster.go +++ b/tests/framework/integration/cluster.go @@ -1700,7 +1700,17 @@ func (c *Cluster) MustNewMember(t testutil.TB, resp *clientv3.MemberAddResponse) var listeners []net.Listener for _, url := range urls { - l, err := testutils.ListenURL(&url) + var l net.Listener + var err error + switch url.Scheme { + case "http", "https": + l, err = net.Listen("tcp", url.Host) + case "unix", "unixs": + l, err = net.Listen("unix", url.Host) + default: + err = fmt.Errorf("unsupported scheme: %s", url.Scheme) + } + if err != nil { t.Fatal("failed to listen on %v: %v", url, err) } diff --git a/tests/framework/testutils/helpters.go b/tests/framework/testutils/helpters.go index 00e209a320d..91363176c2f 100644 --- a/tests/framework/testutils/helpters.go +++ b/tests/framework/testutils/helpters.go @@ -16,9 +16,6 @@ package testutils import ( "errors" - "fmt" - "net" - "net/url" "time" clientv3 "go.etcd.io/etcd/client/v3" @@ -72,14 +69,3 @@ func MustClient(c intf.Client, err error) intf.Client { } return c } - -func ListenURL(addr *url.URL) (net.Listener, error) { - switch addr.Scheme { - case "http", "https": - return net.Listen("tcp", addr.Host) - case "unix", "unixs": - return net.Listen("unix", addr.Path) - default: - return nil, fmt.Errorf("unsupported scheme: %s", addr.Scheme) - } -} diff --git a/tests/integration/raft_log_snapshot_test.go b/tests/integration/raft_log_snapshot_test.go index 198926193a2..c8ef6c75fe7 100644 --- a/tests/integration/raft_log_snapshot_test.go +++ b/tests/integration/raft_log_snapshot_test.go @@ -20,11 +20,15 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" + pb "go.etcd.io/etcd/api/v3/etcdserverpb" "go.etcd.io/etcd/tests/v3/framework/integration" ) -// TestRaftLogSnapshotExistsPostStartUp ensures a non-empty raft log snapshot exists after startup +// TestRaftLogSnapshotExistsPostStartUp ensures a non-empty raft log +// snapshot is present after the server starts up. It also checks that +// subsequent snapshots work as they used to. func TestRaftLogSnapshotExistsPostStartUp(t *testing.T) { integration.BeforeTest(t) @@ -40,18 +44,20 @@ func TestRaftLogSnapshotExistsPostStartUp(t *testing.T) { ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second) defer cancel() - _, err := m.LogObserver.Expect(ctx, "saved snapshot", 1) + lines, err := m.LogObserver.Expect(ctx, "saved snapshot", 1) + t.Logf("[expected line]: %v", lines[0]) + if err != nil { t.Fatalf("failed to expect (log:%s, count:%v): %v", "saved snapshot", 1, err) } - kvc := integration.ToGRPC(clus.RandClient()).KV + // NOTE: When starting a new cluster with 1 member, the member will + // apply 1 ConfChange directly at the beginning, setting the applied index to 1. + assert.Contains(t, lines[0], "{\"snapshot-index\": 1}") // In order to trigger another snapshot, we should increase applied index from 1 to 102. - // - // NOTE: When starting a new cluster with 1 member, the member will - // apply 3 ConfChange directly at the beginning, setting the applied index to 4. - for i := 0; i < 102-4; i++ { + kvc := integration.ToGRPC(clus.RandClient()).KV + for i := 0; i < 102; i++ { _, err = kvc.Put(context.TODO(), &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}) if err != nil { t.Fatalf("#%d: couldn't put key (%v)", i, err) @@ -75,3 +81,35 @@ func TestRaftLogSnapshotExistsPostStartUp(t *testing.T) { t.Fatalf("unexpected error, max snapshots allowed is %d: %v", 2, err) } } + +// TestRaftLogSnapshotIndexInCluster ensures a non-empty raft log +// snapshot is present after the cluster starts up, and checks if +// the snapshot index is as expected. +func TestRaftLogSnapshotIndexInCluster(t *testing.T) { + integration.BeforeTest(t) + + clus := integration.NewCluster(t, &integration.ClusterConfig{ + Size: 3, + SnapshotCount: 100, + SnapshotCatchUpEntries: 10, + }) + defer clus.Terminate(t) + + m := clus.Members[0] + + ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second) + defer cancel() + + lines, err := m.LogObserver.Expect(ctx, "saved snapshot", 1) + t.Logf("[expected line]: %v", lines[0]) + + if err != nil { + t.Fatalf("failed to expect (log:%s, count:%v): %v", "saved snapshot", 1, err) + } + + // NOTE: When starting a new cluster with 3 members, each member will + // apply 3 ConfChange directly at the beginning before a leader is + // elected. A snapshot of raft log is created, setting the snap + // index to 3. + assert.Contains(t, lines[0], "{\"snapshot-index\": 3}") +} diff --git a/tests/integration/v3_watch_restore_test.go b/tests/integration/v3_watch_restore_test.go index 3ad46ba0a44..31812f277fb 100644 --- a/tests/integration/v3_watch_restore_test.go +++ b/tests/integration/v3_watch_restore_test.go @@ -99,7 +99,7 @@ func TestV3WatchRestoreSnapshotUnsync(t *testing.T) { // NOTE: When starting a new cluster with 3 members, each member will // apply 3 ConfChange directly at the beginning before a leader is - // elected. Also, a snapshot of raft log is created, setting the snap + // elected. A snapshot of raft log is created, setting the snap // index to 3. Leader will apply 3 MemberAttrSet and 1 ClusterVersionSet // changes. So member 0 has index 8 in raft log before network // partition. We need to trigger EtcdServer.snapshot() at least twice. From 1955f1ec5f3fd624c1e95c83805f125f06e48b33 Mon Sep 17 00:00:00 2001 From: Clement Date: Sun, 1 Sep 2024 02:01:31 +0800 Subject: [PATCH 11/13] test: rewrite tests for raft log snapshot Signed-off-by: Clement --- tests/integration/raft_log_snapshot_test.go | 114 ++++++++++---------- 1 file changed, 58 insertions(+), 56 deletions(-) diff --git a/tests/integration/raft_log_snapshot_test.go b/tests/integration/raft_log_snapshot_test.go index c8ef6c75fe7..dafe8ef49ee 100644 --- a/tests/integration/raft_log_snapshot_test.go +++ b/tests/integration/raft_log_snapshot_test.go @@ -17,6 +17,7 @@ package integration import ( "context" "errors" + "fmt" "testing" "time" @@ -26,90 +27,91 @@ import ( "go.etcd.io/etcd/tests/v3/framework/integration" ) -// TestRaftLogSnapshotExistsPostStartUp ensures a non-empty raft log -// snapshot is present after the server starts up. It also checks that -// subsequent snapshots work as they used to. -func TestRaftLogSnapshotExistsPostStartUp(t *testing.T) { +func TestRaftLogSnapshotAlwaysExistsClusterOf1(t *testing.T) { + testRaftLogSnapshotExistsPostStartUp(t, 1) +} + +func TestRaftLogSnapshotAlwaysExistsClusterOf3(t *testing.T) { + testRaftLogSnapshotExistsPostStartUp(t, 3) +} + +// testRaftLogSnapshotExistsPostStartUp ensures +// - a non-empty raft log snapshot is present after the server starts up +// - the snapshot index is as expected +// - subsequent snapshots work as they used to +func testRaftLogSnapshotExistsPostStartUp(t *testing.T, size int) { integration.BeforeTest(t) clus := integration.NewCluster(t, &integration.ClusterConfig{ - Size: 1, + Size: size, SnapshotCount: 100, SnapshotCatchUpEntries: 10, }) defer clus.Terminate(t) - m := clus.Members[0] - + // expect the first snapshot to appear + // + // NOTE: When starting a new cluster with N member, each member will + // apply N ConfChange directly at the beginning, setting the applied index to N. + expectedSnapIndex := size ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second) defer cancel() + for _, m := range clus.Members { + lines, err := m.LogObserver.Expect(ctx, "saved snapshot", 1) + for _, line := range lines { + t.Logf("[expected line]: %v", line) + } - lines, err := m.LogObserver.Expect(ctx, "saved snapshot", 1) - t.Logf("[expected line]: %v", lines[0]) + if err != nil { + t.Fatalf("failed to expect (log:%s, count:%v): %v", "saved snapshot", 1, err) + } - if err != nil { - t.Fatalf("failed to expect (log:%s, count:%v): %v", "saved snapshot", 1, err) + assert.Contains(t, lines[0], fmt.Sprintf("{\"snapshot-index\": %d}", expectedSnapIndex)) } - // NOTE: When starting a new cluster with 1 member, the member will - // apply 1 ConfChange directly at the beginning, setting the applied index to 1. - assert.Contains(t, lines[0], "{\"snapshot-index\": 1}") - - // In order to trigger another snapshot, we should increase applied index from 1 to 102. + // increase applied index from size to size + 101, to trigger the second snapshot + expectedSnapIndex = size + 101 kvc := integration.ToGRPC(clus.RandClient()).KV - for i := 0; i < 102; i++ { - _, err = kvc.Put(context.TODO(), &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}) + for i := 0; i < expectedSnapIndex; i++ { + _, err := kvc.Put(context.TODO(), &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}) if err != nil { t.Fatalf("#%d: couldn't put key (%v)", i, err) } } + // expect the second snapshot to appear ctx2, cancel2 := context.WithTimeout(context.TODO(), 5*time.Second) defer cancel2() + for _, m := range clus.Members { + lines, err := m.LogObserver.Expect(ctx2, "saved snapshot", 2) + for _, line := range lines { + t.Logf("[expected line]: %v", line) + } - _, err = m.LogObserver.Expect(ctx2, "saved snapshot", 2) - if err != nil { - t.Fatalf("failed to expect (log:%s, count:%v): %v", "saved snapshot", 1, err) + if err != nil { + t.Fatalf("failed to expect (log:%s, count:%v): %v", "saved snapshot", 2, err) + } + + assert.Contains(t, lines[1], fmt.Sprintf("{\"snapshot-index\": %d}", expectedSnapIndex)) } + // expect the third snapshot doesn't appear + errC := make(chan error) ctx3, cancel3 := context.WithTimeout(context.TODO(), 5*time.Second) defer cancel3() - - // Expect function should return a DeadlineExceeded error to ensure no more snapshots are present - _, err = m.LogObserver.Expect(ctx3, "saved snapshot", 3) - if !errors.Is(err, context.DeadlineExceeded) { - t.Fatalf("unexpected error, max snapshots allowed is %d: %v", 2, err) + for _, m := range clus.Members { + go func() { + // m.LogObserver.Expect should return a DeadlineExceeded error to confirm there are no more snapshots + _, err := m.LogObserver.Expect(ctx3, "saved snapshot", 3) + if !errors.Is(err, context.DeadlineExceeded) { + errC <- fmt.Errorf("expected a DeadlineExceeded error, got %v, max snapshots allowed is %d", err, 2) + } + }() } -} -// TestRaftLogSnapshotIndexInCluster ensures a non-empty raft log -// snapshot is present after the cluster starts up, and checks if -// the snapshot index is as expected. -func TestRaftLogSnapshotIndexInCluster(t *testing.T) { - integration.BeforeTest(t) - - clus := integration.NewCluster(t, &integration.ClusterConfig{ - Size: 3, - SnapshotCount: 100, - SnapshotCatchUpEntries: 10, - }) - defer clus.Terminate(t) - - m := clus.Members[0] - - ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second) - defer cancel() - - lines, err := m.LogObserver.Expect(ctx, "saved snapshot", 1) - t.Logf("[expected line]: %v", lines[0]) - - if err != nil { - t.Fatalf("failed to expect (log:%s, count:%v): %v", "saved snapshot", 1, err) + select { + case err := <-errC: + t.Fatal(err) + case <-ctx3.Done(): } - - // NOTE: When starting a new cluster with 3 members, each member will - // apply 3 ConfChange directly at the beginning before a leader is - // elected. A snapshot of raft log is created, setting the snap - // index to 3. - assert.Contains(t, lines[0], "{\"snapshot-index\": 3}") } From e40267f8e636d4f277fd665920604b9559a00c09 Mon Sep 17 00:00:00 2001 From: Clement Date: Sun, 1 Sep 2024 02:45:15 +0800 Subject: [PATCH 12/13] fix: improve readability Signed-off-by: Clement --- server/etcdserver/server.go | 4 ++-- tests/e2e/v2store_deprecation_test.go | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index 15beac3328c..40d979dc9c5 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -2186,8 +2186,8 @@ func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) { // keep some in memory log entries for slow followers. if snapi > s.Cfg.SnapshotCatchUpEntries { compacti := snapi - s.Cfg.SnapshotCatchUpEntries - // After calling s.r.raftStorage.Compact, compacti becomes the first (dummy) entry. - // Therefore, compacti must be less than appliedi (snapi). + // if a compaction occurs, the index value of the first entry(dummy) in raft log + // will be `compacti`. So, `compacti` must be less than `appliedi` (`snapi`). err = s.r.raftStorage.Compact(compacti) if err != nil { // the compaction was done asynchronously with the progress of raft. diff --git a/tests/e2e/v2store_deprecation_test.go b/tests/e2e/v2store_deprecation_test.go index 3384f8f2682..b7fb5e0d407 100644 --- a/tests/e2e/v2store_deprecation_test.go +++ b/tests/e2e/v2store_deprecation_test.go @@ -160,7 +160,7 @@ func TestV2DeprecationSnapshotMatches(t *testing.T) { assert.NotEmpty(t, secondFiles) if lastVer.LessThan(version.V3_6) && (version.V3_6.Equal(*currVer) || version.V3_6.LessThan(*currVer)) { - assert.Equal(t, len(firstFiles)+1, len(secondFiles), "etcd v3.6 should create a snapshot of raft log snapshot on startup") + assert.Equal(t, len(firstFiles)+1, len(secondFiles), "etcd v3.6 should create a snapshot of raft log on startup") t.Skipf("raft log snapshots of %v are supposed to differ from of %v", currVer, lastVer) } From ccbec075e2e180a294935b973d15409a0ea73a8d Mon Sep 17 00:00:00 2001 From: Clement Date: Sun, 1 Sep 2024 04:06:20 +0800 Subject: [PATCH 13/13] fix: improve readability Signed-off-by: Clement --- server/etcdserver/server_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/etcdserver/server_test.go b/server/etcdserver/server_test.go index 33e6481e547..a25066329a6 100644 --- a/server/etcdserver/server_test.go +++ b/server/etcdserver/server_test.go @@ -1601,7 +1601,7 @@ func TestIsActive(t *testing.T) { // newDummyPutReqReady is useful in unit tests with a partially functional raft.Node // (nodeConfChangeCommitterRecorder) that doesn't always append raft log entries properly. // When this happens, it can crash when creating a raft log snapshot due to missing entries. -// To prevent this crash, you can send put requests to raft.Node's readyc after the server starts. +// To prevent this crash, we can send put requests to raft.Node's readyc after the server starts. func newDummyPutReqReady() raft.Ready { req := &pb.InternalRaftRequest{ Header: &pb.RequestHeader{ID: 1},