Skip to content

Commit

Permalink
Remove context from mvcc public interface
Browse files Browse the repository at this point in the history
Signed-off-by: Marek Siarkowicz <[email protected]>
  • Loading branch information
serathius committed Oct 4, 2024
1 parent 47e3b92 commit 68739d5
Show file tree
Hide file tree
Showing 5 changed files with 18 additions and 93 deletions.
7 changes: 1 addition & 6 deletions server/etcdserver/api/v3rpc/validationfuzz_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
package v3rpc

import (
"context"
"testing"

"go.uber.org/zap/zaptest"
Expand Down Expand Up @@ -167,15 +166,11 @@ func execTransaction(t *testing.T, req *pb.RequestOp) {
s := mvcc.NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, mvcc.StoreConfig{})
defer s.Close()

// setup cancelled context
ctx, cancel := context.WithCancel(context.TODO())
cancel()

request := &pb.TxnRequest{
Success: []*pb.RequestOp{req},
}

_, _, err := txn.Txn(ctx, zaptest.NewLogger(t), request, false, s, &lease.FakeLessor{})
_, _, err := txn.Txn(zaptest.NewLogger(t), request, false, s, &lease.FakeLessor{})
if err != nil {
t.Skipf("Application erroring. %s", err.Error())
}
Expand Down
8 changes: 4 additions & 4 deletions server/etcdserver/apply/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,19 +151,19 @@ func (a *applierV3backend) Apply(r *pb.InternalRaftRequest, applyFunc applyFunc)
}

func (a *applierV3backend) Put(p *pb.PutRequest) (resp *pb.PutResponse, trace *traceutil.Trace, err error) {
return mvcctxn.Put(context.TODO(), a.lg, a.lessor, a.kv, p)
return mvcctxn.Put(a.lg, a.lessor, a.kv, p)
}

func (a *applierV3backend) DeleteRange(dr *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, *traceutil.Trace, error) {
return mvcctxn.DeleteRange(context.TODO(), a.lg, a.kv, dr)
return mvcctxn.DeleteRange(a.lg, a.kv, dr)
}

func (a *applierV3backend) Range(r *pb.RangeRequest) (*pb.RangeResponse, *traceutil.Trace, error) {
return mvcctxn.Range(context.TODO(), a.lg, a.kv, r)
return mvcctxn.Range(a.lg, a.kv, r)
}

func (a *applierV3backend) Txn(rt *pb.TxnRequest) (*pb.TxnResponse, *traceutil.Trace, error) {
return mvcctxn.Txn(context.TODO(), a.lg, rt, a.txnModeWriteWithSharedBuffer, a.kv, a.lessor)
return mvcctxn.Txn(a.lg, rt, a.txnModeWriteWithSharedBuffer, a.kv, a.lessor)
}

func (a *applierV3backend) Compaction(compaction *pb.CompactionRequest) (*pb.CompactionResponse, <-chan struct{}, *traceutil.Trace, error) {
Expand Down
12 changes: 8 additions & 4 deletions server/etcdserver/txn/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ import (
"go.etcd.io/etcd/server/v3/storage/mvcc"
)

func Put(ctx context.Context, lg *zap.Logger, lessor lease.Lessor, kv mvcc.KV, p *pb.PutRequest) (resp *pb.PutResponse, trace *traceutil.Trace, err error) {
func Put(lg *zap.Logger, lessor lease.Lessor, kv mvcc.KV, p *pb.PutRequest) (resp *pb.PutResponse, trace *traceutil.Trace, err error) {
ctx := context.Background()
trace = traceutil.Get(ctx)
// create put tracing if the trace in context is empty
if trace.IsEmpty() {
Expand Down Expand Up @@ -94,7 +95,8 @@ func put(ctx context.Context, txnWrite mvcc.TxnWrite, p *pb.PutRequest) (resp *p
return resp, nil
}

func DeleteRange(ctx context.Context, lg *zap.Logger, kv mvcc.KV, dr *pb.DeleteRangeRequest) (resp *pb.DeleteRangeResponse, trace *traceutil.Trace, err error) {
func DeleteRange(lg *zap.Logger, kv mvcc.KV, dr *pb.DeleteRangeRequest) (resp *pb.DeleteRangeResponse, trace *traceutil.Trace, err error) {
ctx := context.Background()
trace = traceutil.Get(ctx)
// create delete tracing if the trace in context is empty
if trace.IsEmpty() {
Expand Down Expand Up @@ -133,7 +135,8 @@ func deleteRange(ctx context.Context, txnWrite mvcc.TxnWrite, dr *pb.DeleteRange
return resp, nil
}

func Range(ctx context.Context, lg *zap.Logger, kv mvcc.KV, r *pb.RangeRequest) (resp *pb.RangeResponse, trace *traceutil.Trace, err error) {
func Range(lg *zap.Logger, kv mvcc.KV, r *pb.RangeRequest) (resp *pb.RangeResponse, trace *traceutil.Trace, err error) {
ctx := context.Background()
trace = traceutil.Get(ctx)
if trace.IsEmpty() {
trace = traceutil.New("range", lg)
Expand Down Expand Up @@ -249,7 +252,8 @@ func executeRange(ctx context.Context, lg *zap.Logger, txnRead mvcc.TxnRead, r *
return resp, nil
}

func Txn(ctx context.Context, lg *zap.Logger, rt *pb.TxnRequest, txnModeWriteWithSharedBuffer bool, kv mvcc.KV, lessor lease.Lessor) (*pb.TxnResponse, *traceutil.Trace, error) {
func Txn(lg *zap.Logger, rt *pb.TxnRequest, txnModeWriteWithSharedBuffer bool, kv mvcc.KV, lessor lease.Lessor) (*pb.TxnResponse, *traceutil.Trace, error) {
ctx := context.Background()
trace := traceutil.Get(ctx)
if trace.IsEmpty() {
trace = traceutil.New("transaction", lg)
Expand Down
80 changes: 3 additions & 77 deletions server/etcdserver/txn/txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
package txn

import (
"context"
"strings"
"testing"
"time"

Expand Down Expand Up @@ -223,9 +221,7 @@ func TestCheckTxn(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
s, lessor := setup(t, tc.setup)

ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
_, _, err := Txn(ctx, zaptest.NewLogger(t), tc.txn, false, s, lessor)
_, _, err := Txn(zaptest.NewLogger(t), tc.txn, false, s, lessor)

gotErr := ""
if err != nil {
Expand All @@ -243,9 +239,7 @@ func TestCheckPut(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
s, lessor := setup(t, tc.setup)

ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
_, _, err := Put(ctx, zaptest.NewLogger(t), lessor, s, tc.op.GetRequestPut())
_, _, err := Put(zaptest.NewLogger(t), lessor, s, tc.op.GetRequestPut())

gotErr := ""
if err != nil {
Expand All @@ -263,9 +257,7 @@ func TestCheckRange(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
s, _ := setup(t, tc.setup)

ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
_, _, err := Range(ctx, zaptest.NewLogger(t), s, tc.op.GetRequestRange())
_, _, err := Range(zaptest.NewLogger(t), s, tc.op.GetRequestRange())

gotErr := ""
if err != nil {
Expand Down Expand Up @@ -304,72 +296,6 @@ func setup(t *testing.T, setup testSetup) (mvcc.KV, lease.Lessor) {
return s, lessor
}

func TestReadonlyTxnError(t *testing.T) {
b, _ := betesting.NewDefaultTmpBackend(t)
defer betesting.Close(t, b)
s := mvcc.NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, mvcc.StoreConfig{})
defer s.Close()

// setup cancelled context
ctx, cancel := context.WithCancel(context.TODO())
cancel()

// put some data to prevent early termination in rangeKeys
// we are expecting failure on cancelled context check
s.Put([]byte("foo"), []byte("bar"), lease.NoLease)

txn := &pb.TxnRequest{
Success: []*pb.RequestOp{
{
Request: &pb.RequestOp_RequestRange{
RequestRange: &pb.RangeRequest{
Key: []byte("foo"),
},
},
},
},
}

_, _, err := Txn(ctx, zaptest.NewLogger(t), txn, false, s, &lease.FakeLessor{})
if err == nil || !strings.Contains(err.Error(), "applyTxn: failed Range: rangeKeys: context cancelled: context canceled") {
t.Fatalf("Expected context canceled error, got %v", err)
}
}

func TestWriteTxnPanic(t *testing.T) {
b, _ := betesting.NewDefaultTmpBackend(t)
defer betesting.Close(t, b)
s := mvcc.NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, mvcc.StoreConfig{})
defer s.Close()

// setup cancelled context
ctx, cancel := context.WithCancel(context.TODO())
cancel()

// write txn that puts some data and then fails in range due to cancelled context
txn := &pb.TxnRequest{
Success: []*pb.RequestOp{
{
Request: &pb.RequestOp_RequestPut{
RequestPut: &pb.PutRequest{
Key: []byte("foo"),
Value: []byte("bar"),
},
},
},
{
Request: &pb.RequestOp_RequestRange{
RequestRange: &pb.RangeRequest{
Key: []byte("foo"),
},
},
},
},
}

assert.Panics(t, func() { Txn(ctx, zaptest.NewLogger(t), txn, false, s, &lease.FakeLessor{}) }, "Expected panic in Txn with writes")
}

func TestCheckTxnAuth(t *testing.T) {
be, _ := betesting.NewDefaultTmpBackend(t)
defer betesting.Close(t, be)
Expand Down
4 changes: 2 additions & 2 deletions server/etcdserver/v3_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func (s *EtcdServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeRe
return s.authStore.IsRangePermitted(ai, r.Key, r.RangeEnd)
}

get := func() { resp, _, err = txn.Range(ctx, s.Logger(), s.KV(), r) }
get := func() { resp, _, err = txn.Range(s.Logger(), s.KV(), r) }
if serr := s.doSerialize(ctx, chk, get); serr != nil {
err = serr
return nil, err
Expand Down Expand Up @@ -183,7 +183,7 @@ func (s *EtcdServer) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse
}(time.Now())

get := func() {
resp, _, err = txn.Txn(ctx, s.Logger(), r, s.Cfg.ExperimentalTxnModeWriteWithSharedBuffer, s.KV(), s.lessor)
resp, _, err = txn.Txn(s.Logger(), r, s.Cfg.ExperimentalTxnModeWriteWithSharedBuffer, s.KV(), s.lessor)
}
if serr := s.doSerialize(ctx, chk, get); serr != nil {
return nil, serr
Expand Down

0 comments on commit 68739d5

Please sign in to comment.