forked from tsuna/gohbase
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathadmin_client.go
156 lines (133 loc) · 3.95 KB
/
admin_client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
// Copyright (C) 2016 The GoHBase Authors. All rights reserved.
// This file is part of GoHBase.
// Use of this source code is governed by the Apache License 2.0
// that can be found in the COPYING file.
package gohbase
import (
"context"
"errors"
"fmt"
log "github.com/sirupsen/logrus"
"github.com/tsuna/gohbase/hrpc"
"github.com/tsuna/gohbase/pb"
"github.com/tsuna/gohbase/region"
"github.com/tsuna/gohbase/zk"
)
// AdminClient to perform administrative operations with HMaster
type AdminClient interface {
// CreateTable sends the given CreateTable request. The client
// implementation in this file then polls the resulting procedure and
// returns once it is reported finished, or an error.
CreateTable(t *hrpc.CreateTable) error
// DeleteTable sends the given DeleteTable request. The client
// implementation in this file then polls the resulting procedure and
// returns once it is reported finished, or an error.
DeleteTable(t *hrpc.DeleteTable) error
// EnableTable sends the given EnableTable request. The client
// implementation in this file then polls the resulting procedure and
// returns once it is reported finished, or an error.
EnableTable(t *hrpc.EnableTable) error
// DisableTable sends the given DisableTable request. The client
// implementation in this file then polls the resulting procedure and
// returns once it is reported finished, or an error.
DisableTable(t *hrpc.DisableTable) error
// ClusterStatus requests the cluster status and returns it, or an error
// if the request fails or the reply has an unexpected type.
ClusterStatus() (*pb.ClusterStatus, error)
}
// NewAdminClient returns an AdminClient for the given ZooKeeper quorum
// string. It is the exported entry point and forwards zkquorum and all
// options unchanged to newAdminClient.
func NewAdminClient(zkquorum string, options ...Option) AdminClient {
	admin := newAdminClient(zkquorum, options...)
	return admin
}
// newAdminClient builds the *client value returned by NewAdminClient.
//
// It logs the quorum string at debug level, fills in the package defaults,
// applies every option in the order given, and finally creates the ZooKeeper
// client. Options run before zk.NewClient is called, so the zkTimeout passed
// to it is whatever value is in effect after the options have been applied.
func newAdminClient(zkquorum string, options ...Option) AdminClient {
	log.WithFields(log.Fields{"Host": zkquorum}).Debug("Creating new admin client.")

	admin := new(client)
	admin.clientType = adminClient
	admin.rpcQueueSize = defaultRPCQueueSize
	admin.flushInterval = defaultFlushInterval
	// An empty region info is created up front so that a client can later be
	// set on it.
	admin.adminRegionInfo = region.NewInfo(0, nil, nil, nil, nil, nil)
	admin.zkTimeout = defaultZkTimeout
	admin.zkRoot = defaultZkRoot
	admin.effectiveUser = defaultEffectiveUser
	admin.regionLookupTimeout = region.DefaultLookupTimeout
	admin.regionReadTimeout = region.DefaultReadTimeout

	for i := range options {
		options[i](admin)
	}

	admin.zkClient = zk.NewClient(zkquorum, admin.zkTimeout)
	return admin
}
// ClusterStatus sends a ClusterStatus RPC and returns the cluster status
// carried by the reply.
//
// An error from SendRPC is returned unchanged. If the reply is not a
// *pb.GetClusterStatusResponse, an error is returned instead.
func (c *client) ClusterStatus() (*pb.ClusterStatus, error) {
	reply, err := c.SendRPC(hrpc.NewClusterStatus())
	if err != nil {
		return nil, err
	}

	switch status := reply.(type) {
	case *pb.GetClusterStatusResponse:
		return status.GetClusterStatus(), nil
	default:
		return nil, fmt.Errorf("sendRPC returned not a ClusterStatusResponse")
	}
}
// CreateTable sends the CreateTable request t and then polls the procedure
// named in the reply until checkProcedureWithBackoff returns.
//
// An error from SendRPC is returned unchanged. If the reply is not a
// *pb.CreateTableResponse, an error is returned without polling.
func (c *client) CreateTable(t *hrpc.CreateTable) error {
	reply, err := c.SendRPC(t)
	if err != nil {
		return err
	}

	if created, isCreate := reply.(*pb.CreateTableResponse); isCreate {
		return c.checkProcedureWithBackoff(t.Context(), created.GetProcId())
	}
	return fmt.Errorf("sendRPC returned not a CreateTableResponse")
}
// DeleteTable sends the DeleteTable request t and then polls the procedure
// named in the reply until checkProcedureWithBackoff returns.
//
// An error from SendRPC is returned unchanged. If the reply is not a
// *pb.DeleteTableResponse, an error is returned without polling.
func (c *client) DeleteTable(t *hrpc.DeleteTable) error {
	reply, err := c.SendRPC(t)
	if err != nil {
		return err
	}

	switch deleted := reply.(type) {
	case *pb.DeleteTableResponse:
		return c.checkProcedureWithBackoff(t.Context(), deleted.GetProcId())
	default:
		return fmt.Errorf("sendRPC returned not a DeleteTableResponse")
	}
}
// EnableTable sends the EnableTable request t and then polls the procedure
// named in the reply until checkProcedureWithBackoff returns.
//
// An error from SendRPC is returned unchanged. If the reply is not a
// *pb.EnableTableResponse, an error is returned without polling.
func (c *client) EnableTable(t *hrpc.EnableTable) error {
	reply, err := c.SendRPC(t)
	if err != nil {
		return err
	}

	if enabled, isEnable := reply.(*pb.EnableTableResponse); isEnable {
		return c.checkProcedureWithBackoff(t.Context(), enabled.GetProcId())
	}
	return fmt.Errorf("sendRPC returned not a EnableTableResponse")
}
// DisableTable sends the DisableTable request t and then polls the procedure
// named in the reply until checkProcedureWithBackoff returns.
//
// An error from SendRPC is returned unchanged. If the reply is not a
// *pb.DisableTableResponse, an error is returned without polling.
func (c *client) DisableTable(t *hrpc.DisableTable) error {
	reply, err := c.SendRPC(t)
	if err != nil {
		return err
	}

	switch disabled := reply.(type) {
	case *pb.DisableTableResponse:
		return c.checkProcedureWithBackoff(t.Context(), disabled.GetProcId())
	default:
		return fmt.Errorf("sendRPC returned not a DisableTableResponse")
	}
}
// checkProcedureWithBackoff repeatedly queries the state of the procedure
// identified by procID until the reply reports it as finished or not found.
// Between queries it waits via sleepAndIncreaseBackoff, starting from
// backoffStart.
//
// It returns nil when the procedure finished without an exception, and an
// error when:
//   - SendRPC fails (the error is returned unchanged),
//   - the reply is not a *pb.GetProcedureResultResponse,
//   - the procedure is reported as not found,
//   - the procedure finished with an exception, or
//   - sleepAndIncreaseBackoff returns an error.
func (c *client) checkProcedureWithBackoff(ctx context.Context, procID uint64) error {
	backoff := backoffStart
	for {
		pbmsg, err := c.SendRPC(hrpc.NewGetProcedureState(ctx, procID))
		if err != nil {
			return err
		}

		// Check the reply type with the comma-ok form so that an unexpected
		// message yields an error for the caller instead of a panic.
		res, ok := pbmsg.(*pb.GetProcedureResultResponse)
		if !ok {
			return fmt.Errorf("sendRPC returned not a GetProcedureResultResponse")
		}

		switch res.GetState() {
		case pb.GetProcedureResultResponse_NOT_FOUND:
			return fmt.Errorf("procedure not found")
		case pb.GetProcedureResultResponse_FINISHED:
			if fe := res.Exception; fe != nil {
				ge := fe.GenericException
				if ge == nil {
					return errors.New("got unexpected empty exception")
				}
				return fmt.Errorf("procedure exception: %s: %s", ge.GetClassName(), ge.GetMessage())
			}
			return nil
		default:
			// Any other state: wait, then query again with the updated
			// backoff value.
			backoff, err = sleepAndIncreaseBackoff(ctx, backoff)
			if err != nil {
				return err
			}
		}
	}
}