Skip to content

Commit

Permalink
Fix passing default grpc call options in Kubernetes client
Browse files Browse the repository at this point in the history
Signed-off-by: Marek Siarkowicz <[email protected]>
  • Loading branch information
serathius committed Jul 24, 2024
1 parent f2ea60d commit 6356463
Show file tree
Hide file tree
Showing 9 changed files with 82 additions and 119 deletions.
36 changes: 18 additions & 18 deletions client/v3/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,67 +134,67 @@ func NewAuthFromAuthClient(remote pb.AuthClient, c *Client) Auth {

func (auth *authClient) Authenticate(ctx context.Context, name string, password string) (*AuthenticateResponse, error) {
resp, err := auth.remote.Authenticate(ctx, &pb.AuthenticateRequest{Name: name, Password: password}, auth.callOpts...)
return (*AuthenticateResponse)(resp), ContextError(ctx, err)
return (*AuthenticateResponse)(resp), toErr(ctx, err)
}

func (auth *authClient) AuthEnable(ctx context.Context) (*AuthEnableResponse, error) {
resp, err := auth.remote.AuthEnable(ctx, &pb.AuthEnableRequest{}, auth.callOpts...)
return (*AuthEnableResponse)(resp), ContextError(ctx, err)
return (*AuthEnableResponse)(resp), toErr(ctx, err)
}

func (auth *authClient) AuthDisable(ctx context.Context) (*AuthDisableResponse, error) {
resp, err := auth.remote.AuthDisable(ctx, &pb.AuthDisableRequest{}, auth.callOpts...)
return (*AuthDisableResponse)(resp), ContextError(ctx, err)
return (*AuthDisableResponse)(resp), toErr(ctx, err)
}

func (auth *authClient) AuthStatus(ctx context.Context) (*AuthStatusResponse, error) {
resp, err := auth.remote.AuthStatus(ctx, &pb.AuthStatusRequest{}, auth.callOpts...)
return (*AuthStatusResponse)(resp), ContextError(ctx, err)
return (*AuthStatusResponse)(resp), toErr(ctx, err)
}

func (auth *authClient) UserAdd(ctx context.Context, name string, password string) (*AuthUserAddResponse, error) {
resp, err := auth.remote.UserAdd(ctx, &pb.AuthUserAddRequest{Name: name, Password: password, Options: &authpb.UserAddOptions{NoPassword: false}}, auth.callOpts...)
return (*AuthUserAddResponse)(resp), ContextError(ctx, err)
return (*AuthUserAddResponse)(resp), toErr(ctx, err)
}

func (auth *authClient) UserAddWithOptions(ctx context.Context, name string, password string, options *UserAddOptions) (*AuthUserAddResponse, error) {
resp, err := auth.remote.UserAdd(ctx, &pb.AuthUserAddRequest{Name: name, Password: password, Options: (*authpb.UserAddOptions)(options)}, auth.callOpts...)
return (*AuthUserAddResponse)(resp), ContextError(ctx, err)
return (*AuthUserAddResponse)(resp), toErr(ctx, err)
}

func (auth *authClient) UserDelete(ctx context.Context, name string) (*AuthUserDeleteResponse, error) {
resp, err := auth.remote.UserDelete(ctx, &pb.AuthUserDeleteRequest{Name: name}, auth.callOpts...)
return (*AuthUserDeleteResponse)(resp), ContextError(ctx, err)
return (*AuthUserDeleteResponse)(resp), toErr(ctx, err)
}

func (auth *authClient) UserChangePassword(ctx context.Context, name string, password string) (*AuthUserChangePasswordResponse, error) {
resp, err := auth.remote.UserChangePassword(ctx, &pb.AuthUserChangePasswordRequest{Name: name, Password: password}, auth.callOpts...)
return (*AuthUserChangePasswordResponse)(resp), ContextError(ctx, err)
return (*AuthUserChangePasswordResponse)(resp), toErr(ctx, err)
}

func (auth *authClient) UserGrantRole(ctx context.Context, user string, role string) (*AuthUserGrantRoleResponse, error) {
resp, err := auth.remote.UserGrantRole(ctx, &pb.AuthUserGrantRoleRequest{User: user, Role: role}, auth.callOpts...)
return (*AuthUserGrantRoleResponse)(resp), ContextError(ctx, err)
return (*AuthUserGrantRoleResponse)(resp), toErr(ctx, err)
}

func (auth *authClient) UserGet(ctx context.Context, name string) (*AuthUserGetResponse, error) {
resp, err := auth.remote.UserGet(ctx, &pb.AuthUserGetRequest{Name: name}, auth.callOpts...)
return (*AuthUserGetResponse)(resp), ContextError(ctx, err)
return (*AuthUserGetResponse)(resp), toErr(ctx, err)
}

func (auth *authClient) UserList(ctx context.Context) (*AuthUserListResponse, error) {
resp, err := auth.remote.UserList(ctx, &pb.AuthUserListRequest{}, auth.callOpts...)
return (*AuthUserListResponse)(resp), ContextError(ctx, err)
return (*AuthUserListResponse)(resp), toErr(ctx, err)
}

func (auth *authClient) UserRevokeRole(ctx context.Context, name string, role string) (*AuthUserRevokeRoleResponse, error) {
resp, err := auth.remote.UserRevokeRole(ctx, &pb.AuthUserRevokeRoleRequest{Name: name, Role: role}, auth.callOpts...)
return (*AuthUserRevokeRoleResponse)(resp), ContextError(ctx, err)
return (*AuthUserRevokeRoleResponse)(resp), toErr(ctx, err)
}

func (auth *authClient) RoleAdd(ctx context.Context, name string) (*AuthRoleAddResponse, error) {
resp, err := auth.remote.RoleAdd(ctx, &pb.AuthRoleAddRequest{Name: name}, auth.callOpts...)
return (*AuthRoleAddResponse)(resp), ContextError(ctx, err)
return (*AuthRoleAddResponse)(resp), toErr(ctx, err)
}

func (auth *authClient) RoleGrantPermission(ctx context.Context, name string, key, rangeEnd string, permType PermissionType) (*AuthRoleGrantPermissionResponse, error) {
Expand All @@ -204,27 +204,27 @@ func (auth *authClient) RoleGrantPermission(ctx context.Context, name string, ke
PermType: authpb.Permission_Type(permType),
}
resp, err := auth.remote.RoleGrantPermission(ctx, &pb.AuthRoleGrantPermissionRequest{Name: name, Perm: perm}, auth.callOpts...)
return (*AuthRoleGrantPermissionResponse)(resp), ContextError(ctx, err)
return (*AuthRoleGrantPermissionResponse)(resp), toErr(ctx, err)
}

func (auth *authClient) RoleGet(ctx context.Context, role string) (*AuthRoleGetResponse, error) {
resp, err := auth.remote.RoleGet(ctx, &pb.AuthRoleGetRequest{Role: role}, auth.callOpts...)
return (*AuthRoleGetResponse)(resp), ContextError(ctx, err)
return (*AuthRoleGetResponse)(resp), toErr(ctx, err)
}

func (auth *authClient) RoleList(ctx context.Context) (*AuthRoleListResponse, error) {
resp, err := auth.remote.RoleList(ctx, &pb.AuthRoleListRequest{}, auth.callOpts...)
return (*AuthRoleListResponse)(resp), ContextError(ctx, err)
return (*AuthRoleListResponse)(resp), toErr(ctx, err)
}

func (auth *authClient) RoleRevokePermission(ctx context.Context, role string, key, rangeEnd string) (*AuthRoleRevokePermissionResponse, error) {
resp, err := auth.remote.RoleRevokePermission(ctx, &pb.AuthRoleRevokePermissionRequest{Role: role, Key: []byte(key), RangeEnd: []byte(rangeEnd)}, auth.callOpts...)
return (*AuthRoleRevokePermissionResponse)(resp), ContextError(ctx, err)
return (*AuthRoleRevokePermissionResponse)(resp), toErr(ctx, err)
}

func (auth *authClient) RoleDelete(ctx context.Context, role string) (*AuthRoleDeleteResponse, error) {
resp, err := auth.remote.RoleDelete(ctx, &pb.AuthRoleDeleteRequest{Role: role}, auth.callOpts...)
return (*AuthRoleDeleteResponse)(resp), ContextError(ctx, err)
return (*AuthRoleDeleteResponse)(resp), toErr(ctx, err)
}

func StrToPermissionType(s string) (PermissionType, error) {
Expand Down
6 changes: 2 additions & 4 deletions client/v3/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ func (c *Client) Close() error {
c.Lease.Close()
}
if c.conn != nil {
return ContextError(c.ctx, c.conn.Close())
return toErr(c.ctx, c.conn.Close())
}
return c.ctx.Err()
}
Expand Down Expand Up @@ -573,9 +573,7 @@ func isUnavailableErr(ctx context.Context, err error) bool {
return false
}

// ContextError converts the error into an EtcdError if the error message matches one of
// the defined messages; otherwise, it tries to retrieve the context error.
func ContextError(ctx context.Context, err error) error {
func toErr(ctx context.Context, err error) error {
if err == nil {
return nil
}
Expand Down
10 changes: 5 additions & 5 deletions client/v3/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func (c *cluster) memberAdd(ctx context.Context, peerAddrs []string, isLearner b
}
resp, err := c.remote.MemberAdd(ctx, r, c.callOpts...)
if err != nil {
return nil, ContextError(ctx, err)
return nil, toErr(ctx, err)
}
return (*MemberAddResponse)(resp), nil
}
Expand All @@ -102,7 +102,7 @@ func (c *cluster) MemberRemove(ctx context.Context, id uint64) (*MemberRemoveRes
r := &pb.MemberRemoveRequest{ID: id}
resp, err := c.remote.MemberRemove(ctx, r, c.callOpts...)
if err != nil {
return nil, ContextError(ctx, err)
return nil, toErr(ctx, err)
}
return (*MemberRemoveResponse)(resp), nil
}
Expand All @@ -119,7 +119,7 @@ func (c *cluster) MemberUpdate(ctx context.Context, id uint64, peerAddrs []strin
if err == nil {
return (*MemberUpdateResponse)(resp), nil
}
return nil, ContextError(ctx, err)
return nil, toErr(ctx, err)
}

func (c *cluster) MemberList(ctx context.Context) (*MemberListResponse, error) {
Expand All @@ -128,14 +128,14 @@ func (c *cluster) MemberList(ctx context.Context) (*MemberListResponse, error) {
if err == nil {
return (*MemberListResponse)(resp), nil
}
return nil, ContextError(ctx, err)
return nil, toErr(ctx, err)
}

func (c *cluster) MemberPromote(ctx context.Context, id uint64) (*MemberPromoteResponse, error) {
r := &pb.MemberPromoteRequest{ID: id}
resp, err := c.remote.MemberPromote(ctx, r, c.callOpts...)
if err != nil {
return nil, ContextError(ctx, err)
return nil, toErr(ctx, err)
}
return (*MemberPromoteResponse)(resp), nil
}
87 changes: 26 additions & 61 deletions client/v3/kubernetes/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,24 +31,20 @@ func New(cfg clientv3.Config) (*Client, error) {
}
kc := &Client{
Client: c,
kv: clientv3.RetryKVClient(c),
}
kc.Kubernetes = kc
return kc, nil
}

type Client struct {
*clientv3.Client
Kubernetes Interface
kv pb.KVClient
}

var _ Interface = (*Client)(nil)

func (k Client) Get(ctx context.Context, key string, opts GetOptions) (resp GetResponse, err error) {
rangeResp, err := k.kv.Range(ctx, getRequest(key, opts.Revision))
rangeResp, err := k.KV.Get(ctx, key, clientv3.WithRev(opts.Revision), clientv3.WithLimit(1))
if err != nil {
return resp, clientv3.ContextError(ctx, err)
return resp, err
}
resp.Revision = rangeResp.Header.Revision
if len(rangeResp.Kvs) == 1 {
Expand All @@ -58,17 +54,14 @@ func (k Client) Get(ctx context.Context, key string, opts GetOptions) (resp GetR
}

func (k Client) List(ctx context.Context, prefix string, opts ListOptions) (resp ListResponse, err error) {
rangeStart := prefix + opts.Continue
rangeStart := prefix
if opts.Continue != "" {
rangeStart = opts.Continue + "\x00"
}
rangeEnd := clientv3.GetPrefixRangeEnd(prefix)

rangeResp, err := k.kv.Range(ctx, &pb.RangeRequest{
Key: []byte(rangeStart),
RangeEnd: []byte(rangeEnd),
Limit: opts.Limit,
Revision: opts.Revision,
})
rangeResp, err := k.KV.Get(ctx, rangeStart, clientv3.WithRange(rangeEnd), clientv3.WithLimit(opts.Limit), clientv3.WithRev(opts.Revision))
if err != nil {
return resp, clientv3.ContextError(ctx, err)
return resp, err
}
resp.Kvs = rangeResp.Kvs
resp.Count = rangeResp.Count
Expand All @@ -77,28 +70,27 @@ func (k Client) List(ctx context.Context, prefix string, opts ListOptions) (resp
}

func (k Client) Count(ctx context.Context, prefix string, _ CountOptions) (int64, error) {
resp, err := k.kv.Range(ctx, &pb.RangeRequest{
Key: []byte(prefix),
RangeEnd: []byte(clientv3.GetPrefixRangeEnd(prefix)),
CountOnly: true,
})
resp, err := k.KV.Get(ctx, prefix, clientv3.WithPrefix(), clientv3.WithCountOnly())
if err != nil {
return 0, clientv3.ContextError(ctx, err)
return 0, err
}
return resp.Count, nil
}

func (k Client) OptimisticPut(ctx context.Context, key string, value []byte, expectedRevision int64, opts PutOptions) (resp PutResponse, err error) {
onSuccess := &pb.RequestOp{Request: &pb.RequestOp_RequestPut{RequestPut: &pb.PutRequest{Key: []byte(key), Value: value, Lease: int64(opts.LeaseID)}}}
txn := k.KV.Txn(ctx).If(
clientv3.Compare(clientv3.ModRevision(key), "=", expectedRevision),
).Then(
clientv3.OpPut(key, string(value), clientv3.WithLease(opts.LeaseID)),
)

var onFailure *pb.RequestOp
if opts.GetOnFailure {
onFailure = &pb.RequestOp{Request: &pb.RequestOp_RequestRange{RequestRange: getRequest(key, 0)}}
txn = txn.Else(clientv3.OpGet(key))
}

txnResp, err := k.optimisticTxn(ctx, key, expectedRevision, onSuccess, onFailure)
txnResp, err := txn.Commit()
if err != nil {
return resp, clientv3.ContextError(ctx, err)
return resp, err
}
resp.Succeeded = txnResp.Succeeded
resp.Revision = txnResp.Header.Revision
Expand All @@ -109,16 +101,17 @@ func (k Client) OptimisticPut(ctx context.Context, key string, value []byte, exp
}

func (k Client) OptimisticDelete(ctx context.Context, key string, expectedRevision int64, opts DeleteOptions) (resp DeleteResponse, err error) {
onSuccess := &pb.RequestOp{Request: &pb.RequestOp_RequestDeleteRange{RequestDeleteRange: &pb.DeleteRangeRequest{Key: []byte(key)}}}

var onFailure *pb.RequestOp
txn := k.KV.Txn(ctx).If(
clientv3.Compare(clientv3.ModRevision(key), "=", expectedRevision),
).Then(
clientv3.OpDelete(key),
)
if opts.GetOnFailure {
onFailure = &pb.RequestOp{Request: &pb.RequestOp_RequestRange{RequestRange: getRequest(key, 0)}}
txn = txn.Else(clientv3.OpGet(key))
}

txnResp, err := k.optimisticTxn(ctx, key, expectedRevision, onSuccess, onFailure)
txnResp, err := txn.Commit()
if err != nil {
return resp, clientv3.ContextError(ctx, err)
return resp, err
}
resp.Succeeded = txnResp.Succeeded
resp.Revision = txnResp.Header.Revision
Expand All @@ -128,34 +121,6 @@ func (k Client) OptimisticDelete(ctx context.Context, key string, expectedRevisi
return resp, nil
}

func (k Client) optimisticTxn(ctx context.Context, key string, expectedRevision int64, onSuccess, onFailure *pb.RequestOp) (*pb.TxnResponse, error) {
txn := &pb.TxnRequest{
Compare: []*pb.Compare{
{
Result: pb.Compare_EQUAL,
Target: pb.Compare_MOD,
Key: []byte(key),
TargetUnion: &pb.Compare_ModRevision{ModRevision: expectedRevision},
},
},
}
if onSuccess != nil {
txn.Success = []*pb.RequestOp{onSuccess}
}
if onFailure != nil {
txn.Failure = []*pb.RequestOp{onFailure}
}
return k.kv.Txn(ctx, txn)
}

func getRequest(key string, revision int64) *pb.RangeRequest {
return &pb.RangeRequest{
Key: []byte(key),
Revision: revision,
Limit: 1,
}
}

func kvFromTxnResponse(resp *pb.ResponseOp) *mvccpb.KeyValue {
getResponse := resp.GetResponseRange()
if len(getResponse.Kvs) == 1 {
Expand Down
10 changes: 5 additions & 5 deletions client/v3/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,23 +112,23 @@ func NewKVFromKVClient(remote pb.KVClient, c *Client) KV {

func (kv *kv) Put(ctx context.Context, key, val string, opts ...OpOption) (*PutResponse, error) {
r, err := kv.Do(ctx, OpPut(key, val, opts...))
return r.put, ContextError(ctx, err)
return r.put, toErr(ctx, err)
}

func (kv *kv) Get(ctx context.Context, key string, opts ...OpOption) (*GetResponse, error) {
r, err := kv.Do(ctx, OpGet(key, opts...))
return r.get, ContextError(ctx, err)
return r.get, toErr(ctx, err)
}

func (kv *kv) Delete(ctx context.Context, key string, opts ...OpOption) (*DeleteResponse, error) {
r, err := kv.Do(ctx, OpDelete(key, opts...))
return r.del, ContextError(ctx, err)
return r.del, toErr(ctx, err)
}

func (kv *kv) Compact(ctx context.Context, rev int64, opts ...CompactOption) (*CompactResponse, error) {
resp, err := kv.remote.Compact(ctx, OpCompact(rev, opts...).toRequest(), kv.callOpts...)
if err != nil {
return nil, ContextError(ctx, err)
return nil, toErr(ctx, err)
}
return (*CompactResponse)(resp), err
}
Expand Down Expand Up @@ -173,5 +173,5 @@ func (kv *kv) Do(ctx context.Context, op Op) (OpResponse, error) {
default:
panic("Unknown op")
}
return OpResponse{}, ContextError(ctx, err)
return OpResponse{}, toErr(ctx, err)
}
Loading

0 comments on commit 6356463

Please sign in to comment.