-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathexported.go
172 lines (145 loc) · 4.23 KB
/
exported.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
// Package bk is intended to create a client-side load balancer or
// rate limiter for API integrations. This library is specifically designed to
// wrap the `Do` method of the http.Client, but since it uses an interface
// abstraction it can wrap any interface and limit requests.
//
// Controls:
//   - Delay between requests
//   - Number of retries per request
//   - Concurrency limit for the client
package bk
import (
"context"
"errors"
"fmt"
"net/http"
"time"
)
// RoundTrip is the signature of the function a Keeper wraps and rate limits:
// it takes an *http.Request and returns the resulting *http.Response and
// error. It matches the signature of (*http.Client).Do; New falls back to
// http.DefaultClient.Do when it is given a nil RoundTrip.
type RoundTrip func(*http.Request) (*http.Response, error)
// New constructs a Keeper (the bridgekeeper) that limits calls made through
// fn using the supplied delay, retry and concurrency settings.
//
// Arguments are normalized before use:
//   - ctx: a nil context is replaced by context.Background(); the keeper then
//     runs on a cancelable context derived from it.
//   - fn: a nil function is replaced by http.DefaultClient.Do.
//   - delay: values <= 0 become time.Nanosecond; the value also drives the
//     keeper's ticker.
//   - retries: negative values become 0.
//   - concurrency: values < 1 become 1.
//   - requestTimeout: values below time.Nanosecond are replaced by
//     http.DefaultClient.Timeout.
//
// Before returning, New pre-fills the concurrency channel with one token per
// allowed concurrent call, obtains the requests channel from k.receive(), and
// starts the cleanup goroutine that waits for the keeper's context to end.
func New(
	ctx context.Context,
	fn RoundTrip,
	delay time.Duration,
	retries int,
	concurrency int,
	requestTimeout time.Duration,
) *Keeper {
	// Fall back to defaults for anything the caller left unset.
	if ctx == nil {
		ctx = context.Background()
	}

	if fn == nil {
		fn = http.DefaultClient.Do
	}

	if requestTimeout < time.Nanosecond {
		requestTimeout = http.DefaultClient.Timeout
	}

	// Clamp the numeric controls to their minimum legal values.
	if delay <= 0 {
		delay = time.Nanosecond
	}

	if retries < 0 {
		retries = 0
	}

	if concurrency < 1 {
		concurrency = 1
	}

	// The keeper owns a cancelable child of the caller's context.
	keeperCtx, stop := context.WithCancel(ctx)

	pacer := time.NewTicker(delay)

	// One token per permitted concurrent call. The channel has room for
	// every token; the select also lets a canceled context win.
	slots := make(chan bool, concurrency)
	for remaining := concurrency; remaining > 0; remaining-- {
		select {
		case <-keeperCtx.Done():
		case slots <- true:
		}
	}

	keeper := &Keeper{
		ctx:               keeperCtx,
		cancel:            stop,
		fn:                fn,
		retries:           retries,
		delay:             delay,
		ticker:            pacer,
		concurrency:       concurrency,
		concurrencyticker: slots,
		requestTimeout:    requestTimeout,
	}

	// Wire up the requests channel, then start the goroutine that releases
	// the ticker once the keeper's context is done.
	keeper.requests = keeper.receive()

	go keeper.cleanup()

	return keeper
}
// cleanup blocks until the keeper's context is done and then stops the
// keeper's ticker, if one was created.
func (k *Keeper) cleanup() {
	<-k.ctx.Done()

	if k.ticker == nil {
		return
	}

	k.ticker.Stop()
}
// RoundTrip forwards req to Do and returns its result unchanged. It gives
// *Keeper the RoundTrip method required by http.RoundTripper.
func (k *Keeper) RoundTrip(req *http.Request) (*http.Response, error) {
	resp, err := k.Do(req)
	return resp, err
}
// Do sends the http request through the bridgekeeper and blocks until a
// response is delivered or the governing context ends. It returns the
// response and error produced by executing the request, or the context's
// error if the wait is abandoned.
//
// Behavior:
//   - A nil request is rejected with an error.
//   - If the keeper's own context is already done, Do fails open and calls the
//     wrapped function directly.
//   - If the request carries only the background context, the keeper's
//     context is attached to the request before it is submitted.
//   - The wait is bounded by requestTimeout when it is positive. A
//     requestTimeout of zero or less means "no timeout", mirroring
//     http.Client.Timeout, so the wait is bounded only by the request/keeper
//     context.
//
// XXX: Possibly add in defer here that determines if the response is nil
// and executes the wrapped `Do` method directly
func (k *Keeper) Do(request *http.Request) (*http.Response, error) {
	if request == nil {
		return nil, errors.New("request cannot be nil")
	}

	// Fail open if the bridgekeeper itself was canceled.
	select {
	case <-k.ctx.Done():
		return k.fn(request)
	default:
	}

	// Request.Context never returns nil; an unset context shows up as the
	// background context, in which case the keeper's context is used and
	// attached to the request.
	ctx := request.Context()
	if ctx == context.Background() {
		ctx = k.ctx
		request = request.WithContext(ctx)
	}

	// Enforce the request specific timeout. context.WithTimeout with a
	// non-positive duration yields an already-expired context, which would
	// make every call fail immediately with context.DeadlineExceeded, so a
	// non-positive timeout is treated as "no deadline".
	var cancel context.CancelFunc
	if k.requestTimeout > 0 {
		ctx, cancel = context.WithTimeout(ctx, k.requestTimeout)
	} else {
		ctx, cancel = context.WithCancel(ctx)
	}
	defer cancel()

	// Buffered with room for exactly one response and never closed here:
	// Do is the receiver, and closing from the receiving side would make a
	// late delivery panic with "send on closed channel" after Do has
	// already returned on timeout/cancellation.
	responsechan := make(chan responseWrapper, 1)

	// The derived ctx travels in the wrapper; it is not attached to the
	// request itself.
	req := &requestWrapper{
		ctx:      ctx,
		request:  request,
		response: responsechan,
	}

	// Hand the request to the keeper's requests channel without blocking
	// this goroutine; give up if the context ends first.
	go func() {
		select {
		case <-ctx.Done():
		case k.requests <- req:
		}
	}()

	// Wait for the response or for the context to end.
	select {
	case <-ctx.Done():
		return nil, ctx.Err()
	case resp, ok := <-responsechan:
		if !ok {
			return nil, fmt.Errorf("response channel closed prematurely")
		}

		return resp.response, resp.err
	}
}