Create a v2 snapshot when running etcdutl migrate command
Also added a test to cover the etcdutl migrate command

Signed-off-by: Benjamin Wang <[email protected]>
ahrtr committed Jan 15, 2025
1 parent 1346f36 commit 1187204
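A minimal usage sketch of the command this commit touches, assuming a stopped member whose data directory is ./member1.etcd (the path and target version here are illustrative, not taken from the commit):

etcdutl migrate --data-dir ./member1.etcd --target-version 3.5
# append --force to fall back to the forced migration path (migrateForce) if the normal schema migration fails

With this change, a downgrade-direction run (target version lower than the detected storage version) also saves the cluster version to the backend and writes a v2 snapshot file plus matching WAL records, presumably so that an older (v3.5) binary can start from the migrated data directory.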
Showing 4 changed files with 186 additions and 29 deletions.
90 changes: 90 additions & 0 deletions etcdutl/etcdutl/common.go
@@ -16,14 +16,19 @@ package etcdutl

import (
"errors"
"fmt"

"go.uber.org/zap"
"go.uber.org/zap/zapcore"

"go.etcd.io/etcd/client/pkg/v3/logutil"
"go.etcd.io/etcd/pkg/v3/cobrautl"
"go.etcd.io/etcd/server/v3/etcdserver"
"go.etcd.io/etcd/server/v3/etcdserver/api/membership"
"go.etcd.io/etcd/server/v3/etcdserver/api/snap"
"go.etcd.io/etcd/server/v3/storage/backend"
"go.etcd.io/etcd/server/v3/storage/datadir"
"go.etcd.io/etcd/server/v3/storage/schema"
"go.etcd.io/etcd/server/v3/storage/wal"
"go.etcd.io/etcd/server/v3/storage/wal/walpb"
"go.etcd.io/raft/v3/raftpb"
@@ -68,3 +73,88 @@ func getLatestV2Snapshot(lg *zap.Logger, dataDir string) (*raftpb.Snapshot, erro

return snapshot, nil
}

func createV2SnapshotFromV3Store(dataDir string, be backend.Backend) error {
var (
lg = GetLogger()

snapDir = datadir.ToSnapDir(dataDir)
walDir = datadir.ToWALDir(dataDir)
)

ci, term := schema.ReadConsistentIndex(be.ReadTx())

cl := membership.NewCluster(lg)
cl.SetBackend(schema.NewMembershipBackend(lg, be))
cl.UnsafeLoad()

latestWALSnap, err := getLatestWALSnap(lg, dataDir)
if err != nil {
return err
}

// Each time before creating the v2 snapshot, etcdserver always flushes
// the backend storage (bbolt db), so the consistent index should never
// be less than the Index or Term of the latest snapshot.
if ci < latestWALSnap.Index || term < latestWALSnap.Term {
// This should never happen
return fmt.Errorf("consistent_index [Index: %d, Term: %d] is less than the latest snapshot [Index: %d, Term: %d]", ci, term, latestWALSnap.Index, latestWALSnap.Term)
}

voters, learners := getVotersAndLearners(cl)
confState := raftpb.ConfState{
Voters: voters,
Learners: learners,
}

// create the v2 snapshot file
raftSnap := raftpb.Snapshot{
Data: etcdserver.GetMembershipInfoInV2Format(lg, cl),
Metadata: raftpb.SnapshotMetadata{
Index: ci,
Term: term,
ConfState: confState,
},
}
sn := snap.New(lg, snapDir)
if err = sn.SaveSnap(raftSnap); err != nil {
return err
}

// save WAL snapshot record
w, err := wal.Open(lg, walDir, latestWALSnap)
if err != nil {
return err
}
defer w.Close()
// We must read all records to locate the tail of the last valid WAL file.
_, st, _, err := w.ReadAll()
if err != nil {
return err
}

if err := w.SaveSnapshot(walpb.Snapshot{Index: ci, Term: term, ConfState: &confState}); err != nil {
return err
}
if err := w.Save(raftpb.HardState{Term: term, Commit: ci, Vote: st.Vote}, nil); err != nil {
return err
}
return w.Sync()
}

func getVotersAndLearners(cl *membership.RaftCluster) ([]uint64, []uint64) {
var (
voters []uint64
learners []uint64
)
for _, m := range cl.Members() {
if m.IsLearner {
learners = append(learners, uint64(m.ID))
continue
}

voters = append(voters, uint64(m.ID))
}

return voters, learners
}
67 changes: 48 additions & 19 deletions etcdutl/etcdutl/migrate_command.go
@@ -74,8 +74,9 @@ func (o *migrateOptions) AddFlags(cmd *cobra.Command) {

func (o *migrateOptions) Config() (*migrateConfig, error) {
c := &migrateConfig{
force: o.force,
lg: GetLogger(),
force: o.force,
dataDir: o.dataDir,
lg: GetLogger(),
}
var err error
dotCount := strings.Count(o.targetVersion, ".")
@@ -90,47 +91,73 @@ func (o *migrateOptions) Config() (*migrateConfig, error) {
return nil, fmt.Errorf(`target version %q not supported. Minimal "3.5"`, storageVersionToString(c.targetVersion))
}

dbPath := datadir.ToBackendFileName(o.dataDir)
c.be = backend.NewDefaultBackend(GetLogger(), dbPath)
return c, nil
}

type migrateConfig struct {
lg *zap.Logger
be backend.Backend
targetVersion *semver.Version
walVersion schema.WALVersion
dataDir string
force bool
}

walPath := datadir.ToWALDir(o.dataDir)
walSnap, err := getLatestWALSnap(c.lg, o.dataDir)
func (c *migrateConfig) finalize() error {
walPath := datadir.ToWALDir(c.dataDir)
walSnap, err := getLatestWALSnap(c.lg, c.dataDir)
if err != nil {
return nil, fmt.Errorf("failed to get the lastest snapshot: %w", err)
return fmt.Errorf("failed to get the lastest snapshot: %w", err)
}
w, err := wal.OpenForRead(c.lg, walPath, walSnap)
if err != nil {
return nil, fmt.Errorf(`failed to open wal: %w`, err)
return fmt.Errorf(`failed to open wal: %w`, err)
}
defer w.Close()
c.walVersion, err = wal.ReadWALVersion(w)
if err != nil {
return nil, fmt.Errorf(`failed to read wal: %w`, err)
return fmt.Errorf(`failed to read wal: %w`, err)
}

return c, nil
}

type migrateConfig struct {
lg *zap.Logger
be backend.Backend
targetVersion *semver.Version
walVersion schema.WALVersion
force bool
return nil
}

func migrateCommandFunc(c *migrateConfig) error {
dbPath := datadir.ToBackendFileName(c.dataDir)
c.be = backend.NewDefaultBackend(GetLogger(), dbPath)
defer c.be.Close()

tx := c.be.BatchTx()
current, err := schema.DetectSchemaVersion(c.lg, c.be.ReadTx())
if err != nil {
c.lg.Error("failed to detect storage version. Please make sure you are using data dir from etcd v3.5 and older")
c.lg.Error("failed to detect storage version. Please make sure you are using data dir from etcd v3.5 and older", zap.Error(err))
return err
}
if current == *c.targetVersion {
c.lg.Info("storage version up-to-date", zap.String("storage-version", storageVersionToString(&current)))
return nil
}

// only generate a v2 snapshot file for downgrade case
if c.targetVersion.LessThan(current) {
// Update cluster version
be := schema.NewMembershipBackend(c.lg, c.be)
be.MustSaveClusterVersionToBackend(c.targetVersion)

// forcibly create a v2 snapshot file
// TODO: remove in 3.8
if err = createV2SnapshotFromV3Store(c.dataDir, c.be); err != nil {
c.lg.Error("Failed to create v2 snapshot file", zap.Error(err))
return err
}
c.lg.Info("Generated a v2 snapshot file")
}

if err = c.finalize(); err != nil {
c.lg.Error("Failed to finalize config", zap.Error(err))
return err
}

err = schema.Migrate(c.lg, tx, c.walVersion, *c.targetVersion)
if err != nil {
if !c.force {
@@ -139,7 +166,9 @@ func migrateCommandFunc(c *migrateConfig) error {
c.lg.Info("normal migrate failed, trying with force", zap.Error(err))
migrateForce(c.lg, tx, c.targetVersion)
}

c.be.ForceCommit()

return nil
}

16 changes: 11 additions & 5 deletions server/etcdserver/api/membership/cluster.go
@@ -256,22 +256,28 @@ func (c *RaftCluster) SetVersionChangedNotifier(n *notify.Notifier) {
c.versionChanged = n
}

func (c *RaftCluster) Recover(onSet func(*zap.Logger, *semver.Version)) {
c.Lock()
defer c.Unlock()

func (c *RaftCluster) UnsafeLoad() {
if c.be != nil {
c.version = c.be.ClusterVersionFromBackend()
c.members, c.removed = c.be.MustReadMembersFromBackend()
} else {
c.version = clusterVersionFromStore(c.lg, c.v2store)
c.members, c.removed = membersFromStore(c.lg, c.v2store)
}
c.buildMembershipMetric()

if c.be != nil {
c.downgradeInfo = c.be.DowngradeInfoFromBackend()
}
}

func (c *RaftCluster) Recover(onSet func(*zap.Logger, *semver.Version)) {
c.Lock()
defer c.Unlock()

c.UnsafeLoad()

c.buildMembershipMetric()

sv := semver.Must(semver.NewVersion(version.Version))
if c.downgradeInfo != nil && c.downgradeInfo.Enabled {
c.lg.Info(
42 changes: 37 additions & 5 deletions tests/e2e/utl_migrate_test.go
@@ -49,6 +49,7 @@ func TestEtctlutlMigrate(t *testing.T) {

expectLogsSubString string
expectStorageVersion *semver.Version
expectTargetBinary string
}{
{
name: "Invalid target version string",
@@ -81,23 +82,25 @@
{
name: "Migrate v3.5 to v3.5 is no-op",
clusterVersion: e2e.LastVersion,
clusterSize: 1,
targetVersion: "3.5",
clusterSize: 1,
expectLogsSubString: "storage version up-to-date\t" + `{"storage-version": "3.5"}`,
},
{
name: "Upgrade 1 member cluster from v3.5 to v3.6 should work",
clusterVersion: e2e.LastVersion,
clusterSize: 1,
targetVersion: "3.6",
clusterSize: 1,
expectStorageVersion: &version.V3_6,
expectTargetBinary: e2e.BinPath.Etcd,
},
{
name: "Upgrade 3 member cluster from v3.5 to v3.6 should work",
clusterVersion: e2e.LastVersion,
clusterSize: 3,
targetVersion: "3.6",
clusterSize: 3,
expectStorageVersion: &version.V3_6,
expectTargetBinary: e2e.BinPath.Etcd,
},
{
name: "Migrate v3.6 to v3.6 is no-op",
@@ -112,13 +115,15 @@
clusterSize: 1,
expectLogsSubString: "updated storage version",
expectStorageVersion: nil, // 3.5 doesn't have the field `storageVersion`, so it returns nil.
expectTargetBinary: e2e.BinPath.EtcdLastRelease,
},
{
name: "Downgrade 3 member cluster from v3.6 to v3.5 should work",
targetVersion: "3.5",
clusterSize: 3,
expectLogsSubString: "updated storage version",
expectStorageVersion: nil, // 3.5 doesn't have the field `storageVersion`, so it returns nil.
expectTargetBinary: e2e.BinPath.EtcdLastRelease,
},
{
name: "Upgrade v3.6 to v3.7 with force should work",
@@ -141,7 +146,7 @@
epc, err := e2e.NewEtcdProcessCluster(context.TODO(), t,
e2e.WithVersion(tc.clusterVersion),
e2e.WithDataDirPath(dataDirPath),
e2e.WithClusterSize(1),
e2e.WithClusterSize(tc.clusterSize),
e2e.WithKeepDataDir(true),
// Set low SnapshotCount to ensure wal snapshot is done
e2e.WithSnapshotCount(1),
@@ -163,7 +168,7 @@
require.NoError(t, e2e.SpawnWithExpect(append(prefixArgs, "put", fmt.Sprintf("%d", i), "value"), expect.ExpectedResponse{Value: "OK"}))
}

t.Log("Stopping the the members")
t.Log("Stopping all the servers")
for i := 0; i < len(epc.Procs); i++ {
t.Logf("Stopping server %d: %v", i, epc.Procs[i].EndpointsGRPC())
err = epc.Procs[i].Stop()
@@ -190,6 +195,33 @@
assert.Equal(t, tc.expectStorageVersion, ver)
be.Close()
}

if len(tc.expectTargetBinary) == 0 || !fileutil.Exist(tc.expectTargetBinary) {
return
}

t.Log("Start all members with new binary")
for i := 0; i < len(epc.Procs); i++ {
t.Logf("Replace binary for member %d: %v", i, epc.Procs[i].EndpointsGRPC())
member := epc.Procs[i]
member.Config().ExecPath = tc.expectTargetBinary
}
require.NoError(t, epc.Start(context.TODO()))

t.Log("Verify the versions of all members")
for i := 0; i < len(epc.Procs); i++ {
t.Logf("Verify the version of member %d: %v", i, epc.Procs[i].EndpointsGRPC())
ver := tc.targetVersion + ".0"
expectedVer := version.Versions{
Server: ver,
Cluster: ver,
}
if tc.expectStorageVersion != nil {
expectedVer.Storage = ver
}

e2e.ValidateVersion(t, epc.Cfg, epc.Procs[i], expectedVer)
}
})
}
}
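As a rough local check of the new cases, something along the following lines should exercise TestEtctlutlMigrate, assuming the usual e2e prerequisites are in place (current binaries built under bin/ and the last-release etcd binary that e2e.BinPath.EtcdLastRelease points at); exact targets and bootstrap steps depend on the repository's test scripts:

make build
cd tests/e2e && go test -run TestEtctlutlMigrate -v

The cases that set expectTargetBinary additionally restart every member on the target binary and call e2e.ValidateVersion, so both the storage migration and the subsequent startup on the other release line are covered.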
