Skip to content

Commit

Permalink
Create a v2 snapshot when running etcdutl migrate command
Browse files Browse the repository at this point in the history
Also added a test to cover the etcdutl migrate command.

Signed-off-by: Benjamin Wang <[email protected]>
  • Loading branch information
ahrtr committed Jan 16, 2025
1 parent ab819b5 commit e3fb899
Show file tree
Hide file tree
Showing 4 changed files with 334 additions and 6 deletions.
90 changes: 90 additions & 0 deletions etcdutl/etcdutl/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,19 @@ package etcdutl

import (
"errors"
"fmt"

"go.uber.org/zap"
"go.uber.org/zap/zapcore"

"go.etcd.io/etcd/client/pkg/v3/logutil"
"go.etcd.io/etcd/pkg/v3/cobrautl"
"go.etcd.io/etcd/server/v3/etcdserver"
"go.etcd.io/etcd/server/v3/etcdserver/api/membership"
"go.etcd.io/etcd/server/v3/etcdserver/api/snap"
"go.etcd.io/etcd/server/v3/storage/backend"
"go.etcd.io/etcd/server/v3/storage/datadir"
"go.etcd.io/etcd/server/v3/storage/schema"
"go.etcd.io/etcd/server/v3/storage/wal"
"go.etcd.io/etcd/server/v3/storage/wal/walpb"
"go.etcd.io/raft/v3/raftpb"
Expand Down Expand Up @@ -68,3 +73,88 @@ func getLatestV2Snapshot(lg *zap.Logger, dataDir string) (*raftpb.Snapshot, erro

return snapshot, nil
}

// createV2SnapshotFromV3Store writes a new v2 snapshot file derived from the
// v3 store (the bbolt backend) and appends a matching snapshot record and
// hard state to the WAL under dataDir.
//
// The snapshot's index and term are the consistent index and term read from
// be. Its ConfState is built from the members loaded from be, and its data is
// whatever etcdserver.GetMembershipInfoInV2Format produces for that cluster.
//
// An error is returned when the consistent index or term is lower than that
// of the latest WAL snapshot, or when any snapshot or WAL operation fails.
func createV2SnapshotFromV3Store(dataDir string, be backend.Backend) error {
	logger := GetLogger()

	consistentIndex, consistentTerm := schema.ReadConsistentIndex(be.ReadTx())

	cluster := membership.NewCluster(logger)
	cluster.SetBackend(schema.NewMembershipBackend(logger, be))
	cluster.UnsafeLoad()

	lastWALSnap, err := getLatestWALSnap(logger, dataDir)
	if err != nil {
		return err
	}

	// etcdserver always flushes the backend storage (bbolt db) before it
	// creates a v2 snapshot, so the consistent index and term should never be
	// less than the index or term of the latest snapshot.
	staleIndex := consistentIndex < lastWALSnap.Index
	staleTerm := consistentTerm < lastWALSnap.Term
	if staleIndex || staleTerm {
		// This should never happen.
		return fmt.Errorf("consistent_index [Index: %d, Term: %d] is less than the latest snapshot [Index: %d, Term: %d]", consistentIndex, consistentTerm, lastWALSnap.Index, lastWALSnap.Term)
	}

	voterIDs, learnerIDs := getVotersAndLearners(cluster)
	confState := raftpb.ConfState{
		Voters:   voterIDs,
		Learners: learnerIDs,
	}

	// Create the v2 snapshot file.
	v2Snap := raftpb.Snapshot{
		Data: etcdserver.GetMembershipInfoInV2Format(logger, cluster),
		Metadata: raftpb.SnapshotMetadata{
			Index:     consistentIndex,
			Term:      consistentTerm,
			ConfState: confState,
		},
	}
	snapshotter := snap.New(logger, datadir.ToSnapDir(dataDir))
	if saveErr := snapshotter.SaveSnap(v2Snap); saveErr != nil {
		return saveErr
	}

	// Save the WAL snapshot record.
	walFile, err := wal.Open(logger, datadir.ToWALDir(dataDir), lastWALSnap)
	if err != nil {
		return err
	}
	defer walFile.Close()

	// All records have to be read first so that the WAL is positioned at the
	// tail of the last valid WAL file before anything is appended.
	_, hardState, _, err := walFile.ReadAll()
	if err != nil {
		return err
	}

	walSnap := walpb.Snapshot{Index: consistentIndex, Term: consistentTerm, ConfState: &confState}
	if err = walFile.SaveSnapshot(walSnap); err != nil {
		return err
	}

	// Keep the vote read from the existing WAL; only term and commit move
	// forward to the new snapshot.
	newState := raftpb.HardState{Term: consistentTerm, Commit: consistentIndex, Vote: hardState.Vote}
	if err = walFile.Save(newState, nil); err != nil {
		return err
	}
	return walFile.Sync()
}

// getVotersAndLearners partitions the IDs of the members returned by
// cl.Members() into voters (first result) and learners (second result),
// based on each member's IsLearner flag.
//
// A returned slice is nil when no member falls into that group.
func getVotersAndLearners(cl *membership.RaftCluster) ([]uint64, []uint64) {
	var voterIDs, learnerIDs []uint64

	for _, member := range cl.Members() {
		id := uint64(member.ID)
		if member.IsLearner {
			learnerIDs = append(learnerIDs, id)
		} else {
			voterIDs = append(voterIDs, id)
		}
	}

	return voterIDs, learnerIDs
}
191 changes: 191 additions & 0 deletions etcdutl/etcdutl/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,26 @@
package etcdutl

import (
"path"
"path/filepath"
"testing"

"github.com/coreos/go-semver/semver"
"github.com/stretchr/testify/require"
"go.uber.org/zap"

"go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/client/pkg/v3/fileutil"
"go.etcd.io/etcd/client/pkg/v3/types"
"go.etcd.io/etcd/pkg/v3/pbutil"
"go.etcd.io/etcd/server/v3/etcdserver"
"go.etcd.io/etcd/server/v3/etcdserver/api/membership"
"go.etcd.io/etcd/server/v3/etcdserver/api/snap"
"go.etcd.io/etcd/server/v3/etcdserver/api/v2store"
"go.etcd.io/etcd/server/v3/etcdserver/cindex"
"go.etcd.io/etcd/server/v3/storage/backend"
"go.etcd.io/etcd/server/v3/storage/datadir"
"go.etcd.io/etcd/server/v3/storage/schema"
"go.etcd.io/etcd/server/v3/storage/wal"
"go.etcd.io/etcd/server/v3/storage/wal/walpb"
"go.etcd.io/raft/v3/raftpb"
Expand Down Expand Up @@ -141,3 +151,184 @@ func TestGetLatestWalSnap(t *testing.T) {
})
}
}

// TestCreateV2SnapshotFromV3Store exercises createV2SnapshotFromV3Store
// against a freshly built data directory.
//
// Every subtest seeds the WAL and the snap directory with a snapshot at
// index 2 / term 2, then populates a bbolt db with the consistent index,
// term, cluster version and membership given by the test case. Cases whose
// consistent index or term is below the seeded snapshot are expected to fail
// with expectedErrMsg; all other cases verify the metadata and the v2 store
// contents of the newly generated snapshot.
func TestCreateV2SnapshotFromV3Store(t *testing.T) {
	testCases := []struct {
		name            string
		consistentIndex uint64
		term            uint64
		clusterVersion  string
		members         []uint64
		learners        []uint64
		removedMembers  []uint64
		expectedErrMsg  string
	}{
		{
			name:            "unexpected term: less than the last snapshot.term",
			consistentIndex: 3,
			term:            1,
			expectedErrMsg:  "less than the latest snapshot",
		},
		{
			name:            "unexpected consistent index: less than the last snapshot.index",
			consistentIndex: 1,
			term:            3,
			expectedErrMsg:  "less than the latest snapshot",
		},
		{
			name:            "normal case",
			consistentIndex: 32,
			term:            4,
			clusterVersion:  "3.5.0",
			members:         []uint64{100, 200},
			learners:        []uint64{300},
			removedMembers:  []uint64{400, 500},
		},
		{
			name:            "empty cluster version",
			consistentIndex: 45,
			term:            4,
			clusterVersion:  "",
			members:         []uint64{110, 200},
			learners:        []uint64{350},
			removedMembers:  []uint64{450, 500},
		},
		{
			name:            "no learner",
			consistentIndex: 7,
			term:            5,
			clusterVersion:  "3.5.0",
			members:         []uint64{150, 200},
			removedMembers:  []uint64{450, 550},
		},
		{
			name:            "no removed members",
			consistentIndex: 34,
			term:            6,
			clusterVersion:  "3.7.0",
			members:         []uint64{160, 200},
			learners:        []uint64{300},
		},
		{
			name:            "no learner and removed members",
			consistentIndex: 19,
			term:            5,
			clusterVersion:  "3.6.0",
			members:         []uint64{120, 220},
		},
	}

	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			dataDir := t.TempDir()
			lg := zap.NewNop()
			// The bbolt db is opened twice below (populate, then reopen), so
			// keep its path in one place.
			dbPath := filepath.Join(dataDir, "member/snap/db")

			require.NoError(t, fileutil.TouchDirAll(lg, datadir.ToMemberDir(dataDir)))
			require.NoError(t, fileutil.TouchDirAll(lg, datadir.ToWALDir(dataDir)))
			require.NoError(t, fileutil.TouchDirAll(lg, datadir.ToSnapDir(dataDir)))

			t.Log("Populate the wal file")
			w, err := wal.Create(lg, datadir.ToWALDir(dataDir), pbutil.MustMarshal(
				&etcdserverpb.Metadata{
					NodeID:    1,
					ClusterID: 2,
				},
			))
			require.NoError(t, err)
			err = w.SaveSnapshot(walpb.Snapshot{Index: 2, Term: 2, ConfState: &raftpb.ConfState{Voters: []uint64{1}}})
			require.NoError(t, err)
			err = w.Save(raftpb.HardState{Term: 2, Commit: 2, Vote: 1}, nil)
			require.NoError(t, err)
			err = w.Close()
			require.NoError(t, err)

			t.Log("Generate a v2 snapshot file")
			ss := snap.New(lg, datadir.ToSnapDir(dataDir))
			err = ss.SaveSnap(raftpb.Snapshot{Metadata: raftpb.SnapshotMetadata{Index: 2, Term: 2, ConfState: raftpb.ConfState{Voters: []uint64{1}}}})
			require.NoError(t, err)

			t.Log("Load and verify the latest v2 snapshot file")
			oldV2Snap, err := getLatestV2Snapshot(lg, dataDir)
			require.NoError(t, err)
			require.Equal(t, raftpb.SnapshotMetadata{Index: 2, Term: 2, ConfState: raftpb.ConfState{Voters: []uint64{1}}}, oldV2Snap.Metadata)

			t.Log("Prepare the bbolt db")
			be := backend.NewDefaultBackend(lg, dbPath)
			schema.CreateMetaBucket(be.BatchTx())
			memberBackend := schema.NewMembershipBackend(lg, be)
			memberBackend.MustCreateBackendBuckets()

			if len(tc.clusterVersion) > 0 {
				t.Logf("Populate the cluster version: %s", tc.clusterVersion)
				memberBackend.MustSaveClusterVersionToBackend(semver.New(tc.clusterVersion))
			} else {
				t.Log("Skip populating cluster version due to not provided")
			}

			tx := be.BatchTx()
			tx.LockOutsideApply()
			t.Log("Populate the consistent index and term")
			ci := cindex.NewConsistentIndex(be)
			ci.SetConsistentIndex(tc.consistentIndex, tc.term)
			ci.UnsafeSave(tx)
			tx.Unlock()

			t.Logf("Populate members: %d", len(tc.members))
			for _, mID := range tc.members {
				memberBackend.MustSaveMemberToBackend(&membership.Member{ID: types.ID(mID)})
			}

			t.Logf("Populate learner: %d", len(tc.learners))
			for _, mID := range tc.learners {
				memberBackend.MustSaveMemberToBackend(&membership.Member{ID: types.ID(mID), RaftAttributes: membership.RaftAttributes{IsLearner: true}})
			}

			t.Logf("Populate removed members: %d", len(tc.removedMembers))
			for _, mID := range tc.removedMembers {
				memberBackend.MustDeleteMemberFromBackend(types.ID(mID))
			}

			t.Log("Committing bbolt db")
			be.ForceCommit()
			require.NoError(t, be.Close())

			t.Log("Creating a new v2 snapshot file based on the v3 store")
			reopenedBE := backend.NewDefaultBackend(lg, dbPath)
			// createV2SnapshotFromV3Store does not close the backend it is
			// given, so close it here; otherwise the db file stays open for
			// the rest of the test run (also on the expected-error path).
			defer func() {
				if cerr := reopenedBE.Close(); cerr != nil {
					t.Errorf("failed to close the reopened backend: %v", cerr)
				}
			}()
			err = createV2SnapshotFromV3Store(dataDir, reopenedBE)
			if len(tc.expectedErrMsg) > 0 {
				require.ErrorContains(t, err, tc.expectedErrMsg)
				return
			}
			require.NoError(t, err)

			t.Log("Loading & verifying the new latest v2 snapshot file")
			newV2Snap, err := getLatestV2Snapshot(lg, dataDir)
			require.NoError(t, err)
			require.Equal(t, raftpb.SnapshotMetadata{Index: tc.consistentIndex, Term: tc.term, ConfState: raftpb.ConfState{Voters: tc.members, Learners: tc.learners}}, newV2Snap.Metadata)

			// Recover a v2 store from the snapshot data to inspect what was
			// actually written into it.
			st := v2store.New(etcdserver.StoreClusterPrefix, etcdserver.StoreKeysPrefix)
			require.NoError(t, st.Recovery(newV2Snap.Data))

			cv, err := st.Get(path.Join(etcdserver.StoreClusterPrefix, "version"), false, false)
			if len(tc.clusterVersion) > 0 {
				require.NoError(t, err)
				if !semver.New(*cv.Node.Value).Equal(*semver.New(tc.clusterVersion)) {
					t.Fatalf("Unexpected cluster version, got %s, want %s", semver.New(*cv.Node.Value).String(), tc.clusterVersion)
				}
			} else {
				require.ErrorContains(t, err, "Key not found")
			}

			// Learners are stored alongside voters under "members".
			members, err := st.Get(path.Join(etcdserver.StoreClusterPrefix, "members"), true, true)
			require.NoError(t, err)
			require.Len(t, members.Node.Nodes, len(tc.members)+len(tc.learners))

			removedMembers, err := st.Get(path.Join(etcdserver.StoreClusterPrefix, "removed_members"), true, true)
			if len(tc.removedMembers) > 0 {
				require.NoError(t, err)
				require.Len(t, removedMembers.Node.Nodes, len(tc.removedMembers))
			} else {
				require.ErrorContains(t, err, "Key not found")
			}
		})
	}
}
17 changes: 16 additions & 1 deletion etcdutl/etcdutl/migrate_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,14 +129,28 @@ func migrateCommandFunc(c *migrateConfig) error {
tx := be.BatchTx()
current, err := schema.DetectSchemaVersion(c.lg, be.ReadTx())
if err != nil {
c.lg.Error("failed to detect storage version. Please make sure you are using data dir from etcd v3.5 and older")
c.lg.Error("failed to detect storage version. Please make sure you are using data dir from etcd v3.5 and older", zap.Error(err))
return err
}
if current == *c.targetVersion {
c.lg.Info("storage version up-to-date", zap.String("storage-version", storageVersionToString(&current)))
return nil
}

// only generate a v2 snapshot file for downgrade case
if c.targetVersion.LessThan(current) {
// Update cluster version
schema.NewMembershipBackend(c.lg, be).MustSaveClusterVersionToBackend(c.targetVersion)

// forcibly create a v2 snapshot file
// TODO: remove in 3.8
if err = createV2SnapshotFromV3Store(c.dataDir, be); err != nil {
c.lg.Error("Failed to create v2 snapshot file", zap.Error(err))
return err
}
c.lg.Info("Generated a v2 snapshot file")
}

if err = c.finalize(); err != nil {
c.lg.Error("Failed to finalize config", zap.Error(err))
return err
Expand All @@ -150,6 +164,7 @@ func migrateCommandFunc(c *migrateConfig) error {
c.lg.Info("normal migrate failed, trying with force", zap.Error(err))
migrateForce(c.lg, tx, c.targetVersion)
}

be.ForceCommit()
return nil
}
Expand Down
Loading

0 comments on commit e3fb899

Please sign in to comment.