From 46570f68cfaf879818f9cbeec4a7688963ff5a31 Mon Sep 17 00:00:00 2001 From: Aleksander Mistewicz Date: Tue, 10 Dec 2024 12:19:44 +0100 Subject: [PATCH] Run Member tests in parallel Introduce port allocator and remove unused MemberNumber. On my local machine it brings down execution time from 5m to 32s. Issue: https://github.com/etcd-io/etcd/issues/18983 Signed-off-by: Aleksander Mistewicz --- tests/common/member_test.go | 6 +++ tests/framework/e2e/cluster.go | 17 ++++++-- tests/framework/e2e/etcd_process.go | 14 +++++-- tests/framework/e2e/port_alloc.go | 58 ++++++++++++++++++++++++++ tests/framework/integration/cluster.go | 10 ++--- 5 files changed, 90 insertions(+), 15 deletions(-) create mode 100644 tests/framework/e2e/port_alloc.go diff --git a/tests/common/member_test.go b/tests/common/member_test.go index 1efb95039a66..5a40fff53a11 100644 --- a/tests/common/member_test.go +++ b/tests/common/member_test.go @@ -34,6 +34,8 @@ func TestMemberList(t *testing.T) { for _, tc := range clusterTestCases() { t.Run(tc.name, func(t *testing.T) { + t.Parallel() + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() clus := testRunner.NewCluster(ctx, t, config.WithClusterConfig(tc.config)) @@ -113,6 +115,8 @@ func TestMemberAdd(t *testing.T) { for _, quorumTc := range quorumTcs { for _, clusterTc := range clusterTestCases() { t.Run(learnerTc.name+"/"+quorumTc.name+"/"+clusterTc.name, func(t *testing.T) { + t.Parallel() + ctxTimeout := 10 * time.Second if quorumTc.waitForQuorum { ctxTimeout += etcdserver.HealthInterval @@ -198,6 +202,8 @@ func TestMemberRemove(t *testing.T) { continue } t.Run(quorumTc.name+"/"+clusterTc.name, func(t *testing.T) { + t.Parallel() + ctx, cancel := context.WithTimeout(context.Background(), 14*time.Second) defer cancel() c := clusterTc.config diff --git a/tests/framework/e2e/cluster.go b/tests/framework/e2e/cluster.go index 4aff11b9d6f9..7e87e85d55a3 100644 --- a/tests/framework/e2e/cluster.go +++ b/tests/framework/e2e/cluster.go @@ -407,9 +407,6 @@ func InitEtcdProcessCluster(t testing.TB, cfg *EtcdProcessClusterConfig) (*EtcdP if cfg.Logger == nil { cfg.Logger = zaptest.NewLogger(t) } - if cfg.BasePort == 0 { - cfg.BasePort = EtcdProcessBasePort - } if cfg.ServerConfig.SnapshotCount == 0 { cfg.ServerConfig.SnapshotCount = etcdserver.DefaultSnapshotCount } @@ -518,6 +515,16 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfig(tb testing.TB, i in peer2Port := port + 3 clientHTTPPort := port + 4 + var allocatedPorts []int + if cfg.BasePort == 0 { + clientPort = uniquePorts.Alloc() + peerPort = uniquePorts.Alloc() + metricsPort = uniquePorts.Alloc() + peer2Port = uniquePorts.Alloc() + clientHTTPPort = uniquePorts.Alloc() + allocatedPorts = []int{clientPort, peerPort, metricsPort, peer2Port, clientHTTPPort} + } + if cfg.Client.ConnectionType == ClientTLSAndNonTLS { curl = clientURL(cfg.ClientScheme(), clientPort, ClientNonTLS) curls = []string{curl, clientURL(cfg.ClientScheme(), clientPort, ClientTLS)} @@ -639,7 +646,8 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfig(tb testing.TB, i in } var gofailPort int if cfg.GoFailEnabled { - gofailPort = (i+1)*10000 + 2381 + gofailPort = uniquePorts.Alloc() + allocatedPorts = append(allocatedPorts, gofailPort) envVars["GOFAIL_HTTP"] = fmt.Sprintf("127.0.0.1:%d", gofailPort) } @@ -662,6 +670,7 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfig(tb testing.TB, i in GoFailClientTimeout: cfg.GoFailClientTimeout, Proxy: proxyCfg, LazyFSEnabled: cfg.LazyFSEnabled, + AllocatedPorts: allocatedPorts, } } diff --git a/tests/framework/e2e/etcd_process.go b/tests/framework/e2e/etcd_process.go index 445ea26e94cc..23f937df15c8 100644 --- a/tests/framework/e2e/etcd_process.go +++ b/tests/framework/e2e/etcd_process.go @@ -88,10 +88,11 @@ type EtcdServerProcessConfig struct { Name string - PeerURL url.URL - ClientURL string - ClientHTTPURL string - MetricsURL string + PeerURL url.URL + ClientURL string + ClientHTTPURL string + MetricsURL string + AllocatedPorts []int InitialToken string InitialCluster string @@ -248,6 +249,11 @@ func (ep *EtcdServerProcess) Close() error { ep.cfg.lg.Info("removing directory", zap.String("data-dir", ep.cfg.DataDirPath)) return os.RemoveAll(ep.cfg.DataDirPath) } + + for _, port := range ep.cfg.AllocatedPorts { + uniquePorts.Free(port) + } + ep.cfg.AllocatedPorts = nil return nil } diff --git a/tests/framework/e2e/port_alloc.go b/tests/framework/e2e/port_alloc.go new file mode 100644 index 000000000000..02e1831edbf8 --- /dev/null +++ b/tests/framework/e2e/port_alloc.go @@ -0,0 +1,58 @@ +// Copyright 2024 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package e2e + +import "sync" + +// uniquePorts is a global instance of testPorts. +var uniquePorts *testPorts + +func init() { + uniquePorts = newTestPorts(11000, 19000) +} + +// testPorts is used to allocate listen ports for etcd instance in tests +// in a safe way for concurrent use (i.e. running tests in parallel). +type testPorts struct { + mux sync.Mutex + unused map[int]bool +} + +// newTestPorts keeps track of unused ports in the specified range. +func newTestPorts(start, end int) *testPorts { + m := make(map[int]bool, end-start) + for i := start; i < end; i++ { + m[i] = true + } + return &testPorts{unused: m} +} + +// Alloc allocates a new port or panics if none is available. +func (pa *testPorts) Alloc() int { + pa.mux.Lock() + defer pa.mux.Unlock() + for port := range pa.unused { + delete(pa.unused, port) + return port + } + panic("all ports are used") +} + +// Free makes port available for allocation through Alloc. +func (pa *testPorts) Free(port int) { + pa.mux.Lock() + defer pa.mux.Unlock() + pa.unused[port] = true +} diff --git a/tests/framework/integration/cluster.go b/tests/framework/integration/cluster.go index f78af573f348..8e117f47ecd1 100644 --- a/tests/framework/integration/cluster.go +++ b/tests/framework/integration/cluster.go @@ -260,13 +260,12 @@ func (c *Cluster) ProtoMembers() []*pb.Member { } func (c *Cluster) mustNewMember(t testutil.TB) *Member { - memberNumber := c.LastMemberNum + uniqueNumber := atomic.AddInt32(&UniqueNumber, 1)*10 + int32(c.LastMemberNum) c.LastMemberNum++ m := MustNewMember(t, MemberConfig{ - Name: fmt.Sprintf("m%v", memberNumber), - MemberNumber: memberNumber, + Name: fmt.Sprintf("m%v", uniqueNumber), AuthToken: c.Cfg.AuthToken, PeerTLS: c.Cfg.PeerTLS, ClientTLS: c.Cfg.ClientTLS, @@ -549,7 +548,6 @@ func NewListenerWithAddr(t testutil.TB, addr string) net.Listener { type Member struct { config.ServerConfig UniqNumber int - MemberNumber int Port string PeerListeners, ClientListeners []net.Listener GRPCListener net.Listener @@ -591,7 +589,6 @@ type Member struct { type MemberConfig struct { Name string UniqNumber int64 - MemberNumber int PeerTLS *transport.TLSInfo ClientTLS *transport.TLSInfo AuthToken string @@ -624,8 +621,7 @@ type MemberConfig struct { func MustNewMember(t testutil.TB, mcfg MemberConfig) *Member { var err error m := &Member{ - MemberNumber: mcfg.MemberNumber, - UniqNumber: int(atomic.AddInt32(&UniqueCount, 1)), + UniqNumber: int(atomic.AddInt32(&UniqueCount, 1)), } peerScheme := SchemeFromTLSInfo(mcfg.PeerTLS)