Skip to content

Commit

Permalink
Introduce Kubernetes interface to etcd client
Browse files Browse the repository at this point in the history
Signed-off-by: Marek Siarkowicz <[email protected]>
  • Loading branch information
serathius committed May 31, 2024
1 parent c9063a0 commit 7d7b0c4
Show file tree
Hide file tree
Showing 3 changed files with 226 additions and 12 deletions.
2 changes: 2 additions & 0 deletions client/v3/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ type Client struct {
Watcher
Auth
Maintenance
Kubernetes Kubernetes

conn *grpc.ClientConn

Expand Down Expand Up @@ -439,6 +440,7 @@ func newClient(cfg *Config) (*Client, error) {
client.KV = NewKV(client)
client.Lease = NewLease(client)
client.Watcher = NewWatcher(client)
client.Kubernetes = NewKubernetes(client)
client.Auth = NewAuth(client)
client.Maintenance = NewMaintenance(client)

Expand Down
201 changes: 201 additions & 0 deletions client/v3/kubernetes.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
package clientv3

import (
"context"

pb "go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/api/v3/mvccpb"
)

func NewKubernetes(c *Client) Kubernetes {
return &kubernetes{kv: RetryKVClient(c), watcher: newWatchFromWatchClient(pb.NewWatchClient(c.conn), c)}
}

type Kubernetes interface {
Get(ctx context.Context, key string, opts GetOptions) (KubernetesGetResponse, error)
List(ctx context.Context, prefix string, opts ListOptions) (KubernetesListResponse, error)
Count(ctx context.Context, prefix string) (int64, error)
OptimisticPut(ctx context.Context, key string, value []byte, opts PutOptions) (KubernetesPutResponse, error)
OptimisticDelete(ctx context.Context, key string, opts DeleteOptions) (KubernetesDeleteResponse, error)
Watch(ctx context.Context, key string, opts WatchOptions) (WatchID, WatchChan)
RequestProgress(ctx context.Context, id WatchID) error
}

type WatchID string

type GetOptions struct {
Revision int64
}

type ListOptions struct {
Revision int64
Limit int64
Continue string
}

type WatchOptions struct {
ID WatchID
Revision int64
Prefix bool
ProgressNotify bool
}

type PutOptions struct {
ExpectedRevision int64
LeaseID LeaseID
}

type DeleteOptions struct {
ExpectedRevision int64
}

type KubernetesGetResponse struct {
KV *mvccpb.KeyValue
Revision int64
}

type KubernetesListResponse struct {
KVs []*mvccpb.KeyValue
Count int64
Revision int64
}

type KubernetesPutResponse struct {
KV *mvccpb.KeyValue
Succeeded bool
Revision int64
}

type KubernetesDeleteResponse struct {
KV *mvccpb.KeyValue
Succeeded bool
Revision int64
}

type kubernetes struct {
kv pb.KVClient
watcher *watcher
}

func (k kubernetes) Get(ctx context.Context, key string, opts GetOptions) (resp KubernetesGetResponse, err error) {
rangeResp, err := k.kv.Range(ctx, &pb.RangeRequest{
Key: []byte(key),
Revision: opts.Revision,
Limit: 1,
})
if err != nil {
return resp, toErr(ctx, err)
}
resp.Revision = rangeResp.Header.Revision
if len(rangeResp.Kvs) == 1 {
resp.KV = rangeResp.Kvs[0]
}
return resp, nil
}

func (k kubernetes) List(ctx context.Context, prefix string, opts ListOptions) (resp KubernetesListResponse, err error) {
rangeStart := prefix + opts.Continue
rangeEnd := GetPrefixRangeEnd(prefix)

rangeResp, err := k.kv.Range(ctx, &pb.RangeRequest{
Key: []byte(rangeStart),
RangeEnd: []byte(rangeEnd),
Limit: opts.Limit,
Revision: opts.Revision,
})
if err != nil {
return resp, toErr(ctx, err)
}
resp.KVs = rangeResp.Kvs
resp.Count = rangeResp.Count
resp.Revision = rangeResp.Header.Revision
return resp, nil
}

func (k kubernetes) Count(ctx context.Context, prefix string) (int64, error) {
resp, err := k.kv.Range(ctx, &pb.RangeRequest{
Key: []byte(prefix),
RangeEnd: []byte(GetPrefixRangeEnd(prefix)),
CountOnly: true,
})
if err != nil {
return 0, toErr(ctx, err)
}
return resp.Count, nil
}

func (k kubernetes) OptimisticPut(ctx context.Context, key string, value []byte, opts PutOptions) (resp KubernetesPutResponse, err error) {
put := &pb.RequestOp{Request: &pb.RequestOp_RequestPut{&pb.PutRequest{Key: []byte(key), Value: value, Lease: int64(opts.LeaseID)}}}

txnResp, err := k.optimisticTxn(ctx, key, opts.ExpectedRevision, put)
if err != nil {
return resp, toErr(ctx, err)
}
resp.Succeeded = txnResp.Succeeded
resp.Revision = txnResp.Header.Revision
if !txnResp.Succeeded {
resp.KV = kvFromTxnResponse(txnResp.Responses[0])
}
return resp, nil
}

func (k kubernetes) OptimisticDelete(ctx context.Context, key string, opts DeleteOptions) (resp KubernetesDeleteResponse, err error) {
del := &pb.RequestOp{Request: &pb.RequestOp_RequestDeleteRange{&pb.DeleteRangeRequest{Key: []byte(key)}}}

txnResp, err := k.optimisticTxn(ctx, key, opts.ExpectedRevision, del)
if err != nil {
return resp, toErr(ctx, err)
}
resp.Succeeded = txnResp.Succeeded
resp.Revision = txnResp.Header.Revision
if !txnResp.Succeeded {
resp.KV = kvFromTxnResponse(txnResp.Responses[0])
}
return resp, nil
}

func (k kubernetes) optimisticTxn(ctx context.Context, key string, expectRevision int64, onSuccess *pb.RequestOp) (*pb.TxnResponse, error) {
txn := &pb.TxnRequest{
Compare: []*pb.Compare{
{
Key: []byte(key),
TargetUnion: &pb.Compare_ModRevision{ModRevision: expectRevision},
Result: pb.Compare_EQUAL,
},
},
Success: []*pb.RequestOp{onSuccess},
Failure: []*pb.RequestOp{
{Request: &pb.RequestOp_RequestRange{RequestRange: &pb.RangeRequest{Key: []byte(key), Limit: 1}}},
},
}
return k.kv.Txn(ctx, txn)
}

func kvFromTxnResponse(resp *pb.ResponseOp) *mvccpb.KeyValue {
getResponse := resp.GetResponseRange()
if len(getResponse.Kvs) == 1 {
return getResponse.Kvs[0]
}
return nil
}

func (k kubernetes) Watch(ctx context.Context, key string, opts WatchOptions) (WatchID, WatchChan) {
if opts.ID == "" {
opts.ID = WatchID(streamKeyFromCtx(ctx))
}
ops := []OpOption{
WithRev(opts.Revision), WithPrevKV(),
}

if opts.Prefix {
ops = append(ops, WithPrefix())
}
if opts.ProgressNotify {
ops = append(ops, WithProgressNotify())
}
return opts.ID, k.watcher.watch(ctx, string(opts.ID), key, ops...)
}

func (k kubernetes) RequestProgress(ctx context.Context, id WatchID) error {
return k.watcher.requestProgress(ctx, string(id))
}
35 changes: 23 additions & 12 deletions client/v3/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,8 +162,8 @@ type watchGrpcStream struct {
// ctx controls internal remote.Watch requests
ctx context.Context
// ctxKey is the key used when looking up this stream's context
ctxKey string
cancel context.CancelFunc
streamKey string
cancel context.CancelFunc

// substreams holds all active watchers on this grpc stream
substreams map[int64]*watcherStream
Expand Down Expand Up @@ -249,6 +249,10 @@ func NewWatcher(c *Client) Watcher {
}

func NewWatchFromWatchClient(wc pb.WatchClient, c *Client) Watcher {
return newWatchFromWatchClient(wc, c)
}

func newWatchFromWatchClient(wc pb.WatchClient, c *Client) *watcher {
w := &watcher{
remote: wc,
streams: make(map[string]*watchGrpcStream),
Expand All @@ -271,14 +275,14 @@ func (vc *valCtx) Deadline() (time.Time, bool) { return zeroTime, false }
func (vc *valCtx) Done() <-chan struct{} { return valCtxCh }
func (vc *valCtx) Err() error { return nil }

func (w *watcher) newWatcherGrpcStream(inctx context.Context) *watchGrpcStream {
func (w *watcher) newWatcherGrpcStream(inctx context.Context, streamKey string) *watchGrpcStream {
ctx, cancel := context.WithCancel(&valCtx{inctx})
wgs := &watchGrpcStream{
owner: w,
remote: w.remote,
callOpts: w.callOpts,
ctx: ctx,
ctxKey: streamKeyFromCtx(inctx),
streamKey: streamKey,
cancel: cancel,
substreams: make(map[int64]*watcherStream),
respc: make(chan *pb.WatchResponse),
Expand All @@ -295,6 +299,11 @@ func (w *watcher) newWatcherGrpcStream(inctx context.Context) *watchGrpcStream {

// Watch posts a watch request to run() and waits for a new watcher channel
func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) WatchChan {
ctxKey := streamKeyFromCtx(ctx)
return w.watch(ctx, ctxKey, key, opts...)
}

func (w *watcher) watch(ctx context.Context, streamKey, key string, opts ...OpOption) WatchChan {
ow := opWatch(key, opts...)

var filters []pb.WatchCreateRequest_FilterType
Expand All @@ -319,7 +328,6 @@ func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) Watch
}

ok := false
ctxKey := streamKeyFromCtx(ctx)

var closeCh chan WatchResponse
for {
Expand All @@ -332,10 +340,10 @@ func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) Watch
close(ch)
return ch
}
wgs := w.streams[ctxKey]
wgs := w.streams[streamKey]
if wgs == nil {
wgs = w.newWatcherGrpcStream(ctx)
w.streams[ctxKey] = wgs
wgs = w.newWatcherGrpcStream(ctx, streamKey)
w.streams[streamKey] = wgs
}
donec := wgs.donec
reqc := wgs.reqc
Expand Down Expand Up @@ -404,16 +412,19 @@ func (w *watcher) Close() (err error) {
// RequestProgress requests a progress notify response be sent in all watch channels.
func (w *watcher) RequestProgress(ctx context.Context) (err error) {
ctxKey := streamKeyFromCtx(ctx)
return w.requestProgress(ctx, ctxKey)
}

func (w *watcher) requestProgress(ctx context.Context, streamKey string) (err error) {
w.mu.Lock()
if w.streams == nil {
w.mu.Unlock()
return fmt.Errorf("no stream found for context")
}
wgs := w.streams[ctxKey]
wgs := w.streams[streamKey]
if wgs == nil {
wgs = w.newWatcherGrpcStream(ctx)
w.streams[ctxKey] = wgs
wgs = w.newWatcherGrpcStream(ctx, streamKey)
w.streams[streamKey] = wgs
}
donec := wgs.donec
reqc := wgs.reqc
Expand Down Expand Up @@ -450,7 +461,7 @@ func (w *watcher) closeStream(wgs *watchGrpcStream) {
close(wgs.donec)
wgs.cancel()
if w.streams != nil {
delete(w.streams, wgs.ctxKey)
delete(w.streams, wgs.streamKey)
}
w.mu.Unlock()
}
Expand Down

0 comments on commit 7d7b0c4

Please sign in to comment.