From 6d941b9b7aae32b20c8725b5a711d3def3e07a82 Mon Sep 17 00:00:00 2001
From: Aleksander Mistewicz
Date: Tue, 10 Dec 2024 12:19:44 +0100
Subject: [PATCH] Run Member tests in parallel

Introduce a port allocator and remove the unused MemberNumber field.
Add a UniquePortAlloc config field so that the default behavior
stays unchanged.

On my local machine this brings the execution time down from 5m to 32s.

Issue: https://github.com/etcd-io/etcd/issues/18983
Change-Id: Ibd49dafecaaab85966934ed9d314d23912fbac32
Signed-off-by: Aleksander Mistewicz
---
 tests/common/member_test.go            |  9 ++++
 tests/framework/config/cluster.go      |  5 +++
 tests/framework/e2e/cluster.go         | 14 ++++++-
 tests/framework/e2e/e2e.go             |  3 ++
 tests/framework/e2e/etcd_process.go    | 14 +++++--
 tests/framework/e2e/port_alloc.go      | 58 ++++++++++++++++++++++++++
 tests/framework/integration/cluster.go | 10 ++---
 7 files changed, 101 insertions(+), 12 deletions(-)
 create mode 100644 tests/framework/e2e/port_alloc.go

diff --git a/tests/common/member_test.go b/tests/common/member_test.go
index 1efb95039a66..e0e3a3522f96 100644
--- a/tests/common/member_test.go
+++ b/tests/common/member_test.go
@@ -34,8 +34,11 @@ func TestMemberList(t *testing.T) {
 
 	for _, tc := range clusterTestCases() {
 		t.Run(tc.name, func(t *testing.T) {
+			t.Parallel()
+
 			ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
 			defer cancel()
+			tc.config.UniquePortAlloc = true
 			clus := testRunner.NewCluster(ctx, t, config.WithClusterConfig(tc.config))
 			defer clus.Close()
 			cc := testutils.MustClient(clus.Client())
@@ -113,6 +116,8 @@ func TestMemberAdd(t *testing.T) {
 	for _, quorumTc := range quorumTcs {
 		for _, clusterTc := range clusterTestCases() {
 			t.Run(learnerTc.name+"/"+quorumTc.name+"/"+clusterTc.name, func(t *testing.T) {
+				t.Parallel()
+
 				ctxTimeout := 10 * time.Second
 				if quorumTc.waitForQuorum {
 					ctxTimeout += etcdserver.HealthInterval
@@ -121,6 +126,7 @@
 				defer cancel()
 				c := clusterTc.config
 				c.StrictReconfigCheck = quorumTc.strictReconfigCheck
+				c.UniquePortAlloc = true
 				clus := testRunner.NewCluster(ctx, t, config.WithClusterConfig(c))
 				defer clus.Close()
 				cc := testutils.MustClient(clus.Client())
@@ -198,10 +204,13 @@ func TestMemberRemove(t *testing.T) {
 				continue
 			}
 			t.Run(quorumTc.name+"/"+clusterTc.name, func(t *testing.T) {
+				t.Parallel()
+
 				ctx, cancel := context.WithTimeout(context.Background(), 14*time.Second)
 				defer cancel()
 				c := clusterTc.config
 				c.StrictReconfigCheck = quorumTc.strictReconfigCheck
+				c.UniquePortAlloc = true
 				clus := testRunner.NewCluster(ctx, t, config.WithClusterConfig(c))
 				defer clus.Close()
 				// client connects to a specific member which won't be removed from cluster
diff --git a/tests/framework/config/cluster.go b/tests/framework/config/cluster.go
index 0e6ec561afb9..260980683989 100644
--- a/tests/framework/config/cluster.go
+++ b/tests/framework/config/cluster.go
@@ -36,6 +36,7 @@ type ClusterConfig struct {
 	StrictReconfigCheck bool
 	AuthToken           string
 	SnapshotCount       uint64
+	UniquePortAlloc     bool
 
 	// ClusterContext is used by "e2e" or "integration" to extend the
 	// ClusterConfig. The common test cases shouldn't care about what
@@ -88,3 +89,7 @@ func WithSnapshotCount(count uint64) ClusterOption {
 func WithStrictReconfigCheck(strict bool) ClusterOption {
 	return func(c *ClusterConfig) { c.StrictReconfigCheck = strict }
 }
+
+func WithUniquePortAlloc() ClusterOption {
+	return func(c *ClusterConfig) { c.UniquePortAlloc = true }
+}
diff --git a/tests/framework/e2e/cluster.go b/tests/framework/e2e/cluster.go
index 083dcc7a077f..46ff29a7e195 100644
--- a/tests/framework/e2e/cluster.go
+++ b/tests/framework/e2e/cluster.go
@@ -518,6 +518,16 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfig(tb testing.TB, i in
 	peer2Port := port + 3
 	clientHTTPPort := port + 4
 
+	var allocatedPorts []int
+	if cfg.BasePort == -1 {
+		clientPort = uniquePorts.Alloc()
+		peerPort = uniquePorts.Alloc()
+		metricsPort = uniquePorts.Alloc()
+		peer2Port = uniquePorts.Alloc()
+		clientHTTPPort = uniquePorts.Alloc()
+		allocatedPorts = []int{clientPort, peerPort, metricsPort, peer2Port, clientHTTPPort}
+	}
+
 	if cfg.Client.ConnectionType == ClientTLSAndNonTLS {
 		curl = clientURL(cfg.ClientScheme(), clientPort, ClientNonTLS)
 		curls = []string{curl, clientURL(cfg.ClientScheme(), clientPort, ClientTLS)}
@@ -639,7 +649,8 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfig(tb testing.TB, i in
 	}
 	var gofailPort int
 	if cfg.GoFailEnabled {
-		gofailPort = (i+1)*10000 + 2381
+		gofailPort = uniquePorts.Alloc()
+		allocatedPorts = append(allocatedPorts, gofailPort)
 		envVars["GOFAIL_HTTP"] = fmt.Sprintf("127.0.0.1:%d", gofailPort)
 	}
 
@@ -662,6 +673,7 @@ func (cfg *EtcdProcessClusterConfig) EtcdServerProcessConfig(tb testing.TB, i in
 		GoFailClientTimeout: cfg.GoFailClientTimeout,
 		Proxy:               proxyCfg,
 		LazyFSEnabled:       cfg.LazyFSEnabled,
+		AllocatedPorts:      allocatedPorts,
 	}
 }
 
diff --git a/tests/framework/e2e/e2e.go b/tests/framework/e2e/e2e.go
index f78df57926ea..9db02af93755 100644
--- a/tests/framework/e2e/e2e.go
+++ b/tests/framework/e2e/e2e.go
@@ -83,6 +83,9 @@ func (e e2eRunner) NewCluster(ctx context.Context, t testing.TB, opts ...config.
 	default:
 		t.Fatalf("PeerTLS config %q not supported", cfg.PeerTLS)
 	}
+	if cfg.UniquePortAlloc {
+		e2eConfig.BasePort = -1
+	}
 	epc, err := NewEtcdProcessCluster(ctx, t, WithConfig(e2eConfig))
 	if err != nil {
 		t.Fatalf("could not start etcd integrationCluster: %s", err)
diff --git a/tests/framework/e2e/etcd_process.go b/tests/framework/e2e/etcd_process.go
index 445ea26e94cc..23f937df15c8 100644
--- a/tests/framework/e2e/etcd_process.go
+++ b/tests/framework/e2e/etcd_process.go
@@ -88,10 +88,11 @@ type EtcdServerProcessConfig struct {
 
 	Name string
 
-	PeerURL       url.URL
-	ClientURL     string
-	ClientHTTPURL string
-	MetricsURL    string
+	PeerURL        url.URL
+	ClientURL      string
+	ClientHTTPURL  string
+	MetricsURL     string
+	AllocatedPorts []int
 
 	InitialToken   string
 	InitialCluster string
@@ -248,6 +249,11 @@ func (ep *EtcdServerProcess) Close() error {
 		ep.cfg.lg.Info("removing directory", zap.String("data-dir", ep.cfg.DataDirPath))
 		return os.RemoveAll(ep.cfg.DataDirPath)
 	}
+
+	for _, port := range ep.cfg.AllocatedPorts {
+		uniquePorts.Free(port)
+	}
+	ep.cfg.AllocatedPorts = nil
 	return nil
 }
 
diff --git a/tests/framework/e2e/port_alloc.go b/tests/framework/e2e/port_alloc.go
new file mode 100644
index 000000000000..02e1831edbf8
--- /dev/null
+++ b/tests/framework/e2e/port_alloc.go
@@ -0,0 +1,58 @@
+// Copyright 2024 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package e2e
+
+import "sync"
+
+// uniquePorts is a global instance of testPorts.
+var uniquePorts *testPorts
+
+func init() {
+	uniquePorts = newTestPorts(11000, 19000)
+}
+
+// testPorts is used to allocate listen ports for etcd instances in tests
+// in a safe way for concurrent use (i.e. running tests in parallel).
+type testPorts struct {
+	mux    sync.Mutex
+	unused map[int]bool
+}
+
+// newTestPorts keeps track of unused ports in the specified range.
+func newTestPorts(start, end int) *testPorts {
+	m := make(map[int]bool, end-start)
+	for i := start; i < end; i++ {
+		m[i] = true
+	}
+	return &testPorts{unused: m}
+}
+
+// Alloc allocates a new port or panics if none is available.
+func (pa *testPorts) Alloc() int {
+	pa.mux.Lock()
+	defer pa.mux.Unlock()
+	for port := range pa.unused {
+		delete(pa.unused, port)
+		return port
+	}
+	panic("all ports are used")
+}
+
+// Free makes the port available for allocation through Alloc.
+func (pa *testPorts) Free(port int) {
+	pa.mux.Lock()
+	defer pa.mux.Unlock()
+	pa.unused[port] = true
+}
diff --git a/tests/framework/integration/cluster.go b/tests/framework/integration/cluster.go
index f78af573f348..8e117f47ecd1 100644
--- a/tests/framework/integration/cluster.go
+++ b/tests/framework/integration/cluster.go
@@ -260,13 +260,12 @@ func (c *Cluster) ProtoMembers() []*pb.Member {
 }
 
 func (c *Cluster) mustNewMember(t testutil.TB) *Member {
-	memberNumber := c.LastMemberNum
+	uniqueNumber := atomic.AddInt32(&UniqueNumber, 1)*10 + int32(c.LastMemberNum)
 	c.LastMemberNum++
 
 	m := MustNewMember(t,
 		MemberConfig{
-			Name:                        fmt.Sprintf("m%v", memberNumber),
-			MemberNumber:                memberNumber,
+			Name:                        fmt.Sprintf("m%v", uniqueNumber),
 			AuthToken:                   c.Cfg.AuthToken,
 			PeerTLS:                     c.Cfg.PeerTLS,
 			ClientTLS:                   c.Cfg.ClientTLS,
@@ -549,7 +548,6 @@ func NewListenerWithAddr(t testutil.TB, addr string) net.Listener {
 type Member struct {
 	config.ServerConfig
 	UniqNumber                     int
-	MemberNumber                   int
 	Port                           string
 	PeerListeners, ClientListeners []net.Listener
 	GRPCListener                   net.Listener
@@ -591,7 +589,6 @@ type Member struct {
 type MemberConfig struct {
 	Name                        string
 	UniqNumber                  int64
-	MemberNumber                int
 	PeerTLS                     *transport.TLSInfo
 	ClientTLS                   *transport.TLSInfo
 	AuthToken                   string
@@ -624,8 +621,7 @@ type MemberConfig struct {
 func MustNewMember(t testutil.TB, mcfg MemberConfig) *Member {
 	var err error
 	m := &Member{
-		MemberNumber: mcfg.MemberNumber,
-		UniqNumber:   int(atomic.AddInt32(&UniqueCount, 1)),
+		UniqNumber: int(atomic.AddInt32(&UniqueCount, 1)),
 	}
 	peerScheme := SchemeFromTLSInfo(mcfg.PeerTLS)
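
Usage sketch (not part of the diff): a test in tests/common opts in either by
setting UniquePortAlloc on the cluster config, as the member_test.go hunks
above do, or through the new WithUniquePortAlloc option; for the e2e runner
the field is translated into BasePort = -1, which makes EtcdServerProcessConfig
take its client, peer, metrics and gofail ports from uniquePorts instead of
deriving them from BasePort. The fragment below is illustrative only:
clusterTestCases, testRunner and testutils come from the existing test
harness, and the test body is elided.

	for _, tc := range clusterTestCases() {
		t.Run(tc.name, func(t *testing.T) {
			// Safe to run in parallel because each member gets unique ports.
			t.Parallel()

			ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
			defer cancel()

			// Options are applied in order, so WithUniquePortAlloc must come
			// after WithClusterConfig, which replaces the whole config.
			clus := testRunner.NewCluster(ctx, t,
				config.WithClusterConfig(tc.config),
				config.WithUniquePortAlloc(),
			)
			defer clus.Close()

			cc := testutils.MustClient(clus.Client())
			// ... assertions against cc elided ...
		})
	}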