Skip to content

Commit

Permalink
Merge pull request #16457 from serathius/downgrade-membership
Browse files Browse the repository at this point in the history
Add membership changes to downgrade tests
  • Loading branch information
serathius authored Aug 22, 2023
2 parents 3794dfd + 1c0db87 commit 5a54fe6
Showing 1 changed file with 45 additions and 17 deletions.
62 changes: 45 additions & 17 deletions tests/e2e/cluster_downgrade_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,12 +92,25 @@ func testDowngradeUpgrade(t *testing.T, clusterSize int, triggerSnapshot bool) {
Storage: currentVersionStr,
})
}
cc := epc.Etcdctl()
t.Logf("Cluster created")
if len(epc.Procs) > 1 {
t.Log("Waiting health interval to required to make membership changes")
time.Sleep(etcdserver.HealthInterval)
}

t.Log("Adding member to test membership, but a learner avoid breaking quorum")
resp, err := cc.MemberAddAsLearner(context.Background(), "fake1", []string{"http://127.0.0.1:1001"})
require.NoError(t, err)
if triggerSnapshot {
t.Logf("Generating snapshot")
generateSnapshot(t, snapshotCount, epc)
generateSnapshot(t, snapshotCount, cc)
verifySnapshot(t, epc)
}
bm, bkv := getMembersAndKeys(t, epc)
t.Log("Removing learner to test membership")
_, err = cc.MemberRemove(context.Background(), resp.Member.ID)
require.NoError(t, err)
beforeMembers, beforeKV := getMembersAndKeys(t, cc)

t.Logf("etcdctl downgrade enable %s", lastVersionStr)
downgradeEnable(t, epc, lastVersion)
Expand Down Expand Up @@ -130,9 +143,26 @@ func testDowngradeUpgrade(t *testing.T, clusterSize int, triggerSnapshot bool) {
}

t.Log("Downgrade complete")
am, akv := getMembersAndKeys(t, epc)
assert.Equal(t, bkv.Kvs, akv.Kvs)
assert.Equal(t, bm.Members, am.Members)
afterMembers, afterKV := getMembersAndKeys(t, cc)
assert.Equal(t, beforeKV.Kvs, afterKV.Kvs)
assert.Equal(t, beforeMembers.Members, afterMembers.Members)

if len(epc.Procs) > 1 {
t.Log("Waiting health interval to required to make membership changes")
time.Sleep(etcdserver.HealthInterval)
}
t.Log("Adding learner to test membership, but avoid breaking quorum")
resp, err = cc.MemberAddAsLearner(context.Background(), "fake2", []string{"http://127.0.0.1:1002"})
require.NoError(t, err)
if triggerSnapshot {
t.Logf("Generating snapshot")
generateSnapshot(t, snapshotCount, cc)
verifySnapshot(t, epc)
}
t.Log("Removing learner to test membership")
_, err = cc.MemberRemove(context.Background(), resp.Member.ID)
require.NoError(t, err)
beforeMembers, beforeKV = getMembersAndKeys(t, cc)

t.Logf("Starting upgrade process to %q", currentVersionStr)
for i := 0; i < len(epc.Procs); i++ {
Expand All @@ -153,6 +183,10 @@ func testDowngradeUpgrade(t *testing.T, clusterSize int, triggerSnapshot bool) {
})
}
t.Log("Upgrade complete")

afterMembers, afterKV = getMembersAndKeys(t, cc)
assert.Equal(t, beforeKV.Kvs, afterKV.Kvs)
assert.Equal(t, beforeMembers.Members, afterMembers.Members)
}

func newCluster(t *testing.T, clusterSize int, snapshotCount uint64) *e2e.EtcdProcessCluster {
Expand Down Expand Up @@ -201,16 +235,17 @@ func validateVersion(t *testing.T, cfg *e2e.EtcdProcessClusterConfig, member e2e
for {
result, err := getMemberVersionByCurl(cfg, member)
if err != nil {
cfg.Logger.Warn("failed to get member version and retrying", zap.Error(err))
cfg.Logger.Warn("failed to get member version and retrying", zap.Error(err), zap.String("member", member.Config().Name))
time.Sleep(time.Second)
continue
}

cfg.Logger.Info("Comparing versions", zap.String("member", member.Config().Name), zap.Any("got", result), zap.Any("want", expect))
if err := compareMemberVersion(expect, result); err != nil {
cfg.Logger.Warn("failed to validate and retrying", zap.Error(err))
cfg.Logger.Warn("Versions didn't match retrying", zap.Error(err), zap.String("member", member.Config().Name))
time.Sleep(time.Second)
continue
}
cfg.Logger.Info("Versions match", zap.String("member", member.Config().Name))
break
}
})
Expand Down Expand Up @@ -271,20 +306,16 @@ func getMemberVersionByCurl(cfg *e2e.EtcdProcessClusterConfig, member e2e.EtcdPr
return result, nil
}

func generateSnapshot(t *testing.T, snapshotCount uint64, epc *e2e.EtcdProcessCluster) {
func generateSnapshot(t *testing.T, snapshotCount uint64, cc *e2e.EtcdctlV3) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

cc, err := e2e.NewEtcdctl(epc.Cfg.Client, epc.EndpointsGRPC())
assert.NoError(t, err)

var i uint64
t.Logf("Adding keys")
for i = 0; i < snapshotCount*3; i++ {
err := cc.Put(ctx, fmt.Sprintf("%d", i), "1", config.PutOptions{})
assert.NoError(t, err)
}
verifySnapshot(t, epc)
}

func verifySnapshot(t *testing.T, epc *e2e.EtcdProcessCluster) {
Expand Down Expand Up @@ -315,13 +346,10 @@ func verifySnapshotMembers(t *testing.T, epc *e2e.EtcdProcessCluster, expectedMe
t.Log("All members have a valid snapshot")
}

func getMembersAndKeys(t *testing.T, epc *e2e.EtcdProcessCluster) (*clientv3.MemberListResponse, *clientv3.GetResponse) {
func getMembersAndKeys(t *testing.T, cc *e2e.EtcdctlV3) (*clientv3.MemberListResponse, *clientv3.GetResponse) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

cc, err := e2e.NewEtcdctl(epc.Cfg.Client, epc.EndpointsGRPC())
assert.NoError(t, err)

kvs, err := cc.Get(ctx, "", config.GetOptions{Prefix: true})
assert.NoError(t, err)

Expand Down

0 comments on commit 5a54fe6

Please sign in to comment.