Skip to content

Commit

Permalink
Create a v2 snapshot when running etcdutl migrate command
Browse files Browse the repository at this point in the history
Also added test to cover the etcdutl migrate command

Signed-off-by: Benjamin Wang <[email protected]>
  • Loading branch information
ahrtr committed Jan 15, 2025
1 parent ce4b4e5 commit 099e356
Show file tree
Hide file tree
Showing 6 changed files with 521 additions and 172 deletions.
90 changes: 90 additions & 0 deletions etcdutl/etcdutl/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,19 @@ package etcdutl

import (
"errors"
"fmt"

"go.uber.org/zap"
"go.uber.org/zap/zapcore"

"go.etcd.io/etcd/client/pkg/v3/logutil"
"go.etcd.io/etcd/pkg/v3/cobrautl"
"go.etcd.io/etcd/server/v3/etcdserver"
"go.etcd.io/etcd/server/v3/etcdserver/api/membership"
"go.etcd.io/etcd/server/v3/etcdserver/api/snap"
"go.etcd.io/etcd/server/v3/storage/backend"
"go.etcd.io/etcd/server/v3/storage/datadir"
"go.etcd.io/etcd/server/v3/storage/schema"
"go.etcd.io/etcd/server/v3/storage/wal"
"go.etcd.io/etcd/server/v3/storage/wal/walpb"
"go.etcd.io/raft/v3/raftpb"
Expand Down Expand Up @@ -68,3 +73,88 @@ func getLatestV2Snapshot(lg *zap.Logger, dataDir string) (*raftpb.Snapshot, erro

return snapshot, nil
}

func createV2SnapshotFromV3Store(dataDir string, be backend.Backend) error {
var (
lg = GetLogger()

snapDir = datadir.ToSnapDir(dataDir)
walDir = datadir.ToWALDir(dataDir)
)

ci, term := schema.ReadConsistentIndex(be.ReadTx())

cl := membership.NewCluster(lg)
cl.SetBackend(schema.NewMembershipBackend(lg, be))
cl.UnsafeLoad()

latestWALSnap, err := getLatestWALSnap(lg, dataDir)
if err != nil {
return err
}

// Each time before creating the v2 snapshot, etcdserve always flush
// the backend storage (bbolt db), so the consistent index should never
// less than the Index or term of the latest snapshot.
if ci < latestWALSnap.Index || term < latestWALSnap.Term {
// This should never happen
return fmt.Errorf("consistent_index [Index: %d, Term: %d] is less than the latest snapshot [Index: %d, Term: %d]", ci, term, latestWALSnap.Index, latestWALSnap.Term)
}

voters, learners := getVotersAndLearners(cl)
confState := raftpb.ConfState{
Voters: voters,
Learners: learners,
}

// create the v2 snaspshot file
raftSnap := raftpb.Snapshot{
Data: etcdserver.GetMembershipInfoInV2Format(lg, cl),
Metadata: raftpb.SnapshotMetadata{
Index: ci,
Term: term,
ConfState: confState,
},
}
sn := snap.New(lg, snapDir)
if err = sn.SaveSnap(raftSnap); err != nil {
return err
}

// save WAL snapshot record
w, err := wal.Open(lg, walDir, latestWALSnap)
if err != nil {
return err
}
defer w.Close()
// We must read all records to locate the tail of the last valid WAL file.
_, st, _, err := w.ReadAll()
if err != nil {
return err
}

if err := w.SaveSnapshot(walpb.Snapshot{Index: ci, Term: term, ConfState: &confState}); err != nil {
return err
}
if err := w.Save(raftpb.HardState{Term: term, Commit: ci, Vote: st.Vote}, nil); err != nil {
return err
}
return w.Sync()
}

func getVotersAndLearners(cl *membership.RaftCluster) ([]uint64, []uint64) {
var (
voters []uint64
learners []uint64
)
for _, m := range cl.Members() {
if m.IsLearner {
learners = append(learners, uint64(m.ID))
continue
}

voters = append(voters, uint64(m.ID))
}

return voters, learners
}
Loading

0 comments on commit 099e356

Please sign in to comment.