From 7d7b0c4d5b47e037f2d373d17ac75a316785292e Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Mon, 31 Jul 2023 11:09:37 +0200 Subject: [PATCH] Introduce Kubernetes interface to etcd client Signed-off-by: Marek Siarkowicz --- client/v3/client.go | 2 + client/v3/kubernetes.go | 201 ++++++++++++++++++++++++++++++++++++++++ client/v3/watch.go | 35 ++++--- 3 files changed, 226 insertions(+), 12 deletions(-) create mode 100644 client/v3/kubernetes.go diff --git a/client/v3/client.go b/client/v3/client.go index 8a2225b2277c..e74d7d9082ff 100644 --- a/client/v3/client.go +++ b/client/v3/client.go @@ -49,6 +49,7 @@ type Client struct { Watcher Auth Maintenance + Kubernetes Kubernetes conn *grpc.ClientConn @@ -439,6 +440,7 @@ func newClient(cfg *Config) (*Client, error) { client.KV = NewKV(client) client.Lease = NewLease(client) client.Watcher = NewWatcher(client) + client.Kubernetes = NewKubernetes(client) client.Auth = NewAuth(client) client.Maintenance = NewMaintenance(client) diff --git a/client/v3/kubernetes.go b/client/v3/kubernetes.go new file mode 100644 index 000000000000..99eecc264951 --- /dev/null +++ b/client/v3/kubernetes.go @@ -0,0 +1,201 @@ +package clientv3 + +import ( + "context" + + pb "go.etcd.io/etcd/api/v3/etcdserverpb" + "go.etcd.io/etcd/api/v3/mvccpb" +) + +func NewKubernetes(c *Client) Kubernetes { + return &kubernetes{kv: RetryKVClient(c), watcher: newWatchFromWatchClient(pb.NewWatchClient(c.conn), c)} +} + +type Kubernetes interface { + Get(ctx context.Context, key string, opts GetOptions) (KubernetesGetResponse, error) + List(ctx context.Context, prefix string, opts ListOptions) (KubernetesListResponse, error) + Count(ctx context.Context, prefix string) (int64, error) + OptimisticPut(ctx context.Context, key string, value []byte, opts PutOptions) (KubernetesPutResponse, error) + OptimisticDelete(ctx context.Context, key string, opts DeleteOptions) (KubernetesDeleteResponse, error) + Watch(ctx context.Context, key string, opts WatchOptions) (WatchID, WatchChan) + RequestProgress(ctx context.Context, id WatchID) error +} + +type WatchID string + +type GetOptions struct { + Revision int64 +} + +type ListOptions struct { + Revision int64 + Limit int64 + Continue string +} + +type WatchOptions struct { + ID WatchID + Revision int64 + Prefix bool + ProgressNotify bool +} + +type PutOptions struct { + ExpectedRevision int64 + LeaseID LeaseID +} + +type DeleteOptions struct { + ExpectedRevision int64 +} + +type KubernetesGetResponse struct { + KV *mvccpb.KeyValue + Revision int64 +} + +type KubernetesListResponse struct { + KVs []*mvccpb.KeyValue + Count int64 + Revision int64 +} + +type KubernetesPutResponse struct { + KV *mvccpb.KeyValue + Succeeded bool + Revision int64 +} + +type KubernetesDeleteResponse struct { + KV *mvccpb.KeyValue + Succeeded bool + Revision int64 +} + +type kubernetes struct { + kv pb.KVClient + watcher *watcher +} + +func (k kubernetes) Get(ctx context.Context, key string, opts GetOptions) (resp KubernetesGetResponse, err error) { + rangeResp, err := k.kv.Range(ctx, &pb.RangeRequest{ + Key: []byte(key), + Revision: opts.Revision, + Limit: 1, + }) + if err != nil { + return resp, toErr(ctx, err) + } + resp.Revision = rangeResp.Header.Revision + if len(rangeResp.Kvs) == 1 { + resp.KV = rangeResp.Kvs[0] + } + return resp, nil +} + +func (k kubernetes) List(ctx context.Context, prefix string, opts ListOptions) (resp KubernetesListResponse, err error) { + rangeStart := prefix + opts.Continue + rangeEnd := GetPrefixRangeEnd(prefix) + + rangeResp, err := k.kv.Range(ctx, &pb.RangeRequest{ + Key: []byte(rangeStart), + RangeEnd: []byte(rangeEnd), + Limit: opts.Limit, + Revision: opts.Revision, + }) + if err != nil { + return resp, toErr(ctx, err) + } + resp.KVs = rangeResp.Kvs + resp.Count = rangeResp.Count + resp.Revision = rangeResp.Header.Revision + return resp, nil +} + +func (k kubernetes) Count(ctx context.Context, prefix string) (int64, error) { + resp, err := k.kv.Range(ctx, &pb.RangeRequest{ + Key: []byte(prefix), + RangeEnd: []byte(GetPrefixRangeEnd(prefix)), + CountOnly: true, + }) + if err != nil { + return 0, toErr(ctx, err) + } + return resp.Count, nil +} + +func (k kubernetes) OptimisticPut(ctx context.Context, key string, value []byte, opts PutOptions) (resp KubernetesPutResponse, err error) { + put := &pb.RequestOp{Request: &pb.RequestOp_RequestPut{&pb.PutRequest{Key: []byte(key), Value: value, Lease: int64(opts.LeaseID)}}} + + txnResp, err := k.optimisticTxn(ctx, key, opts.ExpectedRevision, put) + if err != nil { + return resp, toErr(ctx, err) + } + resp.Succeeded = txnResp.Succeeded + resp.Revision = txnResp.Header.Revision + if !txnResp.Succeeded { + resp.KV = kvFromTxnResponse(txnResp.Responses[0]) + } + return resp, nil +} + +func (k kubernetes) OptimisticDelete(ctx context.Context, key string, opts DeleteOptions) (resp KubernetesDeleteResponse, err error) { + del := &pb.RequestOp{Request: &pb.RequestOp_RequestDeleteRange{&pb.DeleteRangeRequest{Key: []byte(key)}}} + + txnResp, err := k.optimisticTxn(ctx, key, opts.ExpectedRevision, del) + if err != nil { + return resp, toErr(ctx, err) + } + resp.Succeeded = txnResp.Succeeded + resp.Revision = txnResp.Header.Revision + if !txnResp.Succeeded { + resp.KV = kvFromTxnResponse(txnResp.Responses[0]) + } + return resp, nil +} + +func (k kubernetes) optimisticTxn(ctx context.Context, key string, expectRevision int64, onSuccess *pb.RequestOp) (*pb.TxnResponse, error) { + txn := &pb.TxnRequest{ + Compare: []*pb.Compare{ + { + Key: []byte(key), + TargetUnion: &pb.Compare_ModRevision{ModRevision: expectRevision}, + Result: pb.Compare_EQUAL, + }, + }, + Success: []*pb.RequestOp{onSuccess}, + Failure: []*pb.RequestOp{ + {Request: &pb.RequestOp_RequestRange{RequestRange: &pb.RangeRequest{Key: []byte(key), Limit: 1}}}, + }, + } + return k.kv.Txn(ctx, txn) +} + +func kvFromTxnResponse(resp *pb.ResponseOp) *mvccpb.KeyValue { + getResponse := resp.GetResponseRange() + if len(getResponse.Kvs) == 1 { + return getResponse.Kvs[0] + } + return nil +} + +func (k kubernetes) Watch(ctx context.Context, key string, opts WatchOptions) (WatchID, WatchChan) { + if opts.ID == "" { + opts.ID = WatchID(streamKeyFromCtx(ctx)) + } + ops := []OpOption{ + WithRev(opts.Revision), WithPrevKV(), + } + + if opts.Prefix { + ops = append(ops, WithPrefix()) + } + if opts.ProgressNotify { + ops = append(ops, WithProgressNotify()) + } + return opts.ID, k.watcher.watch(ctx, string(opts.ID), key, ops...) +} + +func (k kubernetes) RequestProgress(ctx context.Context, id WatchID) error { + return k.watcher.requestProgress(ctx, string(id)) +} diff --git a/client/v3/watch.go b/client/v3/watch.go index 41a6ec976333..2161412121a1 100644 --- a/client/v3/watch.go +++ b/client/v3/watch.go @@ -162,8 +162,8 @@ type watchGrpcStream struct { // ctx controls internal remote.Watch requests ctx context.Context // ctxKey is the key used when looking up this stream's context - ctxKey string - cancel context.CancelFunc + streamKey string + cancel context.CancelFunc // substreams holds all active watchers on this grpc stream substreams map[int64]*watcherStream @@ -249,6 +249,10 @@ func NewWatcher(c *Client) Watcher { } func NewWatchFromWatchClient(wc pb.WatchClient, c *Client) Watcher { + return newWatchFromWatchClient(wc, c) +} + +func newWatchFromWatchClient(wc pb.WatchClient, c *Client) *watcher { w := &watcher{ remote: wc, streams: make(map[string]*watchGrpcStream), @@ -271,14 +275,14 @@ func (vc *valCtx) Deadline() (time.Time, bool) { return zeroTime, false } func (vc *valCtx) Done() <-chan struct{} { return valCtxCh } func (vc *valCtx) Err() error { return nil } -func (w *watcher) newWatcherGrpcStream(inctx context.Context) *watchGrpcStream { +func (w *watcher) newWatcherGrpcStream(inctx context.Context, streamKey string) *watchGrpcStream { ctx, cancel := context.WithCancel(&valCtx{inctx}) wgs := &watchGrpcStream{ owner: w, remote: w.remote, callOpts: w.callOpts, ctx: ctx, - ctxKey: streamKeyFromCtx(inctx), + streamKey: streamKey, cancel: cancel, substreams: make(map[int64]*watcherStream), respc: make(chan *pb.WatchResponse), @@ -295,6 +299,11 @@ func (w *watcher) newWatcherGrpcStream(inctx context.Context) *watchGrpcStream { // Watch posts a watch request to run() and waits for a new watcher channel func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) WatchChan { + ctxKey := streamKeyFromCtx(ctx) + return w.watch(ctx, ctxKey, key, opts...) +} + +func (w *watcher) watch(ctx context.Context, streamKey, key string, opts ...OpOption) WatchChan { ow := opWatch(key, opts...) var filters []pb.WatchCreateRequest_FilterType @@ -319,7 +328,6 @@ func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) Watch } ok := false - ctxKey := streamKeyFromCtx(ctx) var closeCh chan WatchResponse for { @@ -332,10 +340,10 @@ func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) Watch close(ch) return ch } - wgs := w.streams[ctxKey] + wgs := w.streams[streamKey] if wgs == nil { - wgs = w.newWatcherGrpcStream(ctx) - w.streams[ctxKey] = wgs + wgs = w.newWatcherGrpcStream(ctx, streamKey) + w.streams[streamKey] = wgs } donec := wgs.donec reqc := wgs.reqc @@ -404,16 +412,19 @@ func (w *watcher) Close() (err error) { // RequestProgress requests a progress notify response be sent in all watch channels. func (w *watcher) RequestProgress(ctx context.Context) (err error) { ctxKey := streamKeyFromCtx(ctx) + return w.requestProgress(ctx, ctxKey) +} +func (w *watcher) requestProgress(ctx context.Context, streamKey string) (err error) { w.mu.Lock() if w.streams == nil { w.mu.Unlock() return fmt.Errorf("no stream found for context") } - wgs := w.streams[ctxKey] + wgs := w.streams[streamKey] if wgs == nil { - wgs = w.newWatcherGrpcStream(ctx) - w.streams[ctxKey] = wgs + wgs = w.newWatcherGrpcStream(ctx, streamKey) + w.streams[streamKey] = wgs } donec := wgs.donec reqc := wgs.reqc @@ -450,7 +461,7 @@ func (w *watcher) closeStream(wgs *watchGrpcStream) { close(wgs.donec) wgs.cancel() if w.streams != nil { - delete(w.streams, wgs.ctxKey) + delete(w.streams, wgs.streamKey) } w.mu.Unlock() }