From e3fb8990fbdf8dbaaaebdf2bdc10b98d8f276d79 Mon Sep 17 00:00:00 2001
From: Benjamin Wang
Date: Fri, 10 Jan 2025 17:50:56 +0000
Subject: [PATCH] Create a v2 snapshot when running etcdutl migrate command

Also add a test to cover the etcdutl migrate command

Signed-off-by: Benjamin Wang
---
 etcdutl/etcdutl/common.go          |  90 ++++++++++++++
 etcdutl/etcdutl/common_test.go     | 191 +++++++++++++++++++++++++++++
 etcdutl/etcdutl/migrate_command.go |  17 ++-
 tests/e2e/utl_migrate_test.go      |  42 ++++++-
 4 files changed, 334 insertions(+), 6 deletions(-)

diff --git a/etcdutl/etcdutl/common.go b/etcdutl/etcdutl/common.go
index c7473cc7fd7..041e6de1259 100644
--- a/etcdutl/etcdutl/common.go
+++ b/etcdutl/etcdutl/common.go
@@ -16,14 +16,19 @@ package etcdutl
 
 import (
 	"errors"
+	"fmt"
 
 	"go.uber.org/zap"
 	"go.uber.org/zap/zapcore"
 
 	"go.etcd.io/etcd/client/pkg/v3/logutil"
 	"go.etcd.io/etcd/pkg/v3/cobrautl"
+	"go.etcd.io/etcd/server/v3/etcdserver"
+	"go.etcd.io/etcd/server/v3/etcdserver/api/membership"
 	"go.etcd.io/etcd/server/v3/etcdserver/api/snap"
+	"go.etcd.io/etcd/server/v3/storage/backend"
 	"go.etcd.io/etcd/server/v3/storage/datadir"
+	"go.etcd.io/etcd/server/v3/storage/schema"
 	"go.etcd.io/etcd/server/v3/storage/wal"
 	"go.etcd.io/etcd/server/v3/storage/wal/walpb"
 	"go.etcd.io/raft/v3/raftpb"
@@ -68,3 +73,88 @@ func getLatestV2Snapshot(lg *zap.Logger, dataDir string) (*raftpb.Snapshot, erro
 
 	return snapshot, nil
 }
+
+func createV2SnapshotFromV3Store(dataDir string, be backend.Backend) error {
+	var (
+		lg = GetLogger()
+
+		snapDir = datadir.ToSnapDir(dataDir)
+		walDir  = datadir.ToWALDir(dataDir)
+	)
+
+	ci, term := schema.ReadConsistentIndex(be.ReadTx())
+
+	cl := membership.NewCluster(lg)
+	cl.SetBackend(schema.NewMembershipBackend(lg, be))
+	cl.UnsafeLoad()
+
+	latestWALSnap, err := getLatestWALSnap(lg, dataDir)
+	if err != nil {
+		return err
+	}
+
+	// etcdserver always flushes the backend storage (bbolt db) before
+	// creating a v2 snapshot, so the consistent index should never be
+	// less than the index or term of the latest snapshot.
+	if ci < latestWALSnap.Index || term < latestWALSnap.Term {
+		// This should never happen.
+		return fmt.Errorf("consistent_index [Index: %d, Term: %d] is less than the latest snapshot [Index: %d, Term: %d]", ci, term, latestWALSnap.Index, latestWALSnap.Term)
+	}
+
+	voters, learners := getVotersAndLearners(cl)
+	confState := raftpb.ConfState{
+		Voters:   voters,
+		Learners: learners,
+	}
+
+	// create the v2 snapshot file
+	raftSnap := raftpb.Snapshot{
+		Data: etcdserver.GetMembershipInfoInV2Format(lg, cl),
+		Metadata: raftpb.SnapshotMetadata{
+			Index:     ci,
+			Term:      term,
+			ConfState: confState,
+		},
+	}
+	sn := snap.New(lg, snapDir)
+	if err = sn.SaveSnap(raftSnap); err != nil {
+		return err
+	}
+
+	// save WAL snapshot record
+	w, err := wal.Open(lg, walDir, latestWALSnap)
+	if err != nil {
+		return err
+	}
+	defer w.Close()
+	// We must read all records to locate the tail of the last valid WAL file.
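+	// ReadAll also returns the last persisted HardState; its Vote is
+	// carried over into the fresh HardState record saved below so the
+	// rewritten WAL tail stays consistent with the member's last vote.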
+	_, st, _, err := w.ReadAll()
+	if err != nil {
+		return err
+	}
+
+	if err := w.SaveSnapshot(walpb.Snapshot{Index: ci, Term: term, ConfState: &confState}); err != nil {
+		return err
+	}
+	if err := w.Save(raftpb.HardState{Term: term, Commit: ci, Vote: st.Vote}, nil); err != nil {
+		return err
+	}
+	return w.Sync()
+}
+
+func getVotersAndLearners(cl *membership.RaftCluster) ([]uint64, []uint64) {
+	var (
+		voters   []uint64
+		learners []uint64
+	)
+	for _, m := range cl.Members() {
+		if m.IsLearner {
+			learners = append(learners, uint64(m.ID))
+			continue
+		}
+
+		voters = append(voters, uint64(m.ID))
+	}
+
+	return voters, learners
+}
diff --git a/etcdutl/etcdutl/common_test.go b/etcdutl/etcdutl/common_test.go
index e2acedfa2e7..481a1addf62 100644
--- a/etcdutl/etcdutl/common_test.go
+++ b/etcdutl/etcdutl/common_test.go
@@ -15,16 +15,26 @@
 package etcdutl
 
 import (
+	"path"
+	"path/filepath"
 	"testing"
 
+	"github.com/coreos/go-semver/semver"
 	"github.com/stretchr/testify/require"
 	"go.uber.org/zap"
 
 	"go.etcd.io/etcd/api/v3/etcdserverpb"
 	"go.etcd.io/etcd/client/pkg/v3/fileutil"
+	"go.etcd.io/etcd/client/pkg/v3/types"
 	"go.etcd.io/etcd/pkg/v3/pbutil"
+	"go.etcd.io/etcd/server/v3/etcdserver"
+	"go.etcd.io/etcd/server/v3/etcdserver/api/membership"
 	"go.etcd.io/etcd/server/v3/etcdserver/api/snap"
+	"go.etcd.io/etcd/server/v3/etcdserver/api/v2store"
+	"go.etcd.io/etcd/server/v3/etcdserver/cindex"
+	"go.etcd.io/etcd/server/v3/storage/backend"
 	"go.etcd.io/etcd/server/v3/storage/datadir"
+	"go.etcd.io/etcd/server/v3/storage/schema"
 	"go.etcd.io/etcd/server/v3/storage/wal"
 	"go.etcd.io/etcd/server/v3/storage/wal/walpb"
 	"go.etcd.io/raft/v3/raftpb"
@@ -141,3 +151,184 @@ func TestGetLatestWalSnap(t *testing.T) {
 		})
 	}
 }
+
+func TestCreateV2SnapshotFromV3Store(t *testing.T) {
+	testCases := []struct {
+		name            string
+		consistentIndex uint64
+		term            uint64
+		clusterVersion  string
+		members         []uint64
+		learners        []uint64
+		removedMembers  []uint64
+		expectedErrMsg  string
+	}{
+		{
+			name:            "unexpected term: less than the last snapshot.term",
+			consistentIndex: 3,
+			term:            1,
+			expectedErrMsg:  "less than the latest snapshot",
+		},
+		{
+			name:            "unexpected consistent index: less than the last snapshot.index",
+			consistentIndex: 1,
+			term:            3,
+			expectedErrMsg:  "less than the latest snapshot",
+		},
+		{
+			name:            "normal case",
+			consistentIndex: 32,
+			term:            4,
+			clusterVersion:  "3.5.0",
+			members:         []uint64{100, 200},
+			learners:        []uint64{300},
+			removedMembers:  []uint64{400, 500},
+		},
+		{
+			name:            "empty cluster version",
+			consistentIndex: 45,
+			term:            4,
+			clusterVersion:  "",
+			members:         []uint64{110, 200},
+			learners:        []uint64{350},
+			removedMembers:  []uint64{450, 500},
+		},
+		{
+			name:            "no learner",
+			consistentIndex: 7,
+			term:            5,
+			clusterVersion:  "3.5.0",
+			members:         []uint64{150, 200},
+			removedMembers:  []uint64{450, 550},
+		},
+		{
+			name:            "no removed members",
+			consistentIndex: 34,
+			term:            6,
+			clusterVersion:  "3.7.0",
+			members:         []uint64{160, 200},
+			learners:        []uint64{300},
+		},
+		{
+			name:            "no learner and removed members",
+			consistentIndex: 19,
+			term:            5,
+			clusterVersion:  "3.6.0",
+			members:         []uint64{120, 220},
+		},
+	}
+
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			dataDir := t.TempDir()
+			lg := zap.NewNop()
+
+			require.NoError(t, fileutil.TouchDirAll(lg, datadir.ToMemberDir(dataDir)))
+			require.NoError(t, fileutil.TouchDirAll(lg, datadir.ToWALDir(dataDir)))
+			require.NoError(t, fileutil.TouchDirAll(lg, datadir.ToSnapDir(dataDir)))
+
+			t.Log("Populate the WAL file")
+			w, err := wal.Create(lg, datadir.ToWALDir(dataDir), pbutil.MustMarshal(
+				&etcdserverpb.Metadata{
+					NodeID:    1,
+					ClusterID: 2,
+				},
+			))
+			require.NoError(t, err)
+			err = w.SaveSnapshot(walpb.Snapshot{Index: 2, Term: 2, ConfState: &raftpb.ConfState{Voters: []uint64{1}}})
+			require.NoError(t, err)
+			err = w.Save(raftpb.HardState{Term: 2, Commit: 2, Vote: 1}, nil)
+			require.NoError(t, err)
+			err = w.Close()
+			require.NoError(t, err)
+
+			t.Log("Generate a v2 snapshot file")
+			ss := snap.New(lg, datadir.ToSnapDir(dataDir))
+			err = ss.SaveSnap(raftpb.Snapshot{Metadata: raftpb.SnapshotMetadata{Index: 2, Term: 2, ConfState: raftpb.ConfState{Voters: []uint64{1}}}})
+			require.NoError(t, err)
+
+			t.Log("Load and verify the latest v2 snapshot file")
+			oldV2Snap, err := getLatestV2Snapshot(lg, dataDir)
+			require.NoError(t, err)
+			require.Equal(t, raftpb.SnapshotMetadata{Index: 2, Term: 2, ConfState: raftpb.ConfState{Voters: []uint64{1}}}, oldV2Snap.Metadata)
+
+			t.Log("Prepare the bbolt db")
+			be := backend.NewDefaultBackend(lg, filepath.Join(dataDir, "member/snap/db"))
+			schema.CreateMetaBucket(be.BatchTx())
+			schema.NewMembershipBackend(lg, be).MustCreateBackendBuckets()
+
+			if len(tc.clusterVersion) > 0 {
+				t.Logf("Populate the cluster version: %s", tc.clusterVersion)
+				schema.NewMembershipBackend(lg, be).MustSaveClusterVersionToBackend(semver.New(tc.clusterVersion))
+			} else {
+				t.Log("Skip populating the cluster version since none is provided")
+			}
+
+			tx := be.BatchTx()
+			tx.LockOutsideApply()
+			t.Log("Populate the consistent index and term")
+			ci := cindex.NewConsistentIndex(be)
+			ci.SetConsistentIndex(tc.consistentIndex, tc.term)
+			ci.UnsafeSave(tx)
+			tx.Unlock()
+
+			t.Logf("Populate members: %d", len(tc.members))
+			memberBackend := schema.NewMembershipBackend(lg, be)
+			for _, mID := range tc.members {
+				memberBackend.MustSaveMemberToBackend(&membership.Member{ID: types.ID(mID)})
+			}
+
+			t.Logf("Populate learners: %d", len(tc.learners))
+			for _, mID := range tc.learners {
+				memberBackend.MustSaveMemberToBackend(&membership.Member{ID: types.ID(mID), RaftAttributes: membership.RaftAttributes{IsLearner: true}})
+			}
+
+			t.Logf("Populate removed members: %d", len(tc.removedMembers))
+			for _, mID := range tc.removedMembers {
+				memberBackend.MustDeleteMemberFromBackend(types.ID(mID))
+			}
+
+			t.Log("Commit the bbolt db")
+			be.ForceCommit()
+			require.NoError(t, be.Close())
+
+			t.Log("Create a new v2 snapshot file based on the v3 store")
+			err = createV2SnapshotFromV3Store(dataDir, backend.NewDefaultBackend(lg, filepath.Join(dataDir, "member/snap/db")))
+			if len(tc.expectedErrMsg) > 0 {
+				require.ErrorContains(t, err, tc.expectedErrMsg)
+				return
+			}
+			require.NoError(t, err)
+
+			t.Log("Load and verify the new latest v2 snapshot file")
+			newV2Snap, err := getLatestV2Snapshot(lg, dataDir)
+			require.NoError(t, err)
+			require.Equal(t, raftpb.SnapshotMetadata{Index: tc.consistentIndex, Term: tc.term, ConfState: raftpb.ConfState{Voters: tc.members, Learners: tc.learners}}, newV2Snap.Metadata)
+
+			st := v2store.New(etcdserver.StoreClusterPrefix, etcdserver.StoreKeysPrefix)
+			require.NoError(t, st.Recovery(newV2Snap.Data))
+
+			cv, err := st.Get(path.Join(etcdserver.StoreClusterPrefix, "version"), false, false)
+			if len(tc.clusterVersion) > 0 {
+				require.NoError(t, err)
+				if !semver.New(*cv.Node.Value).Equal(*semver.New(tc.clusterVersion)) {
+					t.Fatalf("Unexpected cluster version, got %s, want %s", semver.New(*cv.Node.Value).String(), tc.clusterVersion)
+				}
+			} else {
+				require.ErrorContains(t, err, "Key not found")
+			}
+
+			members, err := st.Get(path.Join(etcdserver.StoreClusterPrefix, "members"), true, true)
+			require.NoError(t, err)
+			require.Len(t, members.Node.Nodes, len(tc.members)+len(tc.learners))
+
+			removedMembers, err := st.Get(path.Join(etcdserver.StoreClusterPrefix, "removed_members"), true, true)
+			if len(tc.removedMembers) > 0 {
+				require.NoError(t, err)
+				require.Len(t, removedMembers.Node.Nodes, len(tc.removedMembers))
+			} else {
+				require.ErrorContains(t, err, "Key not found")
+			}
+		})
+	}
+}
diff --git a/etcdutl/etcdutl/migrate_command.go b/etcdutl/etcdutl/migrate_command.go
index 0a114268524..4ce76173ec9 100644
--- a/etcdutl/etcdutl/migrate_command.go
+++ b/etcdutl/etcdutl/migrate_command.go
@@ -129,7 +129,7 @@ func migrateCommandFunc(c *migrateConfig) error {
 	tx := be.BatchTx()
 	current, err := schema.DetectSchemaVersion(c.lg, be.ReadTx())
 	if err != nil {
-		c.lg.Error("failed to detect storage version. Please make sure you are using data dir from etcd v3.5 and older")
+		c.lg.Error("failed to detect storage version. Please make sure you are using data dir from etcd v3.5 and older", zap.Error(err))
 		return err
 	}
 	if current == *c.targetVersion {
@@ -137,6 +137,20 @@ func migrateCommandFunc(c *migrateConfig) error {
 		return nil
 	}
 
+	// only generate a v2 snapshot file for the downgrade case
+	if c.targetVersion.LessThan(current) {
+		// update the cluster version
+		schema.NewMembershipBackend(c.lg, be).MustSaveClusterVersionToBackend(c.targetVersion)
+
+		// forcibly create a v2 snapshot file
+		// TODO: remove in 3.8
+		if err = createV2SnapshotFromV3Store(c.dataDir, be); err != nil {
+			c.lg.Error("Failed to create v2 snapshot file", zap.Error(err))
+			return err
+		}
+		c.lg.Info("Generated a v2 snapshot file")
+	}
+
 	if err = c.finalize(); err != nil {
 		c.lg.Error("Failed to finalize config", zap.Error(err))
 		return err
@@ -150,6 +164,7 @@ func migrateCommandFunc(c *migrateConfig) error {
 		c.lg.Info("normal migrate failed, trying with force", zap.Error(err))
 		migrateForce(c.lg, tx, c.targetVersion)
 	}
+	be.ForceCommit()
 	return nil
 }
 
diff --git a/tests/e2e/utl_migrate_test.go b/tests/e2e/utl_migrate_test.go
index 5ee933f0ef2..5cae065487e 100644
--- a/tests/e2e/utl_migrate_test.go
+++ b/tests/e2e/utl_migrate_test.go
@@ -49,6 +49,7 @@ func TestEtctlutlMigrate(t *testing.T) {
 
 		expectLogsSubString  string
 		expectStorageVersion *semver.Version
+		expectTargetBinary   string
 	}{
 		{
 			name:          "Invalid target version string",
@@ -81,23 +82,25 @@ func TestEtctlutlMigrate(t *testing.T) {
 		{
 			name:                "Migrate v3.5 to v3.5 is no-op",
 			clusterVersion:      e2e.LastVersion,
-			clusterSize:         1,
 			targetVersion:       "3.5",
+			clusterSize:         1,
 			expectLogsSubString: "storage version up-to-date\t" + `{"storage-version": "3.5"}`,
 		},
 		{
 			name:                 "Upgrade 1 member cluster from v3.5 to v3.6 should work",
 			clusterVersion:       e2e.LastVersion,
-			clusterSize:          1,
 			targetVersion:        "3.6",
+			clusterSize:          1,
 			expectStorageVersion: &version.V3_6,
+			expectTargetBinary:   e2e.BinPath.Etcd,
 		},
 		{
 			name:                 "Upgrade 3 member cluster from v3.5 to v3.6 should work",
 			clusterVersion:       e2e.LastVersion,
-			clusterSize:          3,
 			targetVersion:        "3.6",
+			clusterSize:          3,
 			expectStorageVersion: &version.V3_6,
+			expectTargetBinary:   e2e.BinPath.Etcd,
 		},
 		{
 			name:           "Migrate v3.6 to v3.6 is no-op",
@@ -112,6 +115,7 @@ func TestEtctlutlMigrate(t *testing.T) {
 			clusterSize:          1,
 			expectLogsSubString:  "updated storage version",
 			expectStorageVersion: nil, // 3.5 doesn't have the field `storageVersion`, so it returns nil.
+			expectTargetBinary:   e2e.BinPath.EtcdLastRelease,
 		},
 		{
 			name: "Downgrade 3 member cluster from v3.6 to v3.5 should work",
@@ -119,6 +123,7 @@ func TestEtctlutlMigrate(t *testing.T) {
 			clusterSize:          3,
 			expectLogsSubString:  "updated storage version",
 			expectStorageVersion: nil, // 3.5 doesn't have the field `storageVersion`, so it returns nil.
+			expectTargetBinary:   e2e.BinPath.EtcdLastRelease,
 		},
 		{
 			name: "Upgrade v3.6 to v3.7 with force should work",
@@ -141,7 +146,7 @@ func TestEtctlutlMigrate(t *testing.T) {
 			epc, err := e2e.NewEtcdProcessCluster(context.TODO(), t,
 				e2e.WithVersion(tc.clusterVersion),
 				e2e.WithDataDirPath(dataDirPath),
-				e2e.WithClusterSize(1),
+				e2e.WithClusterSize(tc.clusterSize),
 				e2e.WithKeepDataDir(true),
 				// Set low SnapshotCount to ensure wal snapshot is done
 				e2e.WithSnapshotCount(1),
@@ -163,7 +168,7 @@ func TestEtctlutlMigrate(t *testing.T) {
 				require.NoError(t, e2e.SpawnWithExpect(append(prefixArgs, "put", fmt.Sprintf("%d", i), "value"), expect.ExpectedResponse{Value: "OK"}))
 			}
 
-			t.Log("Stopping the the members")
+			t.Log("Stopping all the servers")
 			for i := 0; i < len(epc.Procs); i++ {
 				t.Logf("Stopping server %d: %v", i, epc.Procs[i].EndpointsGRPC())
 				err = epc.Procs[i].Stop()
@@ -190,6 +195,33 @@ func TestEtctlutlMigrate(t *testing.T) {
 				assert.Equal(t, tc.expectStorageVersion, ver)
 				be.Close()
 			}
+
+			if len(tc.expectTargetBinary) == 0 || !fileutil.Exist(tc.expectTargetBinary) {
+				return
+			}
+
+			t.Log("Start all members with the new binary")
+			for i := 0; i < len(epc.Procs); i++ {
+				t.Logf("Replace the binary for member %d: %v", i, epc.Procs[i].EndpointsGRPC())
+				member := epc.Procs[i]
+				member.Config().ExecPath = tc.expectTargetBinary
+			}
+			require.NoError(t, epc.Start(context.TODO()))
+
+			t.Log("Verify the versions of all members")
+			for i := 0; i < len(epc.Procs); i++ {
+				t.Logf("Verify the version of member %d: %v", i, epc.Procs[i].EndpointsGRPC())
+				ver := tc.targetVersion + ".0"
+				expectedVer := version.Versions{
+					Server:  ver,
+					Cluster: ver,
+				}
+				if tc.expectStorageVersion != nil {
+					expectedVer.Storage = ver
+				}
+
+				e2e.ValidateVersion(t, epc.Cfg, epc.Procs[i], expectedVer)
+			}
 		})
 	}
 }
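
A quick sketch for exercising the new downgrade path by hand (the data dir
path below is an illustrative assumption; the flags are the existing
etcdutl migrate flags):

    # Downgrade the storage schema of a v3.6 data dir to v3.5. Besides
    # updating the schema and cluster version, the command now also writes
    # a fresh v2 snapshot derived from the v3 store, which v3.5 binaries
    # still expect when recovering cluster membership.
    etcdutl migrate --data-dir /var/lib/etcd --target-version 3.5

    # Re-running with the same target is a no-op and logs
    # "storage version up-to-date".
    etcdutl migrate --data-dir /var/lib/etcd --target-version 3.5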