Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove context from appliers #18677

Merged
merged 1 commit into from
Oct 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 19 additions & 19 deletions server/etcdserver/apply/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,10 @@ type applierV3 interface {
// delegates the actual execution to the applyFunc method.
Apply(r *pb.InternalRaftRequest, applyFunc applyFunc) *Result

Put(ctx context.Context, p *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error)
Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, *traceutil.Trace, error)
DeleteRange(ctx context.Context, dr *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, *traceutil.Trace, error)
Txn(ctx context.Context, rt *pb.TxnRequest) (*pb.TxnResponse, *traceutil.Trace, error)
Put(p *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error)
Range(r *pb.RangeRequest) (*pb.RangeResponse, *traceutil.Trace, error)
DeleteRange(dr *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, *traceutil.Trace, error)
Txn(rt *pb.TxnRequest) (*pb.TxnResponse, *traceutil.Trace, error)
Compaction(compaction *pb.CompactionRequest) (*pb.CompactionResponse, <-chan struct{}, *traceutil.Trace, error)

LeaseGrant(lc *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error)
Expand Down Expand Up @@ -150,20 +150,20 @@ func (a *applierV3backend) Apply(r *pb.InternalRaftRequest, applyFunc applyFunc)
return applyFunc(r)
}

func (a *applierV3backend) Put(ctx context.Context, p *pb.PutRequest) (resp *pb.PutResponse, trace *traceutil.Trace, err error) {
return mvcctxn.Put(ctx, a.lg, a.lessor, a.kv, p)
func (a *applierV3backend) Put(p *pb.PutRequest) (resp *pb.PutResponse, trace *traceutil.Trace, err error) {
return mvcctxn.Put(context.TODO(), a.lg, a.lessor, a.kv, p)
}

func (a *applierV3backend) DeleteRange(ctx context.Context, dr *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, *traceutil.Trace, error) {
return mvcctxn.DeleteRange(ctx, a.lg, a.kv, dr)
func (a *applierV3backend) DeleteRange(dr *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, *traceutil.Trace, error) {
return mvcctxn.DeleteRange(context.TODO(), a.lg, a.kv, dr)
}

func (a *applierV3backend) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, *traceutil.Trace, error) {
return mvcctxn.Range(ctx, a.lg, a.kv, r)
func (a *applierV3backend) Range(r *pb.RangeRequest) (*pb.RangeResponse, *traceutil.Trace, error) {
return mvcctxn.Range(context.TODO(), a.lg, a.kv, r)
}

func (a *applierV3backend) Txn(ctx context.Context, rt *pb.TxnRequest) (*pb.TxnResponse, *traceutil.Trace, error) {
return mvcctxn.Txn(ctx, a.lg, rt, a.txnModeWriteWithSharedBuffer, a.kv, a.lessor)
func (a *applierV3backend) Txn(rt *pb.TxnRequest) (*pb.TxnResponse, *traceutil.Trace, error) {
return mvcctxn.Txn(context.TODO(), a.lg, rt, a.txnModeWriteWithSharedBuffer, a.kv, a.lessor)
}

func (a *applierV3backend) Compaction(compaction *pb.CompactionRequest) (*pb.CompactionResponse, <-chan struct{}, *traceutil.Trace, error) {
Expand Down Expand Up @@ -248,15 +248,15 @@ type applierV3Capped struct {
// with Puts so that the number of keys in the store is capped.
func newApplierV3Capped(base applierV3) applierV3 { return &applierV3Capped{applierV3: base} }

func (a *applierV3Capped) Put(_ context.Context, _ *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error) {
func (a *applierV3Capped) Put(_ *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error) {
return nil, nil, errors.ErrNoSpace
}

func (a *applierV3Capped) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, *traceutil.Trace, error) {
func (a *applierV3Capped) Txn(r *pb.TxnRequest) (*pb.TxnResponse, *traceutil.Trace, error) {
if a.q.Cost(r) > 0 {
return nil, nil, errors.ErrNoSpace
}
return a.applierV3.Txn(ctx, r)
return a.applierV3.Txn(r)
}

func (a *applierV3Capped) LeaseGrant(_ *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error) {
Expand Down Expand Up @@ -454,18 +454,18 @@ func newQuotaApplierV3(lg *zap.Logger, quotaBackendBytesCfg int64, be backend.Ba
return &quotaApplierV3{app, serverstorage.NewBackendQuota(lg, quotaBackendBytesCfg, be, "v3-applier")}
}

func (a *quotaApplierV3) Put(ctx context.Context, p *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error) {
func (a *quotaApplierV3) Put(p *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error) {
ok := a.q.Available(p)
resp, trace, err := a.applierV3.Put(ctx, p)
resp, trace, err := a.applierV3.Put(p)
if err == nil && !ok {
err = errors.ErrNoSpace
}
return resp, trace, err
}

func (a *quotaApplierV3) Txn(ctx context.Context, rt *pb.TxnRequest) (*pb.TxnResponse, *traceutil.Trace, error) {
func (a *quotaApplierV3) Txn(rt *pb.TxnRequest) (*pb.TxnResponse, *traceutil.Trace, error) {
ok := a.q.Available(rt)
resp, trace, err := a.applierV3.Txn(ctx, rt)
resp, trace, err := a.applierV3.Txn(rt)
if err == nil && !ok {
err = errors.ErrNoSpace
}
Expand Down
17 changes: 8 additions & 9 deletions server/etcdserver/apply/apply_auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
package apply

import (
"context"
"sync"

pb "go.etcd.io/etcd/api/v3/etcdserverpb"
Expand Down Expand Up @@ -63,7 +62,7 @@ func (aa *authApplierV3) Apply(r *pb.InternalRaftRequest, applyFunc applyFunc) *
return ret
}

func (aa *authApplierV3) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error) {
func (aa *authApplierV3) Put(r *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error) {
if err := aa.as.IsPutPermitted(&aa.authInfo, r.Key); err != nil {
return nil, nil, err
}
Expand All @@ -82,17 +81,17 @@ func (aa *authApplierV3) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResp
return nil, nil, err
}
}
return aa.applierV3.Put(ctx, r)
return aa.applierV3.Put(r)
}

func (aa *authApplierV3) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, *traceutil.Trace, error) {
func (aa *authApplierV3) Range(r *pb.RangeRequest) (*pb.RangeResponse, *traceutil.Trace, error) {
if err := aa.as.IsRangePermitted(&aa.authInfo, r.Key, r.RangeEnd); err != nil {
return nil, nil, err
}
return aa.applierV3.Range(ctx, r)
return aa.applierV3.Range(r)
}

func (aa *authApplierV3) DeleteRange(ctx context.Context, r *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, *traceutil.Trace, error) {
func (aa *authApplierV3) DeleteRange(r *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, *traceutil.Trace, error) {
if err := aa.as.IsDeleteRangePermitted(&aa.authInfo, r.Key, r.RangeEnd); err != nil {
return nil, nil, err
}
Expand All @@ -103,14 +102,14 @@ func (aa *authApplierV3) DeleteRange(ctx context.Context, r *pb.DeleteRangeReque
}
}

return aa.applierV3.DeleteRange(ctx, r)
return aa.applierV3.DeleteRange(r)
}

func (aa *authApplierV3) Txn(ctx context.Context, rt *pb.TxnRequest) (*pb.TxnResponse, *traceutil.Trace, error) {
func (aa *authApplierV3) Txn(rt *pb.TxnRequest) (*pb.TxnResponse, *traceutil.Trace, error) {
if err := txn.CheckTxnAuth(aa.as, &aa.authInfo, rt); err != nil {
return nil, nil, err
}
return aa.applierV3.Txn(ctx, rt)
return aa.applierV3.Txn(rt)
}

func (aa *authApplierV3) LeaseRevoke(lc *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error) {
Expand Down
27 changes: 8 additions & 19 deletions server/etcdserver/apply/apply_auth_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
package apply

import (
"context"
"errors"
"testing"
"time"
Expand Down Expand Up @@ -437,12 +436,10 @@ func TestAuthApplierV3_Put(t *testing.T) {

authApplier := defaultAuthApplierV3(t)
mustCreateRolesAndEnableAuth(t, authApplier)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
setAuthInfo(authApplier, tc.userName)
_, _, err := authApplier.Put(ctx, tc.request)
_, _, err := authApplier.Put(tc.request)
require.Equalf(t, tc.expectError, err, "Put returned unexpected error (or lack thereof), expected: %v, got: %v", tc.expectError, err)
})
}
Expand All @@ -452,8 +449,6 @@ func TestAuthApplierV3_Put(t *testing.T) {
func TestAuthApplierV3_LeasePut(t *testing.T) {
authApplier := defaultAuthApplierV3(t)
mustCreateRolesAndEnableAuth(t, authApplier)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

_, err := authApplier.LeaseGrant(&pb.LeaseGrantRequest{
TTL: lease.MaxLeaseTTL,
Expand All @@ -463,7 +458,7 @@ func TestAuthApplierV3_LeasePut(t *testing.T) {

// The user should be able to put the key
setAuthInfo(authApplier, userWriteOnly)
_, _, err = authApplier.Put(ctx, &pb.PutRequest{
_, _, err = authApplier.Put(&pb.PutRequest{
Key: []byte(key),
Value: []byte("1"),
Lease: leaseID,
Expand All @@ -472,7 +467,7 @@ func TestAuthApplierV3_LeasePut(t *testing.T) {

// Put a key under the lease outside user's key range
setAuthInfo(authApplier, userRoot)
_, _, err = authApplier.Put(ctx, &pb.PutRequest{
_, _, err = authApplier.Put(&pb.PutRequest{
Key: []byte(keyOutsideRange),
Value: []byte("1"),
Lease: leaseID,
Expand All @@ -481,7 +476,7 @@ func TestAuthApplierV3_LeasePut(t *testing.T) {

// The user should not be able to put the key anymore
setAuthInfo(authApplier, userWriteOnly)
_, _, err = authApplier.Put(ctx, &pb.PutRequest{
_, _, err = authApplier.Put(&pb.PutRequest{
Key: []byte(key),
Value: []byte("1"),
Lease: leaseID,
Expand Down Expand Up @@ -524,12 +519,10 @@ func TestAuthApplierV3_Range(t *testing.T) {

authApplier := defaultAuthApplierV3(t)
mustCreateRolesAndEnableAuth(t, authApplier)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
setAuthInfo(authApplier, tc.userName)
_, _, err := authApplier.Range(ctx, tc.request)
_, _, err := authApplier.Range(tc.request)
require.Equalf(t, tc.expectError, err, "Range returned unexpected error (or lack thereof), expected: %v, got: %v", tc.expectError, err)
})
}
Expand Down Expand Up @@ -593,7 +586,7 @@ func TestAuthApplierV3_DeleteRange(t *testing.T) {
for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
setAuthInfo(authApplier, tc.userName)
_, _, err := authApplier.DeleteRange(context.Background(), tc.request)
_, _, err := authApplier.DeleteRange(tc.request)
require.Equalf(t, tc.expectError, err, "Range returned unexpected error (or lack thereof), expected: %v, got: %v", tc.expectError, err)
})
}
Expand Down Expand Up @@ -660,12 +653,10 @@ func TestAuthApplierV3_Txn(t *testing.T) {

authApplier := defaultAuthApplierV3(t)
mustCreateRolesAndEnableAuth(t, authApplier)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
setAuthInfo(authApplier, tc.userName)
_, _, err := authApplier.Txn(ctx, tc.request)
_, _, err := authApplier.Txn(tc.request)
require.Equalf(t, tc.expectError, err, "Range returned unexpected error (or lack thereof), expected: %v, got: %v", tc.expectError, err)
})
}
Expand All @@ -676,8 +667,6 @@ func TestAuthApplierV3_Txn(t *testing.T) {
func TestAuthApplierV3_LeaseRevoke(t *testing.T) {
authApplier := defaultAuthApplierV3(t)
mustCreateRolesAndEnableAuth(t, authApplier)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

_, err := authApplier.LeaseGrant(&pb.LeaseGrantRequest{
TTL: lease.MaxLeaseTTL,
Expand All @@ -700,7 +689,7 @@ func TestAuthApplierV3_LeaseRevoke(t *testing.T) {

// Put a key under the lease outside user's key range
setAuthInfo(authApplier, userRoot)
_, _, err = authApplier.Put(ctx, &pb.PutRequest{
_, _, err = authApplier.Put(&pb.PutRequest{
Key: []byte(keyOutsideRange),
Value: []byte("1"),
Lease: leaseID,
Expand Down
10 changes: 4 additions & 6 deletions server/etcdserver/apply/corrupt.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
package apply

import (
"context"

pb "go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/pkg/v3/traceutil"
"go.etcd.io/etcd/server/v3/etcdserver/errors"
Expand All @@ -28,19 +26,19 @@ type applierV3Corrupt struct {

func newApplierV3Corrupt(a applierV3) *applierV3Corrupt { return &applierV3Corrupt{a} }

func (a *applierV3Corrupt) Put(_ context.Context, _ *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error) {
func (a *applierV3Corrupt) Put(_ *pb.PutRequest) (*pb.PutResponse, *traceutil.Trace, error) {
return nil, nil, errors.ErrCorrupt
}

func (a *applierV3Corrupt) Range(_ context.Context, _ *pb.RangeRequest) (*pb.RangeResponse, *traceutil.Trace, error) {
func (a *applierV3Corrupt) Range(_ *pb.RangeRequest) (*pb.RangeResponse, *traceutil.Trace, error) {
return nil, nil, errors.ErrCorrupt
}

func (a *applierV3Corrupt) DeleteRange(_ context.Context, _ *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, *traceutil.Trace, error) {
func (a *applierV3Corrupt) DeleteRange(_ *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, *traceutil.Trace, error) {
return nil, nil, errors.ErrCorrupt
}

func (a *applierV3Corrupt) Txn(_ context.Context, _ *pb.TxnRequest) (*pb.TxnResponse, *traceutil.Trace, error) {
func (a *applierV3Corrupt) Txn(_ *pb.TxnRequest) (*pb.TxnResponse, *traceutil.Trace, error) {
return nil, nil, errors.ErrCorrupt
}

Expand Down
10 changes: 4 additions & 6 deletions server/etcdserver/apply/uber_applier.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
package apply

import (
"context"
"errors"
"time"

Expand Down Expand Up @@ -121,7 +120,6 @@ func (a *uberApplier) Apply(r *pb.InternalRaftRequest) *Result {
// dispatch translates the request (r) into appropriate call (like Put) on
// the underlying applyV3 object.
func (a *uberApplier) dispatch(r *pb.InternalRaftRequest) *Result {
ctx := context.TODO()
op := "unknown"
ar := &Result{}
defer func(start time.Time) {
Expand All @@ -136,16 +134,16 @@ func (a *uberApplier) dispatch(r *pb.InternalRaftRequest) *Result {
switch {
case r.Range != nil:
op = "Range"
ar.Resp, ar.Trace, ar.Err = a.applyV3.Range(ctx, r.Range)
ar.Resp, ar.Trace, ar.Err = a.applyV3.Range(r.Range)
case r.Put != nil:
op = "Put"
ar.Resp, ar.Trace, ar.Err = a.applyV3.Put(ctx, r.Put)
ar.Resp, ar.Trace, ar.Err = a.applyV3.Put(r.Put)
case r.DeleteRange != nil:
op = "DeleteRange"
ar.Resp, ar.Trace, ar.Err = a.applyV3.DeleteRange(ctx, r.DeleteRange)
ar.Resp, ar.Trace, ar.Err = a.applyV3.DeleteRange(r.DeleteRange)
case r.Txn != nil:
op = "Txn"
ar.Resp, ar.Trace, ar.Err = a.applyV3.Txn(ctx, r.Txn)
ar.Resp, ar.Trace, ar.Err = a.applyV3.Txn(r.Txn)
case r.Compaction != nil:
op = "Compaction"
ar.Resp, ar.Physc, ar.Trace, ar.Err = a.applyV3.Compaction(r.Compaction)
Expand Down
Loading