v2 etcdctl backup: producing consistent state of membership
ptabor committed Apr 2, 2021
1 parent e2fdc2f commit 4843d3e
Showing 4 changed files with 132 additions and 40 deletions.
134 changes: 94 additions & 40 deletions etcdctl/ctlv2/command/backup_command.go
@@ -15,7 +15,6 @@
package command

import (
"encoding/binary"
"log"
"os"
"path"
@@ -27,9 +26,13 @@ import (
"go.etcd.io/etcd/pkg/v3/fileutil"
"go.etcd.io/etcd/pkg/v3/idutil"
"go.etcd.io/etcd/pkg/v3/pbutil"
"go.etcd.io/etcd/pkg/v3/types"
"go.etcd.io/etcd/raft/v3/raftpb"
"go.etcd.io/etcd/server/v3/etcdserver/api/membership"
"go.etcd.io/etcd/server/v3/etcdserver/api/snap"
"go.etcd.io/etcd/server/v3/etcdserver/api/v2store"
"go.etcd.io/etcd/server/v3/etcdserver/cindex"
"go.etcd.io/etcd/server/v3/mvcc/backend"
"go.etcd.io/etcd/server/v3/wal"
"go.etcd.io/etcd/server/v3/wal/walpb"

@@ -54,11 +57,42 @@ func NewBackupCommand() cli.Command {
}
}

type desiredCluster struct {
clusterId types.ID
nodeId types.ID
members []*membership.Member
confState raftpb.ConfState
}

func newDesiredCluster() desiredCluster {
idgen := idutil.NewGenerator(0, time.Now())
nodeID := idgen.Next()
clusterID := idgen.Next()

return desiredCluster{
clusterId: types.ID(clusterID),
nodeId: types.ID(nodeID),
members: []*membership.Member{
{
ID: types.ID(nodeID),
Attributes: membership.Attributes{
Name: "etcdctl-backup",
ClientURLs: []string{"http://use-flag--force-new-cluster:2080"},
},
RaftAttributes: membership.RaftAttributes{
PeerURLs: []string{"http://use-flag--force-new-cluster:2080"},
}}},
confState: raftpb.ConfState{Voters: []uint64{nodeID}},
}
}

// handleBackup handles a request that intends to do a backup.
func handleBackup(c *cli.Context) error {
var srcWAL string
var destWAL string

lg := zap.NewExample()

withV3 := c.Bool("with-v3")
srcSnap := filepath.Join(c.String("data-dir"), "member", "snap")
destSnap := filepath.Join(c.String("backup-dir"), "member", "snap")
@@ -79,13 +113,12 @@ func handleBackup(c *cli.Context) error {
log.Fatalf("failed creating backup snapshot dir %v: %v", destSnap, err)
}

walsnap := saveSnap(destSnap, srcSnap)
metadata, state, ents := loadWAL(srcWAL, walsnap, withV3)
saveDB(filepath.Join(destSnap, "db"), filepath.Join(srcSnap, "db"), state.Commit, withV3)
desired := newDesiredCluster()

idgen := idutil.NewGenerator(0, time.Now())
metadata.NodeID = idgen.Next()
metadata.ClusterID = idgen.Next()
walsnap := saveSnap(lg, destSnap, srcSnap, &desired)
metadata, state, ents := loadWAL(srcWAL, walsnap, withV3)
destDbPath := filepath.Join(destSnap, "db")
saveDB(lg, destDbPath, filepath.Join(srcSnap, "db"), state.Commit, &desired, withV3)

neww, err := wal.Create(zap.NewExample(), destWAL, pbutil.MustMarshal(&metadata))
if err != nil {
@@ -102,22 +135,42 @@ func handleBackup(c *cli.Context) error {
return nil
}

func saveSnap(destSnap, srcSnap string) (walsnap walpb.Snapshot) {
ss := snap.New(zap.NewExample(), srcSnap)
func saveSnap(lg *zap.Logger, destSnap, srcSnap string, desired *desiredCluster) (walsnap walpb.Snapshot) {
ss := snap.New(lg, srcSnap)
snapshot, err := ss.Load()
if err != nil && err != snap.ErrNoSnapshot {
log.Fatal(err)
}
if snapshot != nil {
walsnap.Index, walsnap.Term, walsnap.ConfState = snapshot.Metadata.Index, snapshot.Metadata.Term, &snapshot.Metadata.ConfState
newss := snap.New(zap.NewExample(), destSnap)
walsnap.Index, walsnap.Term, walsnap.ConfState = snapshot.Metadata.Index, snapshot.Metadata.Term, &desired.confState
newss := snap.New(lg, destSnap)
snapshot.Metadata.ConfState = desired.confState
snapshot.Data = mustTranslateV2store(lg, snapshot.Data, desired)
if err = newss.SaveSnap(*snapshot); err != nil {
log.Fatal(err)
}
}
return walsnap
}

func mustTranslateV2store(lg *zap.Logger, storeData []byte, desired *desiredCluster) []byte {
st := v2store.New()
if err := st.Recovery(storeData); err != nil {
lg.Panic("cannot translate v2store", zap.Error(err))
}

raftCluster := membership.NewClusterFromMembers(lg, desired.clusterId, desired.members)
raftCluster.SetID(desired.nodeId, desired.clusterId)
raftCluster.SetStore(st)
raftCluster.PushToStorage()

outputData, err := st.Save()
if err != nil {
lg.Panic("cannot save v2store", zap.Error(err))
}
return outputData
}

func loadWAL(srcWAL string, walsnap walpb.Snapshot, v3 bool) (etcdserverpb.Metadata, raftpb.HardState, []raftpb.Entry) {
w, err := wal.OpenForRead(zap.NewExample(), srcWAL, walsnap)
if err != nil {
@@ -145,6 +198,7 @@ func loadWAL(srcWAL string, walsnap walpb.Snapshot, v3 bool) (etcdserverpb.Metad
i--
}
for i = 0; i < len(ents); i++ {
log.Printf("PROCESSING: %v\n\n", ents[i])
ents[i].Index -= removed
if ents[i].Type == raftpb.EntryConfChange {
log.Println("ignoring EntryConfChange raft entry")
@@ -161,6 +215,8 @@ func loadWAL(srcWAL string, walsnap walpb.Snapshot, v3 bool) (etcdserverpb.Metad
pbutil.MustUnmarshal(v2Req, ents[i].Data)
}

log.Printf("Req: %v\n\n%v\n\n", raftReq, v2Req)

if v2Req != nil && v2Req.Method == "PUT" && memberAttrRE.MatchString(v2Req.Path) {
log.Println("ignoring member attribute update on", v2Req.Path)
remove()
@@ -184,7 +240,9 @@ func loadWAL(srcWAL string, walsnap walpb.Snapshot, v3 bool) (etcdserverpb.Metad
}

// saveDB copies the v3 backend and strips cluster information.
func saveDB(destDB, srcDB string, idx uint64, v3 bool) {
func saveDB(lg *zap.Logger, destDB, srcDB string, idx uint64, desired *desiredCluster, v3 bool) {
//lg := zap.NewExample()

// open src db to safely copy db state
if v3 {
var src *bolt.DB
@@ -223,37 +281,33 @@ func saveDB(destDB, srcDB string, idx uint64, v3 bool) {
}
}

db, err := bolt.Open(destDB, 0644, &bolt.Options{})
if err != nil {
log.Fatal(err)
}
tx, err := db.Begin(true)
if err != nil {
log.Fatal(err)
}
be := backend.NewDefaultBackend(destDB)
defer be.Close()

// remove membership information; should be clobbered by --force-new-cluster
// TODO: Consider refactoring to use backend.Backend instead of bolt
// and membership.TrimMembershipFromBackend.
for _, bucket := range []string{"members", "members_removed", "cluster"} {
tx.DeleteBucket([]byte(bucket))
}
raftCluster := membership.NewClusterFromMembers(lg, desired.clusterId, desired.members)
raftCluster.SetID(desired.nodeId, desired.clusterId)
raftCluster.SetBackend(be)
raftCluster.PushToStorage()

log.Printf("\n TRIMMED !!!! \n\n\n\n\n")

//if err := membership.TrimClusterFromBackend(be); err != nil {
// log.Fatal(err)
//}

// update consistent index to match hard state
if !v3 {
idxBytes := make([]byte, 8)
binary.BigEndian.PutUint64(idxBytes, idx)
b, err := tx.CreateBucketIfNotExists([]byte("meta"))
if err != nil {
log.Fatal(err)
}
b.Put([]byte("consistent_index"), idxBytes)
}
tx := be.BatchTx()
tx.Lock()
defer tx.Unlock()
log.Printf("\n CINDEX !!!! \n\n\n\n\n")
tx.UnsafeCreateBucket([]byte("meta"))
ci := cindex.NewConsistentIndex(tx)
log.Printf("\n CINDEX START !!!! \n\n\n\n\n")

if err := tx.Commit(); err != nil {
log.Fatal(err)
}
if err := db.Close(); err != nil {
log.Fatal(err)
ci.SetConsistentIndex(idx)
log.Printf("\n CINDEX SET !!!! \n\n\n\n\n")
ci.UnsafeSave(tx)
log.Printf("\n CINDEX SAVE !!!! \n\n\n\n\n")
}

}
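
The net effect in backup_command.go: rather than deleting the members, members_removed, and cluster buckets and patching the consistent index with raw bbolt writes, the backup now builds a single-member desired cluster and writes that membership into both the v2 store snapshot (mustTranslateV2store) and the v3 backend (saveDB); the placeholder peer URL signals that the restored node is expected to be started with --force-new-cluster. The consistent index, in turn, is persisted through the cindex package. A minimal sketch of that cindex pattern, assuming the same packages imported above and a hypothetical db path (the command itself uses <backup-dir>/member/snap/db; the literal index value is a stand-in):

package main

import (
	"go.etcd.io/etcd/server/v3/etcdserver/cindex"
	"go.etcd.io/etcd/server/v3/mvcc/backend"
)

func main() {
	// Hypothetical path; handleBackup passes filepath.Join(destSnap, "db").
	be := backend.NewDefaultBackend("backup/member/snap/db")
	defer be.Close()

	tx := be.BatchTx()
	tx.Lock()
	defer tx.Unlock()

	// Make sure the "meta" bucket exists, then store the index taken from
	// the WAL's HardState.Commit (42 is a stand-in value here).
	tx.UnsafeCreateBucket([]byte("meta"))
	ci := cindex.NewConsistentIndex(tx)
	ci.SetConsistentIndex(42)
	ci.UnsafeSave(tx)
}
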
15 changes: 15 additions & 0 deletions server/etcdserver/api/membership/cluster.go
@@ -854,3 +854,18 @@ func (c *RaftCluster) VotingMemberIDs() []types.ID {
sort.Sort(types.IDSlice(ids))
return ids
}

func (c *RaftCluster) PushToStorage() {
if c.be != nil {
TrimMembershipFromBackend(c.lg, c.be)
for _, m := range c.members {
mustSaveMemberToBackend(c.lg, c.be, m)
}
}
if c.v2store != nil {
TrimMembershipFromV2Store(c.lg, c.v2store)
for _, m := range c.members {
mustSaveMemberToStore(c.lg, c.v2store, m)
}
}
}
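
PushToStorage is the new single entry point for overwriting membership: it trims whatever members the attached v2 store and/or backend currently hold, then saves the cluster's member list into each. A minimal sketch of the intended call pattern, mirroring mustTranslateV2store and saveDB above (the helper name and package are hypothetical, not part of the commit):

package backuputil // hypothetical package, for illustration only

import (
	"go.etcd.io/etcd/pkg/v3/types"
	"go.etcd.io/etcd/server/v3/etcdserver/api/membership"
	"go.etcd.io/etcd/server/v3/etcdserver/api/v2store"
	"go.etcd.io/etcd/server/v3/mvcc/backend"

	"go.uber.org/zap"
)

// overwriteMembership replaces whatever membership the supplied storages hold
// (either may be nil) with the given member list, under the given IDs.
func overwriteMembership(lg *zap.Logger, cid, nid types.ID,
	members []*membership.Member, st v2store.Store, be backend.Backend) {
	cl := membership.NewClusterFromMembers(lg, cid, members)
	cl.SetID(nid, cid)
	if st != nil {
		cl.SetStore(st)
	}
	if be != nil {
		cl.SetBackend(be)
	}
	cl.PushToStorage()
}
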
19 changes: 19 additions & 0 deletions server/etcdserver/api/membership/store.go
@@ -135,6 +135,25 @@ func TrimMembershipFromBackend(lg *zap.Logger, be backend.Backend) error {
return nil
}

func TrimMembershipFromV2Store(lg *zap.Logger, s v2store.Store) error {
members, removed := membersFromStore(lg, s)

for mID := range members {
_, err := s.Delete(MemberStoreKey(mID), true, true)
if err != nil {
return err
}
}
for mID := range removed {
_, err := s.Delete(RemovedMemberStoreKey(mID), true, true)
if err != nil {
return err
}
}

return nil
}

func mustSaveClusterVersionToBackend(be backend.Backend, ver *semver.Version) {
ckey := backendClusterVersionKey()

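
TrimMembershipFromV2Store mirrors the existing TrimMembershipFromBackend on the v2 store side, deleting both current and removed member keys so a backup can drop the source cluster's membership before the desired members are re-added. A small hypothetical round-trip sketch (the function and package names are illustrative only):

package backuputil // hypothetical package, for illustration only

import (
	"go.etcd.io/etcd/server/v3/etcdserver/api/membership"
	"go.etcd.io/etcd/server/v3/etcdserver/api/v2store"

	"go.uber.org/zap"
)

// stripV2Membership rebuilds a v2 store from serialized snapshot data, drops
// any membership entries it carries, and serializes it again.
func stripV2Membership(lg *zap.Logger, storeData []byte) ([]byte, error) {
	st := v2store.New()
	if err := st.Recovery(storeData); err != nil {
		return nil, err
	}
	if err := membership.TrimMembershipFromV2Store(lg, st); err != nil {
		return nil, err
	}
	return st.Save()
}
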
4 changes: 4 additions & 0 deletions server/etcdserver/server.go
@@ -632,6 +632,8 @@ func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
}
}
for _, m := range cl.Members() {
mjson, _ := json.Marshal(m)
cfg.Logger.Info("MEMBER", zap.String("json", string(mjson)))
if m.ID != id {
tr.AddPeer(m.ID, m.PeerURLs)
}
@@ -2084,6 +2086,7 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) {
var r pb.Request
rp := &r
pbutil.MustUnmarshal(rp, e.Data)
s.lg.Info("Applying: ", zap.Stringer("rp", rp))
s.w.Trigger(r.ID, s.applyV2Request((*RequestV2)(rp)))
return
}
@@ -2150,6 +2153,7 @@ func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.Con

lg := s.Logger()
*confState = *s.r.ApplyConfChange(cc)
lg.Info("Applying conf change.", zap.Stringer("post-conf-state", confState), zap.Stringer("cc", &cc))
switch cc.Type {
case raftpb.ConfChangeAddNode, raftpb.ConfChangeAddLearnerNode:
confChangeContext := new(membership.ConfigChangeContext)
