diff --git a/tests/integration/clientv3/concurrency/revision_test.go b/tests/integration/clientv3/concurrency/revision_test.go new file mode 100644 index 000000000000..1a3a36175a28 --- /dev/null +++ b/tests/integration/clientv3/concurrency/revision_test.go @@ -0,0 +1,132 @@ +// Copyright 2016 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package concurrency_test + +import ( + "context" + "errors" + "fmt" + "strings" + "sync" + "testing" + "time" + + "go.etcd.io/etcd/tests/v3/framework/integration" + "google.golang.org/grpc/status" +) + +func TestRevisionMonotonicWithLeaderPartitions(t *testing.T) { + testRevisionMonotonicWithFailures(t, 11*time.Second, func(clus *integration.ClusterV3) { + for i:=0; i<5;i++ { + leader := clus.WaitLeader(t) + time.Sleep(time.Second) + clus.Members[leader].InjectPartition(t, clus.Members[(leader+1)%3], clus.Members[(leader+2)%3]) + time.Sleep(time.Second) + clus.Members[leader].RecoverPartition(t, clus.Members[(leader+1)%3], clus.Members[(leader+2)%3]) + } + }) +} + +func TestRevisionMonotonicWithLeadersRestarted(t *testing.T) { + testRevisionMonotonicWithFailures(t, 11 * time.Second, func(clus *integration.ClusterV3) { + for i:=0;i<5;i++ { + leader := clus.WaitLeader(t) + time.Sleep(time.Second) + clus.Members[leader].Stop(t) + time.Sleep(time.Second) + clus.Members[leader].Restart(t) + } + }) +} + +func testRevisionMonotonicWithFailures(t* testing.T, testDuration time.Duration, injectFailures func(clus *integration.ClusterV3)) { + integration.BeforeTest(t) + clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3, UseBridge: true}) + defer clus.Terminate(t) + + ctx, cancel := context.WithTimeout(context.Background(), testDuration) + defer cancel() + + wg := sync.WaitGroup{} + for i:=0;i<10;i++ { + wg.Add(1) + go func() { + defer wg.Done() + putWorker(t, ctx, clus) + }() + } + + for i:=0;i<10;i++ { + wg.Add(1) + go func() { + defer wg.Done() + getWorker(t, ctx, clus) + }() + } + + injectFailures(clus) + wg.Wait() + kv := clus.Client(0) + resp, err := kv.Get(context.Background(), "foo") + if err != nil { + t.Fatal(err) + } + t.Logf("Revision %d", resp.Header.Revision) +} + +func putWorker(t *testing.T, ctx context.Context, clus *integration.ClusterV3) { + for i:=0;;i++ { + kv := clus.Client(i % 3) + _, err := kv.Put(ctx, "foo", fmt.Sprintf("%d", i)) + if err != nil { + if errors.Is(err, context.DeadlineExceeded) { + return + } + s := status.Convert(err) + switch s.Message() { + case "context deadline exceeded", "etcdserver: request timed out", "etcdserver: request timed out, possibly due to previous leader failure", "error reading from server: EOF", "read: connection reset by peer", "etcdserver: request timed out, possibly due to connection lost", " closing transport due to: connection error: desc = \"error reading from server: EOF\", received prior goaway: code: NO_ERROR": + continue + default: + if !strings.HasSuffix(s.Message(), "read: connection reset by peer") && !strings.HasSuffix(s.Message(), "use of closed network connection") { + t.Fatal(s.Message()) + } + } + } + } +} + +func getWorker(t *testing.T, ctx context.Context, clus *integration.ClusterV3) { + var prevRev int64 + for i := 0; ; i++ { + kv := clus.Client(i % 3) + resp, err := kv.Get(ctx, "foo") + if err != nil { + if errors.Is(err, context.DeadlineExceeded) { + return + } + s := status.Convert(err) + switch s.Message() { + case "context deadline exceeded", "etcdserver: request timed out": + continue + default: + t.Fatal(s.Message()) + } + } + if prevRev > resp.Header.Revision { + t.Fatalf("rev is less than previously observed revision, rev: %d, prevRev: %d", resp.Header.Revision, prevRev) + } + prevRev = resp.Header.Revision + } +}