Skip to content

Commit

Permalink
Remove v2 apply logic
Browse files Browse the repository at this point in the history
v2 store is no longer available in v3.6.
We can remove apply logic for it as they will never be used.

Only v2 PUT is neeeded as it applies to v3 storage and etcd v3.5 uses it for setting member
attributes and cluster version.

Signed-off-by: Marek Siarkowicz <[email protected]>
  • Loading branch information
serathius committed Nov 22, 2023
1 parent fd0882b commit 2e2536c
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 308 deletions.
85 changes: 20 additions & 65 deletions server/etcdserver/apply_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,7 @@ const v2Version = "v2"

// ApplierV2 is the interface for processing V2 raft messages
type ApplierV2 interface {
Delete(r *RequestV2) Response
Post(r *RequestV2) Response
Put(r *RequestV2, shouldApplyV3 membership.ShouldApplyV3) Response
QGet(r *RequestV2) Response
Sync(r *RequestV2) Response
}

func NewApplierV2(lg *zap.Logger, s v2store.Store, c *membership.RaftCluster) ApplierV2 {
Expand All @@ -57,64 +53,28 @@ type applierV2store struct {
cluster *membership.RaftCluster
}

func (a *applierV2store) Delete(r *RequestV2) Response {
switch {
case r.PrevIndex > 0 || r.PrevValue != "":
return toResponse(a.store.CompareAndDelete(r.Path, r.PrevValue, r.PrevIndex))
default:
return toResponse(a.store.Delete(r.Path, r.Dir, r.Recursive))
}
}

func (a *applierV2store) Post(r *RequestV2) Response {
return toResponse(a.store.Create(r.Path, r.Dir, r.Val, true, r.TTLOptions()))
}

func (a *applierV2store) Put(r *RequestV2, shouldApplyV3 membership.ShouldApplyV3) Response {
ttlOptions := r.TTLOptions()
exists, existsSet := pbutil.GetBool(r.PrevExist)
switch {
case existsSet:
if exists {
if r.PrevIndex == 0 && r.PrevValue == "" {
return toResponse(a.store.Update(r.Path, r.Val, ttlOptions))
}
return toResponse(a.store.CompareAndSwap(r.Path, r.PrevValue, r.PrevIndex, r.Val, ttlOptions))
if storeMemberAttributeRegexp.MatchString(r.Path) {
id := membership.MustParseMemberIDFromKey(a.lg, path.Dir(r.Path))
var attr membership.Attributes
if err := json.Unmarshal([]byte(r.Val), &attr); err != nil {
a.lg.Panic("failed to unmarshal", zap.String("value", r.Val), zap.Error(err))
}
return toResponse(a.store.Create(r.Path, r.Dir, r.Val, false, ttlOptions))
case r.PrevIndex > 0 || r.PrevValue != "":
return toResponse(a.store.CompareAndSwap(r.Path, r.PrevValue, r.PrevIndex, r.Val, ttlOptions))
default:
if storeMemberAttributeRegexp.MatchString(r.Path) {
id := membership.MustParseMemberIDFromKey(a.lg, path.Dir(r.Path))
var attr membership.Attributes
if err := json.Unmarshal([]byte(r.Val), &attr); err != nil {
a.lg.Panic("failed to unmarshal", zap.String("value", r.Val), zap.Error(err))
}
if a.cluster != nil {
a.cluster.UpdateAttributes(id, attr, shouldApplyV3)
}
// return an empty response since there is no consumer.
return Response{}
if a.cluster != nil {
a.cluster.UpdateAttributes(id, attr, shouldApplyV3)
}
// TODO remove v2 version set to avoid the conflict between v2 and v3 in etcd 3.6
if r.Path == membership.StoreClusterVersionKey() {
if a.cluster != nil {
// persist to backend given v2store can be very stale
a.cluster.SetVersion(semver.Must(semver.NewVersion(r.Val)), api.UpdateCapability, shouldApplyV3)
}
return Response{}
// return an empty response since there is no consumer.
return Response{}
}
// TODO remove v2 version set to avoid the conflict between v2 and v3 in etcd 3.6
if r.Path == membership.StoreClusterVersionKey() {
if a.cluster != nil {
// persist to backend given v2store can be very stale
a.cluster.SetVersion(semver.Must(semver.NewVersion(r.Val)), api.UpdateCapability, shouldApplyV3)
}
return toResponse(a.store.Set(r.Path, r.Dir, r.Val, ttlOptions))
return Response{}
}
}

func (a *applierV2store) QGet(r *RequestV2) Response {
return toResponse(a.store.Get(r.Path, r.Recursive, r.Sorted))
}

func (a *applierV2store) Sync(r *RequestV2) Response {
a.store.DeleteExpiredKeys(time.Unix(0, r.Time))
a.lg.Panic("Encountered no longer supported v2 WAL entry", zap.String("method", r.Method))
return Response{}
}

Expand All @@ -136,16 +96,11 @@ func (s *EtcdServer) applyV2Request(r *RequestV2, shouldApplyV3 membership.Shoul
}(time.Now())

switch r.Method {
case "POST":
return s.applyV2.Post(r)
case "PUT":
return s.applyV2.Put(r, shouldApplyV3)
case "DELETE":
return s.applyV2.Delete(r)
case "QGET":
return s.applyV2.QGet(r)
case "SYNC":
return s.applyV2.Sync(r)
case "POST", "DELETE", "QGET", "SYNC":
s.lg.Panic("Encountered no longer supported v2 WAL entry", zap.String("method", r.Method))
return Response{}
default:
// This should never be reached, but just in case:
return Response{Err: errors.ErrUnknownMethod}
Expand Down
243 changes: 0 additions & 243 deletions server/etcdserver/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,249 +133,6 @@ func TestApplyRepeat(t *testing.T) {
}
}

func TestApplyRequest(t *testing.T) {
tests := []struct {
req pb.Request

wresp Response
wactions []testutil.Action
}{
// POST ==> Create
{
pb.Request{Method: "POST", ID: 1},
Response{Event: &v2store.Event{}},
[]testutil.Action{
{
Name: "Create",
Params: []any{"", false, "", true, v2store.TTLOptionSet{ExpireTime: time.Time{}}},
},
},
},
// POST ==> Create, with expiration
{
pb.Request{Method: "POST", ID: 1, Expiration: 1337},
Response{Event: &v2store.Event{}},
[]testutil.Action{
{
Name: "Create",
Params: []any{"", false, "", true, v2store.TTLOptionSet{ExpireTime: time.Unix(0, 1337)}},
},
},
},
// POST ==> Create, with dir
{
pb.Request{Method: "POST", ID: 1, Dir: true},
Response{Event: &v2store.Event{}},
[]testutil.Action{
{
Name: "Create",
Params: []any{"", true, "", true, v2store.TTLOptionSet{ExpireTime: time.Time{}}},
},
},
},
// PUT ==> Set
{
pb.Request{Method: "PUT", ID: 1},
Response{Event: &v2store.Event{}},
[]testutil.Action{
{
Name: "Set",
Params: []any{"", false, "", v2store.TTLOptionSet{ExpireTime: time.Time{}}},
},
},
},
// PUT ==> Set, with dir
{
pb.Request{Method: "PUT", ID: 1, Dir: true},
Response{Event: &v2store.Event{}},
[]testutil.Action{
{
Name: "Set",
Params: []any{"", true, "", v2store.TTLOptionSet{ExpireTime: time.Time{}}},
},
},
},
// PUT with PrevExist=true ==> Update
{
pb.Request{Method: "PUT", ID: 1, PrevExist: pbutil.Boolp(true)},
Response{Event: &v2store.Event{}},
[]testutil.Action{
{
Name: "Update",
Params: []any{"", "", v2store.TTLOptionSet{ExpireTime: time.Time{}}},
},
},
},
// PUT with PrevExist=false ==> Create
{
pb.Request{Method: "PUT", ID: 1, PrevExist: pbutil.Boolp(false)},
Response{Event: &v2store.Event{}},
[]testutil.Action{
{
Name: "Create",
Params: []any{"", false, "", false, v2store.TTLOptionSet{ExpireTime: time.Time{}}},
},
},
},
// PUT with PrevExist=true *and* PrevIndex set ==> CompareAndSwap
{
pb.Request{Method: "PUT", ID: 1, PrevExist: pbutil.Boolp(true), PrevIndex: 1},
Response{Event: &v2store.Event{}},
[]testutil.Action{
{
Name: "CompareAndSwap",
Params: []any{"", "", uint64(1), "", v2store.TTLOptionSet{ExpireTime: time.Time{}}},
},
},
},
// PUT with PrevExist=false *and* PrevIndex set ==> Create
{
pb.Request{Method: "PUT", ID: 1, PrevExist: pbutil.Boolp(false), PrevIndex: 1},
Response{Event: &v2store.Event{}},
[]testutil.Action{
{
Name: "Create",
Params: []any{"", false, "", false, v2store.TTLOptionSet{ExpireTime: time.Time{}}},
},
},
},
// PUT with PrevIndex set ==> CompareAndSwap
{
pb.Request{Method: "PUT", ID: 1, PrevIndex: 1},
Response{Event: &v2store.Event{}},
[]testutil.Action{
{
Name: "CompareAndSwap",
Params: []any{"", "", uint64(1), "", v2store.TTLOptionSet{ExpireTime: time.Time{}}},
},
},
},
// PUT with PrevValue set ==> CompareAndSwap
{
pb.Request{Method: "PUT", ID: 1, PrevValue: "bar"},
Response{Event: &v2store.Event{}},
[]testutil.Action{
{
Name: "CompareAndSwap",
Params: []any{"", "bar", uint64(0), "", v2store.TTLOptionSet{ExpireTime: time.Time{}}},
},
},
},
// PUT with PrevIndex and PrevValue set ==> CompareAndSwap
{
pb.Request{Method: "PUT", ID: 1, PrevIndex: 1, PrevValue: "bar"},
Response{Event: &v2store.Event{}},
[]testutil.Action{
{
Name: "CompareAndSwap",
Params: []any{"", "bar", uint64(1), "", v2store.TTLOptionSet{ExpireTime: time.Time{}}},
},
},
},
// DELETE ==> Delete
{
pb.Request{Method: "DELETE", ID: 1},
Response{Event: &v2store.Event{}},
[]testutil.Action{
{
Name: "Delete",
Params: []any{"", false, false},
},
},
},
// DELETE with PrevIndex set ==> CompareAndDelete
{
pb.Request{Method: "DELETE", ID: 1, PrevIndex: 1},
Response{Event: &v2store.Event{}},
[]testutil.Action{
{
Name: "CompareAndDelete",
Params: []any{"", "", uint64(1)},
},
},
},
// DELETE with PrevValue set ==> CompareAndDelete
{
pb.Request{Method: "DELETE", ID: 1, PrevValue: "bar"},
Response{Event: &v2store.Event{}},
[]testutil.Action{
{
Name: "CompareAndDelete",
Params: []any{"", "bar", uint64(0)},
},
},
},
// DELETE with PrevIndex *and* PrevValue set ==> CompareAndDelete
{
pb.Request{Method: "DELETE", ID: 1, PrevIndex: 5, PrevValue: "bar"},
Response{Event: &v2store.Event{}},
[]testutil.Action{
{
Name: "CompareAndDelete",
Params: []any{"", "bar", uint64(5)},
},
},
},
// QGET ==> Get
{
pb.Request{Method: "QGET", ID: 1},
Response{Event: &v2store.Event{}},
[]testutil.Action{
{
Name: "Get",
Params: []any{"", false, false},
},
},
},
// SYNC ==> DeleteExpiredKeys
{
pb.Request{Method: "SYNC", ID: 1},
Response{},
[]testutil.Action{
{
Name: "DeleteExpiredKeys",
Params: []any{time.Unix(0, 0)},
},
},
},
{
pb.Request{Method: "SYNC", ID: 1, Time: 12345},
Response{},
[]testutil.Action{
{
Name: "DeleteExpiredKeys",
Params: []any{time.Unix(0, 12345)},
},
},
},
// Unknown method - error
{
pb.Request{Method: "BADMETHOD", ID: 1},
Response{Err: errors.ErrUnknownMethod},
[]testutil.Action{},
},
}

for i, tt := range tests {
st := mockstore.NewRecorder()
srv := &EtcdServer{
lgMu: new(sync.RWMutex),
lg: zaptest.NewLogger(t),
v2store: st,
}
srv.applyV2 = &applierV2store{store: srv.v2store, cluster: srv.cluster}
resp := srv.applyV2Request((*RequestV2)(&tt.req), membership.ApplyBoth)

if !reflect.DeepEqual(resp, tt.wresp) {
t.Errorf("#%d: resp = %+v, want %+v", i, resp, tt.wresp)
}
gaction := st.Action()
if !reflect.DeepEqual(gaction, tt.wactions) {
t.Errorf("#%d: action = %#v, want %#v", i, gaction, tt.wactions)
}
}
}

func TestApplyRequestOnAdminMemberAttributes(t *testing.T) {
be, _ := betesting.NewDefaultTmpBackend(t)
defer betesting.Close(t, be)
Expand Down

0 comments on commit 2e2536c

Please sign in to comment.