Skip to content

Commit

Permalink
Updating to validate v2store and be for ConfChange
Browse files Browse the repository at this point in the history
Signed-off-by: Geeta Gharpure <[email protected]>
  • Loading branch information
Geeta Gharpure committed Aug 24, 2023
1 parent f377db2 commit 7caef10
Show file tree
Hide file tree
Showing 5 changed files with 136 additions and 8 deletions.
42 changes: 40 additions & 2 deletions server/etcdserver/api/membership/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ type RaftCluster struct {

v2store v2store.Store
be MembershipBackend
rs *ReplayStore

sync.Mutex // guards the fields below
version *semver.Version
Expand Down Expand Up @@ -116,6 +117,7 @@ func NewCluster(lg *zap.Logger, opts ...ClusterOption) *RaftCluster {
removed: make(map[types.ID]bool),
downgradeInfo: &serverversion.DowngradeInfo{Enabled: false},
maxLearners: clOpts.maxLearners,
rs: NewReplayStore(lg),
}
}

Expand Down Expand Up @@ -303,9 +305,33 @@ func (c *RaftCluster) Recover(onSet func(*zap.Logger, *semver.Version)) {

// ValidateConfigurationChange takes a proposed ConfChange and
// ensures that it is still valid.
func (c *RaftCluster) ValidateConfigurationChange(cc raftpb.ConfChange) error {
func (c *RaftCluster) ValidateConfigurationChange(cc raftpb.ConfChange, shouldApplyV3 ShouldApplyV3) error {
// TODO: this must be switched to backend as well.
membersMap, removedMap := membersFromStore(c.lg, c.v2store)
var membersMap map[types.ID]*Member
var removedMap map[types.ID]bool
if c.v2store != nil {
membersMap, removedMap = membersFromStore(c.lg, c.v2store)
if err := c.validateConfigurationChange(cc, membersMap, removedMap); err != nil {
return err
}
}

if c.be != nil && shouldApplyV3 {
membersMap, removedMap = c.be.MustReadMembersFromBackend()
if err := c.validateConfigurationChange(cc, membersMap, removedMap); err != nil {
return err
}
} else {
membersMap, removedMap = c.rs.Members()
if err := c.validateConfigurationChange(cc, membersMap, removedMap); err != nil {
return err
}
}

return nil
}

func (c *RaftCluster) validateConfigurationChange(cc raftpb.ConfChange, membersMap map[types.ID]*Member, removedMap map[types.ID]bool) error {
id := types.ID(cc.NodeID)
if removedMap[id] {
return ErrIDRemoved
Expand Down Expand Up @@ -395,6 +421,8 @@ func (c *RaftCluster) AddMember(m *Member, shouldApplyV3 ShouldApplyV3) {
}
if c.be != nil && shouldApplyV3 {
c.be.MustSaveMemberToBackend(m)
} else {
c.rs.AddMember(m)
}

c.members[m.ID] = m
Expand All @@ -420,6 +448,8 @@ func (c *RaftCluster) RemoveMember(id types.ID, shouldApplyV3 ShouldApplyV3) {
}
if c.be != nil && shouldApplyV3 {
c.be.MustDeleteMemberFromBackend(id)
} else {
c.rs.RemoveMember(id)
}

m, ok := c.members[id]
Expand Down Expand Up @@ -457,6 +487,8 @@ func (c *RaftCluster) UpdateAttributes(id types.ID, attr Attributes, shouldApply
}
if c.be != nil && shouldApplyV3 {
c.be.MustSaveMemberToBackend(m)
} else {
c.rs.AddMember(m)
}
return
}
Expand Down Expand Up @@ -491,6 +523,8 @@ func (c *RaftCluster) PromoteMember(id types.ID, shouldApplyV3 ShouldApplyV3) {
}
if c.be != nil && shouldApplyV3 {
c.be.MustSaveMemberToBackend(c.members[id])
} else {
c.rs.AddMember(c.members[id])
}

c.lg.Info(
Expand All @@ -510,6 +544,8 @@ func (c *RaftCluster) UpdateRaftAttributes(id types.ID, raftAttr RaftAttributes,
}
if c.be != nil && shouldApplyV3 {
c.be.MustSaveMemberToBackend(c.members[id])
} else {
c.rs.AddMember(c.members[id])
}

c.lg.Info(
Expand Down Expand Up @@ -559,6 +595,8 @@ func (c *RaftCluster) SetVersion(ver *semver.Version, onSet func(*zap.Logger, *s
}
if c.be != nil && shouldApplyV3 {
c.be.MustSaveClusterVersionToBackend(ver)
} else {
c.rs.SetVersion(ver)
}
if oldVer != nil {
ClusterVersionMetrics.With(prometheus.Labels{"cluster_version": version.Cluster(oldVer.String())}).Set(0)
Expand Down
25 changes: 23 additions & 2 deletions server/etcdserver/api/membership/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,8 @@ func TestClusterValidateAndAssignIDs(t *testing.T) {

func TestClusterValidateConfigurationChange(t *testing.T) {
cl := NewCluster(zaptest.NewLogger(t), WithMaxLearners(1))
be := newMembershipBackend()
cl.SetBackend(be)
cl.SetStore(v2store.New())
for i := 1; i <= 4; i++ {
var isLearner bool
Expand Down Expand Up @@ -455,7 +457,7 @@ func TestClusterValidateConfigurationChange(t *testing.T) {
},
}
for i, tt := range tests {
err := cl.ValidateConfigurationChange(tt.cc)
err := cl.ValidateConfigurationChange(tt.cc, true)
if err != tt.werr {
t.Errorf("#%d: validateConfigurationChange error = %v, want %v", i, err, tt.werr)
}
Expand Down Expand Up @@ -647,7 +649,8 @@ func TestNodeToMember(t *testing.T) {
}

func newTestCluster(t testing.TB, membs []*Member) *RaftCluster {
c := &RaftCluster{lg: zaptest.NewLogger(t), members: make(map[types.ID]*Member), removed: make(map[types.ID]bool)}
lg := zaptest.NewLogger(t)
c := &RaftCluster{lg: lg, members: make(map[types.ID]*Member), removed: make(map[types.ID]bool), rs: NewReplayStore(lg)}
for _, m := range membs {
c.members[m.ID] = m
}
Expand Down Expand Up @@ -1042,3 +1045,21 @@ func TestClusterStore(t *testing.T) {
})
}
}

func TestValidateConfigurationChange_AddMemberTwice(t *testing.T) {
// Create an initial cluster configuration with one member
cluster := newTestCluster(t, nil)
cluster.AddMember(newTestMember(1, nil, "node1", nil), false)

// The ValidateConfigurationChange function should detect duplicate addition regardless of backend consistent index(shouldApply is false).
ctx, err := json.Marshal(&ConfigChangeContext{Member: Member{ID: types.ID(1)}})
if err != nil {
t.Fatal(err)
}
cc := raftpb.ConfChange{Type: raftpb.ConfChangeAddNode, NodeID: 1, Context: ctx}
if err := cluster.ValidateConfigurationChange(cc, false); err == nil {
t.Fatal("expected an error when adding the same member again, but got no error")
} else if err != ErrIDExists {
t.Fatalf("expected ErrIDExists, but got %v", err)
}
}
58 changes: 58 additions & 0 deletions server/etcdserver/api/membership/replay_store.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package membership

import (
"github.com/coreos/go-semver/semver"
"go.uber.org/zap"

"go.etcd.io/etcd/client/pkg/v3/types"
)

// ReplayStore represents a store for managing cluster members and their removal status.
type ReplayStore struct {
lg *zap.Logger
members map[types.ID]*Member
removed map[types.ID]bool
version *semver.Version
}

// NewReplayStore creates a new instance of ReplayStore.
func NewReplayStore(lg *zap.Logger) *ReplayStore {
return &ReplayStore{
lg: lg,
members: make(map[types.ID]*Member),
removed: make(map[types.ID]bool),
}
}

// AddMember adds a member to the ReplayStore. If it already exists, it gets updated.
func (rs *ReplayStore) AddMember(member *Member) {
rs.members[member.ID] = member
}

// RemoveMember marks a member as removed in the ReplayStore.
func (rs *ReplayStore) RemoveMember(memberID types.ID) {
rs.removed[memberID] = true
delete(rs.members, memberID)
}

func (rs *ReplayStore) Members() (map[types.ID]*Member, map[types.ID]bool) {
return rs.members, rs.removed
}

func (rs *ReplayStore) SetVersion(ver *semver.Version) {
rs.version = ver
}
2 changes: 1 addition & 1 deletion server/etcdserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1986,7 +1986,7 @@ func removeNeedlessRangeReqs(txn *pb.TxnRequest) {
// applyConfChange applies a ConfChange to the server. It is only
// invoked with a ConfChange that has already passed through Raft
func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.ConfState, shouldApplyV3 membership.ShouldApplyV3) (bool, error) {
if err := s.cluster.ValidateConfigurationChange(cc); err != nil {
if err := s.cluster.ValidateConfigurationChange(cc, shouldApplyV3); err != nil {
cc.NodeID = raft.None
s.r.ApplyConfChange(cc)

Expand Down
17 changes: 14 additions & 3 deletions server/etcdserver/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -514,8 +514,14 @@ func TestApplyRequestOnAdminMemberAttributes(t *testing.T) {
}

func TestApplyConfChangeError(t *testing.T) {
cl := membership.NewCluster(zaptest.NewLogger(t))
lg := zaptest.NewLogger(t)
be, _ := betesting.NewDefaultTmpBackend(t)
defer betesting.Close(t, be)

cl := membership.NewCluster(lg)
cl.SetBackend(schema.NewMembershipBackend(lg, be))
cl.SetStore(v2store.New())

for i := 1; i <= 4; i++ {
cl.AddMember(&membership.Member{ID: types.ID(i)}, true)
}
Expand Down Expand Up @@ -602,8 +608,14 @@ func TestApplyConfChangeError(t *testing.T) {
}

func TestApplyConfChangeShouldStop(t *testing.T) {
cl := membership.NewCluster(zaptest.NewLogger(t))
lg := zaptest.NewLogger(t)
be, _ := betesting.NewDefaultTmpBackend(t)
defer betesting.Close(t, be)

cl := membership.NewCluster(lg)
cl.SetBackend(schema.NewMembershipBackend(lg, be))
cl.SetStore(v2store.New())

for i := 1; i <= 3; i++ {
cl.AddMember(&membership.Member{ID: types.ID(i)}, true)
}
Expand All @@ -612,7 +624,6 @@ func TestApplyConfChangeShouldStop(t *testing.T) {
Node: newNodeNop(),
transport: newNopTransporter(),
})
lg := zaptest.NewLogger(t)
srv := &EtcdServer{
lgMu: new(sync.RWMutex),
lg: lg,
Expand Down

0 comments on commit 7caef10

Please sign in to comment.