diff --git a/tests/robustness/client/client.go b/tests/robustness/client/client.go
index 2a1e76ce030..531fbacba7c 100644
--- a/tests/robustness/client/client.go
+++ b/tests/robustness/client/client.go
@@ -46,6 +46,8 @@ type RecordingClient struct {
 	kvOperations *model.AppendableHistory
 }
 
+var _ clientv3.KV = (*RecordingClient)(nil)
+
 type TimedWatchEvent struct {
 	model.WatchEvent
 	Time time.Duration
@@ -81,15 +83,13 @@ func (c *RecordingClient) Report() report.ClientReport {
 	}
 }
 
-func (c *RecordingClient) Get(ctx context.Context, key string, revision int64) (kv *mvccpb.KeyValue, rev int64, err error) {
-	resp, err := c.Range(ctx, key, "", revision, 0)
-	if err != nil {
-		return nil, 0, err
-	}
-	if len(resp.Kvs) == 1 {
-		kv = resp.Kvs[0]
-	}
-	return kv, resp.Header.Revision, nil
+func (c *RecordingClient) Do(ctx context.Context, op clientv3.Op) (clientv3.OpResponse, error) {
+	panic("not implemented")
+}
+
+func (c *RecordingClient) Get(ctx context.Context, key string, opts ...clientv3.OpOption) (*clientv3.GetResponse, error) {
+	revision := clientv3.OpGet(key, opts...).Rev()
+	return c.Range(ctx, key, "", revision, 0)
 }
 
 func (c *RecordingClient) Range(ctx context.Context, start, end string, revision, limit int64) (*clientv3.GetResponse, error) {
@@ -112,7 +112,7 @@ func (c *RecordingClient) Range(ctx context.Context, start, end string, revision
 	return resp, err
 }
 
-func (c *RecordingClient) Put(ctx context.Context, key, value string) (*clientv3.PutResponse, error) {
+func (c *RecordingClient) Put(ctx context.Context, key, value string, _ ...clientv3.OpOption) (*clientv3.PutResponse, error) {
 	c.kvMux.Lock()
 	defer c.kvMux.Unlock()
 	callTime := time.Since(c.baseTime)
@@ -122,7 +122,7 @@ func (c *RecordingClient) Put(ctx context.Context, key, value string) (*clientv3
 	return resp, err
 }
 
-func (c *RecordingClient) Delete(ctx context.Context, key string) (*clientv3.DeleteResponse, error) {
+func (c *RecordingClient) Delete(ctx context.Context, key string, _ ...clientv3.OpOption) (*clientv3.DeleteResponse, error) {
 	c.kvMux.Lock()
 	defer c.kvMux.Unlock()
 	callTime := time.Since(c.baseTime)
@@ -132,23 +132,48 @@ func (c *RecordingClient) Delete(ctx context.Context, key string) (*clientv3.Del
 	return resp, err
 }
 
-func (c *RecordingClient) Txn(ctx context.Context, conditions []clientv3.Cmp, onSuccess []clientv3.Op, onFailure []clientv3.Op) (*clientv3.TxnResponse, error) {
-	txn := c.client.Txn(ctx).If(
-		conditions...,
-	).Then(
-		onSuccess...,
-	).Else(
-		onFailure...,
-	)
-	c.kvMux.Lock()
-	defer c.kvMux.Unlock()
-	callTime := time.Since(c.baseTime)
-	resp, err := txn.Commit()
-	returnTime := time.Since(c.baseTime)
-	c.kvOperations.AppendTxn(conditions, onSuccess, onFailure, callTime, returnTime, resp, err)
+type wrappedTxn struct {
+	txn        clientv3.Txn
+	conditions []clientv3.Cmp
+	onSuccess  []clientv3.Op
+	onFailure  []clientv3.Op
+	c          *RecordingClient
+}
+
+var _ clientv3.Txn = (*wrappedTxn)(nil)
+
+func (w *wrappedTxn) If(cs ...clientv3.Cmp) clientv3.Txn {
+	w.conditions = append(w.conditions, cs...)
+	w.txn = w.txn.If(cs...)
+	return w
+}
+
+func (w *wrappedTxn) Then(ops ...clientv3.Op) clientv3.Txn {
+	w.onSuccess = append(w.onSuccess, ops...)
+	w.txn = w.txn.Then(ops...)
+	return w
+}
+
+func (w *wrappedTxn) Else(ops ...clientv3.Op) clientv3.Txn {
+	w.onFailure = append(w.onFailure, ops...)
+	w.txn = w.txn.Else(ops...)
+	return w
+}
+
+func (w *wrappedTxn) Commit() (*clientv3.TxnResponse, error) {
+	w.c.kvMux.Lock()
+	defer w.c.kvMux.Unlock()
+	callTime := time.Since(w.c.baseTime)
+	resp, err := w.txn.Commit()
+	returnTime := time.Since(w.c.baseTime)
+	w.c.kvOperations.AppendTxn(w.conditions, w.onSuccess, w.onFailure, callTime, returnTime, resp, err)
 	return resp, err
 }
 
+func (c *RecordingClient) Txn(ctx context.Context) clientv3.Txn {
+	return &wrappedTxn{txn: c.client.Txn(ctx), c: c}
+}
+
 func (c *RecordingClient) LeaseGrant(ctx context.Context, ttl int64) (*clientv3.LeaseGrantResponse, error) {
 	c.kvMux.Lock()
 	defer c.kvMux.Unlock()
@@ -190,7 +215,7 @@ func (c *RecordingClient) Defragment(ctx context.Context) (*clientv3.DefragmentR
 	return resp, err
 }
 
-func (c *RecordingClient) Compact(ctx context.Context, rev int64) (*clientv3.CompactResponse, error) {
+func (c *RecordingClient) Compact(ctx context.Context, rev int64, _ ...clientv3.CompactOption) (*clientv3.CompactResponse, error) {
 	c.kvMux.Lock()
 	defer c.kvMux.Unlock()
 	callTime := time.Since(c.baseTime)
diff --git a/tests/robustness/failpoint/trigger.go b/tests/robustness/failpoint/trigger.go
index 55ef0614ea6..9ae9a808481 100644
--- a/tests/robustness/failpoint/trigger.go
+++ b/tests/robustness/failpoint/trigger.go
@@ -21,6 +21,7 @@ import (
 	"testing"
 	"time"
 
+	clientv3 "go.etcd.io/etcd/client/v3"
 	"go.etcd.io/etcd/tests/v3/framework/e2e"
 	"go.etcd.io/etcd/tests/v3/robustness/client"
 	"go.etcd.io/etcd/tests/v3/robustness/identity"
@@ -67,10 +68,12 @@ func (t triggerCompact) Trigger(ctx context.Context, _ *testing.T, member e2e.Et
 
 	var rev int64
 	for {
-		_, rev, err = cc.Get(ctx, "/", 0)
+		var resp *clientv3.GetResponse
+		resp, err = cc.Get(ctx, "/", clientv3.WithRev(0))
 		if err != nil {
 			return nil, fmt.Errorf("failed to get revision: %w", err)
 		}
+		rev = resp.Header.Revision
 
 		if !t.multiBatchCompaction || rev > int64(clus.Cfg.ServerConfig.ExperimentalCompactionBatchLimit) {
 			break
diff --git a/tests/robustness/traffic/etcd.go b/tests/robustness/traffic/etcd.go
index 6210ee93d59..d8823ecd3e4 100644
--- a/tests/robustness/traffic/etcd.go
+++ b/tests/robustness/traffic/etcd.go
@@ -170,9 +170,17 @@ func (c etcdTrafficClient) Request(ctx context.Context, request etcdRequestType,
 	var limit int64
 	switch request {
 	case StaleGet:
-		_, rev, err = c.client.Get(opCtx, c.randomKey(), lastRev)
+		var resp *clientv3.GetResponse
+		resp, err = c.client.Get(opCtx, c.randomKey(), clientv3.WithRev(lastRev))
+		if err == nil {
+			rev = resp.Header.Revision
+		}
 	case Get:
-		_, rev, err = c.client.Get(opCtx, c.randomKey(), 0)
+		var resp *clientv3.GetResponse
+		resp, err = c.client.Get(opCtx, c.randomKey(), clientv3.WithRev(0))
+		if err == nil {
+			rev = resp.Header.Revision
+		}
 	case List:
 		var resp *clientv3.GetResponse
 		resp, err = c.client.Range(ctx, c.keyPrefix, clientv3.GetPrefixRangeEnd(c.keyPrefix), 0, limit)
@@ -205,15 +213,22 @@ func (c etcdTrafficClient) Request(ctx context.Context, request etcdRequestType,
 		}
 	case MultiOpTxn:
 		var resp *clientv3.TxnResponse
-		resp, err = c.client.Txn(opCtx, nil, c.pickMultiTxnOps(), nil)
+		resp, err = c.client.Txn(opCtx).Then(
+			c.pickMultiTxnOps()...,
+		).Commit()
 		if resp != nil {
 			rev = resp.Header.Revision
 		}
 	case CompareAndSet:
 		var kv *mvccpb.KeyValue
 		key := c.randomKey()
-		kv, rev, err = c.client.Get(opCtx, key, 0)
+		var resp *clientv3.GetResponse
+		resp, err = c.client.Get(opCtx, key, clientv3.WithRev(0))
 		if err == nil {
+			rev = resp.Header.Revision
+			if len(resp.Kvs) == 1 {
+				kv = resp.Kvs[0]
+			}
 			c.limiter.Wait(ctx)
 			var expectedRevision int64
 			if kv != nil {
@@ -221,7 +236,11 @@ func (c etcdTrafficClient) Request(ctx context.Context, request etcdRequestType,
 			}
 			txnCtx, txnCancel := context.WithTimeout(ctx, RequestTimeout)
 			var resp *clientv3.TxnResponse
-			resp, err = c.client.Txn(txnCtx, []clientv3.Cmp{clientv3.Compare(clientv3.ModRevision(key), "=", expectedRevision)}, []clientv3.Op{clientv3.OpPut(key, fmt.Sprintf("%d", c.idProvider.NewRequestID()))}, nil)
+			resp, err = c.client.Txn(txnCtx).If(
+				clientv3.Compare(clientv3.ModRevision(key), "=", expectedRevision),
+			).Then(
+				clientv3.OpPut(key, fmt.Sprintf("%d", c.idProvider.NewRequestID())),
+			).Commit()
 			txnCancel()
 			if resp != nil {
 				rev = resp.Header.Revision
diff --git a/tests/robustness/traffic/kubernetes.go b/tests/robustness/traffic/kubernetes.go
index 52dfe5cc8f5..aadeefc6f79 100644
--- a/tests/robustness/traffic/kubernetes.go
+++ b/tests/robustness/traffic/kubernetes.go
@@ -26,6 +26,7 @@ import (
 
 	"go.etcd.io/etcd/api/v3/mvccpb"
 	clientv3 "go.etcd.io/etcd/client/v3"
+	"go.etcd.io/etcd/client/v3/kubernetes"
 	"go.etcd.io/etcd/pkg/v3/stringutil"
 	"go.etcd.io/etcd/tests/v3/robustness/client"
 	"go.etcd.io/etcd/tests/v3/robustness/identity"
@@ -56,7 +57,7 @@ func (t kubernetesTraffic) ExpectUniqueRevision() bool {
 }
 
 func (t kubernetesTraffic) Run(ctx context.Context, c *client.RecordingClient, limiter *rate.Limiter, ids identity.Provider, lm identity.LeaseIDStorage, nonUniqueWriteLimiter ConcurrencyLimiter, finish <-chan struct{}) {
-	kc := &kubernetesClient{client: c}
+	kc := kubernetes.Client{Client: &clientv3.Client{KV: c}}
 	s := newStorage()
 	keyPrefix := "/registry/" + t.resource + "/"
 	g := errgroup.Group{}
@@ -75,7 +76,7 @@ func (t kubernetesTraffic) Run(ctx context.Context, c *client.RecordingClient, l
 			if err != nil {
 				continue
 			}
-			t.Watch(ctx, kc, s, limiter, keyPrefix, rev+1)
+			t.Watch(ctx, c, s, limiter, keyPrefix, rev+1)
 		}
 	})
 	g.Go(func() error {
@@ -105,49 +106,47 @@ func (t kubernetesTraffic) Run(ctx context.Context, c *client.RecordingClient, l
 	g.Wait()
 }
 
-func (t kubernetesTraffic) Read(ctx context.Context, kc *kubernetesClient, s *storage, limiter *rate.Limiter, keyPrefix string, limit int) (rev int64, err error) {
-	rangeEnd := clientv3.GetPrefixRangeEnd(keyPrefix)
-
+func (t kubernetesTraffic) Read(ctx context.Context, kc kubernetes.Interface, s *storage, limiter *rate.Limiter, keyPrefix string, limit int) (rev int64, err error) {
 	hasMore := true
-	rangeStart := keyPrefix
 	var kvs []*mvccpb.KeyValue
 	var revision int64
+	var cont string
 
 	for hasMore {
 		readCtx, cancel := context.WithTimeout(ctx, RequestTimeout)
-		resp, err := kc.Range(readCtx, rangeStart, rangeEnd, revision, int64(limit))
+		resp, err := kc.List(readCtx, keyPrefix, kubernetes.ListOptions{Continue: cont, Revision: revision, Limit: int64(limit)})
 		cancel()
 		if err != nil {
 			return 0, err
 		}
 		limiter.Wait(ctx)
 
-		hasMore = resp.More
-		if len(resp.Kvs) > 0 && hasMore {
-			rangeStart = string(resp.Kvs[len(resp.Kvs)-1].Key) + "\x00"
-		}
 		kvs = append(kvs, resp.Kvs...)
 		if revision == 0 {
-			revision = resp.Header.Revision
+			revision = resp.Revision
 		}
+		hasMore = resp.Count > int64(len(resp.Kvs))
+		if hasMore {
+			cont = string(kvs[len(kvs)-1].Key)
+		}
 	}
 	s.Reset(revision, kvs)
 	return revision, nil
 }
 
-func (t kubernetesTraffic) Write(ctx context.Context, kc *kubernetesClient, ids identity.Provider, s *storage, limiter *rate.Limiter, nonUniqueWriteLimiter ConcurrencyLimiter) (err error) {
+func (t kubernetesTraffic) Write(ctx context.Context, kc kubernetes.Interface, ids identity.Provider, s *storage, limiter *rate.Limiter, nonUniqueWriteLimiter ConcurrencyLimiter) (err error) {
 	writeCtx, cancel := context.WithTimeout(ctx, RequestTimeout)
 	defer cancel()
 	count := s.Count()
 	if count < t.averageKeyCount/2 {
-		err = kc.OptimisticCreate(writeCtx, t.generateKey(), fmt.Sprintf("%d", ids.NewRequestID()))
+		_, err = kc.OptimisticPut(writeCtx, t.generateKey(), []byte(fmt.Sprintf("%d", ids.NewRequestID())), 0, kubernetes.PutOptions{})
 	} else {
 		key, rev := s.PickRandom()
 		if rev == 0 {
 			return errors.New("storage empty")
 		}
 		if count > t.averageKeyCount*3/2 && nonUniqueWriteLimiter.Take() {
-			_, err = kc.OptimisticDelete(writeCtx, key, rev)
+			_, err = kc.OptimisticDelete(writeCtx, key, rev, kubernetes.DeleteOptions{GetOnFailure: true})
 			nonUniqueWriteLimiter.Return()
 		} else {
 			shouldReturn := false
@@ -159,11 +158,11 @@ func (t kubernetesTraffic) Write(ctx context.Context, kc *kubernetesClient, ids
 			op := random.PickRandom(choices)
 			switch op {
 			case KubernetesDelete:
-				_, err = kc.OptimisticDelete(writeCtx, key, rev)
+				_, err = kc.OptimisticDelete(writeCtx, key, rev, kubernetes.DeleteOptions{GetOnFailure: true})
 			case KubernetesUpdate:
-				_, err = kc.OptimisticUpdate(writeCtx, key, fmt.Sprintf("%d", ids.NewRequestID()), rev)
+				_, err = kc.OptimisticPut(writeCtx, key, []byte(fmt.Sprintf("%d", ids.NewRequestID())), rev, kubernetes.PutOptions{GetOnFailure: true})
 			case KubernetesCreate:
-				err = kc.OptimisticCreate(writeCtx, t.generateKey(), fmt.Sprintf("%d", ids.NewRequestID()))
+				_, err = kc.OptimisticPut(writeCtx, t.generateKey(), []byte(fmt.Sprintf("%d", ids.NewRequestID())), 0, kubernetes.PutOptions{})
 			default:
 				panic(fmt.Sprintf("invalid choice: %q", op))
 			}
@@ -188,7 +187,7 @@ func filterOutNonUniqueKubernetesWrites(choices []random.ChoiceWeight[Kubernetes
 	return resp
 }
 
-func (t kubernetesTraffic) Watch(ctx context.Context, kc *kubernetesClient, s *storage, limiter *rate.Limiter, keyPrefix string, revision int64) {
+func (t kubernetesTraffic) Watch(ctx context.Context, c *client.RecordingClient, s *storage, limiter *rate.Limiter, keyPrefix string, revision int64) {
 	watchCtx, cancel := context.WithTimeout(ctx, WatchTimeout)
 	defer cancel()
 
@@ -196,7 +195,7 @@ func (t kubernetesTraffic) Watch(ctx context.Context, kc *kubernetesClient, s *s
 	// in the cluster:
 	// https://github.com/kubernetes/kubernetes/blob/2016fab3085562b4132e6d3774b6ded5ba9939fd/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go#L872
 	watchCtx = clientv3.WithRequireLeader(watchCtx)
-	for e := range kc.client.Watch(watchCtx, keyPrefix, revision, true, true, true) {
+	for e := range c.Watch(watchCtx, keyPrefix, revision, true, true, true) {
 		s.Update(e)
 	}
 	limiter.Wait(ctx)
@@ -214,61 +213,6 @@ const (
 	KubernetesCreate KubernetesRequestType = "create"
 )
 
-type kubernetesClient struct {
-	client *client.RecordingClient
-}
-
-func (k kubernetesClient) List(ctx context.Context, prefix string, revision, limit int64) (*clientv3.GetResponse, error) {
-	resp, err := k.client.Range(ctx, prefix, clientv3.GetPrefixRangeEnd(prefix), revision, limit)
-	if err != nil {
-		return nil, err
-	}
-	return resp, err
-}
-
-func (k kubernetesClient) Range(ctx context.Context, start, end string, revision, limit int64) (*clientv3.GetResponse, error) {
-	return k.client.Range(ctx, start, end, revision, limit)
-}
-
-func (k kubernetesClient) OptimisticDelete(ctx context.Context, key string, expectedRevision int64) (*mvccpb.KeyValue, error) {
-	return k.optimisticOperationOrGet(ctx, key, clientv3.OpDelete(key), expectedRevision)
-}
-
-func (k kubernetesClient) OptimisticUpdate(ctx context.Context, key, value string, expectedRevision int64) (*mvccpb.KeyValue, error) {
-	return k.optimisticOperationOrGet(ctx, key, clientv3.OpPut(key, value), expectedRevision)
-}
-
-func (k kubernetesClient) OptimisticCreate(ctx context.Context, key, value string) error {
-	_, err := k.client.Txn(ctx, []clientv3.Cmp{clientv3.Compare(clientv3.ModRevision(key), "=", 0)}, []clientv3.Op{clientv3.OpPut(key, value)}, nil)
-	return err
-}
-
-func (k kubernetesClient) RequestProgress(ctx context.Context) error {
-	// Kubernetes makes RequestProgress calls by requiring a leader to be
-	// present in the cluster:
-	// https://github.com/kubernetes/kubernetes/blob/2016fab3085562b4132e6d3774b6ded5ba9939fd/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go#L87
-	return k.client.RequestProgress(clientv3.WithRequireLeader(ctx))
-}
-
-// Kubernetes optimistically assumes that key didn't change since it was last observed, so it executes operations within a transaction conditioned on key not changing.
-// However, if the keys value changed it wants imminently to read it, thus the Get operation on failure.
-func (k kubernetesClient) optimisticOperationOrGet(ctx context.Context, key string, operation clientv3.Op, expectedRevision int64) (*mvccpb.KeyValue, error) {
-	resp, err := k.client.Txn(ctx, []clientv3.Cmp{clientv3.Compare(clientv3.ModRevision(key), "=", expectedRevision)}, []clientv3.Op{operation}, []clientv3.Op{clientv3.OpGet(key)})
-	if err != nil {
-		return nil, err
-	}
-	if !resp.Succeeded {
-		getResp := (*clientv3.GetResponse)(resp.Responses[0].GetResponseRange())
-		if err != nil || len(getResp.Kvs) == 0 {
-			return nil, err
-		}
-		if len(getResp.Kvs) == 1 {
-			return getResp.Kvs[0], err
-		}
-	}
-	return nil, err
-}
-
 type storage struct {
 	mux         sync.RWMutex
 	keyRevision map[string]int64