-
Notifications
You must be signed in to change notification settings - Fork 15
/
nakadi.go
280 lines (237 loc) · 8.31 KB
/
nakadi.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
/*
Package nakadi is a client library for the Nakadi event broker. It provides convenient access
to many features of Nakadi's API. The package can be used to manage event type definitions.
The EventAPI can be used to inspect existing event types and also allows creating new event
types and updating existing ones. The SubscriptionAPI provides subscription management: existing
subscriptions can be fetched from Nakadi and of course it is also possible to create new ones.
The PublishAPI of this package is used to broadcast event types of all event type categories via
Nakadi. Last but not least, the package also implements a StreamAPI, which enables event processing
on top of Nakadi's subscription based high level API.
To make the communication with Nakadi more resilient all sub APIs of this package can be configured
to retry failed requests using an exponential back-off algorithm.
*/
package nakadi
import (
"bytes"
"encoding/json"
"io"
"net/http"
"time"
"github.com/cenkalti/backoff/v4"
"github.com/pkg/errors"
)
// Package-level defaults applied when a setting is left unset by the caller.
const (
	// defaultTimeOut is the connection timeout withDefaults falls back to when
	// ClientOptions.ConnectionTimeout is zero.
	defaultTimeOut = 30 * time.Second

	// Retry related defaults. NOTE(review): these are not referenced in this part of the
	// file; presumably they seed the back-off configuration of the sub APIs — confirm.
	defaultInitialRetryInterval = 10 * time.Millisecond
	defaultMaxRetryInterval     = 10 * time.Second
	defaultMaxElapsedTime       = 30 * time.Second
)
// A Client represents a basic configuration to access a Nakadi instance. The client is used to configure
// other sub APIs of the `go-nakadi` package. Use New to create a Client.
type Client struct {
// nakadiURL is the URL of the Nakadi instance, exactly as passed to New.
nakadiURL string
// tokenProvider, when non-nil, is called before every request issued by the http helper
// methods; the returned token is sent as "Bearer" value of the Authorization header.
tokenProvider func() (string, error)
// timeout is the connection timeout taken from ClientOptions.ConnectionTimeout.
timeout time.Duration
// httpClient performs the GET, PUT, POST and DELETE requests of the http helper methods.
httpClient *http.Client
// httpStreamClient is a separate HTTP client created by newHTTPStream.
// NOTE(review): presumably used for long-lived streaming connections — confirm against the StreamAPI.
httpStreamClient *http.Client
}
// Middleware provides a chainable http.RoundTripper middleware that can be used
// to hook into requests e.g. for logging or tracing purposes. It receives an
// *http.Transport and returns an http.RoundTripper; the default installed by
// ClientOptions.withDefaults simply returns the transport unchanged.
type Middleware func(transport *http.Transport) http.RoundTripper
// ClientOptions contains all non mandatory parameters used to instantiate the Nakadi client.
// A nil *ClientOptions is valid and yields the defaults described on the individual fields.
type ClientOptions struct {
// TokenProvider optionally supplies the bearer token for each request. When nil, no
// Authorization header is set by the client's http helper methods.
TokenProvider func() (string, error)
// ConnectionTimeout is the connection timeout of the client. A zero value is replaced
// by defaultTimeOut (30 seconds).
ConnectionTimeout time.Duration
// Middleware optionally wraps the HTTP transport. When nil, a pass-through that returns
// the transport unchanged is used.
Middleware Middleware
}
func (o *ClientOptions) withDefaults() *ClientOptions {
var copyOptions ClientOptions
if o != nil {
copyOptions = *o
}
if copyOptions.ConnectionTimeout == 0 {
copyOptions.ConnectionTimeout = defaultTimeOut
}
if copyOptions.Middleware == nil {
copyOptions.Middleware = func(transport *http.Transport) http.RoundTripper { return transport }
}
return ©Options
}
// New builds a Client for the Nakadi instance reachable at url. The options configure the
// behavior of the client and of all sub APIs in this package that are created from it; they
// may be nil, in which case the defaults from withDefaults apply.
//
// The middleware is handed to newHTTPClient only; newHTTPStream receives just the
// connection timeout.
func New(url string, options *ClientOptions) *Client {
	opts := options.withDefaults()
	return &Client{
		nakadiURL:        url,
		timeout:          opts.ConnectionTimeout,
		tokenProvider:    opts.TokenProvider,
		httpClient:       newHTTPClient(opts.ConnectionTimeout, opts.Middleware),
		httpStreamClient: newHTTPStream(opts.ConnectionTimeout),
	}
}
// httpGET fetches JSON encoded data with a GET request and decodes the response into body.
//
// The request is retried according to backOff whenever it cannot be sent or the server
// answers with a status code >= 500. Failing to build the request or to obtain a token is
// treated as permanent and aborts the retry loop. A final status other than 200 OK is turned
// into an error by decodeResponseToError. msg is used as context in the returned errors.
func (c *Client) httpGET(backOff backoff.BackOff, url string, body interface{}, msg string) error {
	var response *http.Response
	err := backoff.Retry(func() error {
		request, err := http.NewRequest("GET", url, nil)
		if err != nil {
			return backoff.Permanent(errors.Wrapf(err, "%s: unable to prepare request", msg))
		}

		if c.tokenProvider != nil {
			token, err := c.tokenProvider()
			if err != nil {
				return backoff.Permanent(errors.Wrapf(err, "%s: unable to prepare request", msg))
			}
			request.Header.Set("Authorization", "Bearer "+token)
		}

		response, err = c.httpClient.Do(request)
		if err != nil {
			return errors.Wrap(err, msg)
		}

		if response.StatusCode >= 500 {
			buffer, err := io.ReadAll(response.Body)
			// Always release the body of a failed attempt, even when reading it failed;
			// otherwise every such retry leaks a connection. The close error is ignored
			// because the attempt is reported as failed anyway.
			_ = response.Body.Close()
			if err != nil {
				return errors.Wrapf(err, "%s: unable to read response body", msg)
			}
			return decodeResponseToError(buffer, msg)
		}

		return nil
	}, backOff)
	if err != nil {
		return err
	}
	defer response.Body.Close()

	if response.StatusCode != http.StatusOK {
		buffer, err := io.ReadAll(response.Body)
		if err != nil {
			return errors.Wrap(err, "unable to read response body")
		}
		return decodeResponseToError(buffer, msg)
	}

	err = json.NewDecoder(response.Body).Decode(body)
	if err != nil {
		return errors.Wrap(err, "unable to decode response body")
	}

	return nil
}
// httpPUT sends JSON encoded data via PUT request and returns the response.
//
// body is marshalled once before the first attempt; every attempt sends a fresh reader over
// the encoded bytes. The request is retried according to backOff whenever it cannot be sent
// or the server answers with a status code >= 500. Failing to build the request or to obtain
// a token is treated as permanent and aborts the retry loop. msg is used as context in the
// returned errors.
//
// On success the response is returned with its body unread, so the caller has to close it.
// When an error is returned the response must not be relied on: for answers with a status
// code >= 500 its body has already been consumed and closed.
func (c *Client) httpPUT(backOff backoff.BackOff, url string, body interface{}, msg string) (*http.Response, error) {
	encoded, err := json.Marshal(body)
	if err != nil {
		return nil, errors.Wrapf(err, "%s: unable to encode json body", msg)
	}

	var response *http.Response
	err = backoff.Retry(func() error {
		request, err := http.NewRequest("PUT", url, bytes.NewReader(encoded))
		if err != nil {
			return backoff.Permanent(errors.Wrapf(err, "%s: unable to prepare request", msg))
		}

		request.Header.Set("Content-Type", "application/json;charset=UTF-8")
		if c.tokenProvider != nil {
			token, err := c.tokenProvider()
			if err != nil {
				return backoff.Permanent(errors.Wrapf(err, "%s: unable to prepare request", msg))
			}
			request.Header.Set("Authorization", "Bearer "+token)
		}

		response, err = c.httpClient.Do(request)
		if err != nil {
			return errors.Wrap(err, msg)
		}

		if response.StatusCode >= 500 {
			buffer, err := io.ReadAll(response.Body)
			// Always release the body of a failed attempt, even when reading it failed;
			// otherwise every such retry leaks a connection. The close error is ignored
			// because the attempt is reported as failed anyway.
			_ = response.Body.Close()
			if err != nil {
				return errors.Wrapf(err, "%s: unable to read response body", msg)
			}
			return decodeResponseToError(buffer, msg)
		}

		return nil
	}, backOff)

	return response, err
}
// httpPOST sends JSON encoded data via POST request and returns the response.
//
// body is marshalled once before the first attempt; every attempt sends a fresh reader over
// the encoded bytes. The request is retried according to backOff whenever it cannot be sent
// or the server answers with a status code >= 500. Failing to build the request or to obtain
// a token is treated as permanent and aborts the retry loop. msg is used as context in the
// returned errors.
//
// On success the response is returned with its body unread, so the caller has to close it.
// When an error is returned the response must not be relied on: for answers with a status
// code >= 500 its body has already been consumed and closed.
func (c *Client) httpPOST(backOff backoff.BackOff, url string, body interface{}, msg string) (*http.Response, error) {
	encoded, err := json.Marshal(body)
	if err != nil {
		return nil, errors.Wrapf(err, "%s: unable to encode json body", msg)
	}

	var response *http.Response
	err = backoff.Retry(func() error {
		request, err := http.NewRequest("POST", url, bytes.NewReader(encoded))
		if err != nil {
			return backoff.Permanent(errors.Wrapf(err, "%s: unable to prepare request", msg))
		}

		request.Header.Set("Content-Type", "application/json;charset=UTF-8")
		if c.tokenProvider != nil {
			token, err := c.tokenProvider()
			if err != nil {
				return backoff.Permanent(errors.Wrapf(err, "%s: unable to prepare request", msg))
			}
			request.Header.Set("Authorization", "Bearer "+token)
		}

		response, err = c.httpClient.Do(request)
		if err != nil {
			return errors.Wrap(err, msg)
		}

		if response.StatusCode >= 500 {
			buffer, err := io.ReadAll(response.Body)
			// Always release the body of a failed attempt, even when reading it failed;
			// otherwise every such retry leaks a connection. The close error is ignored
			// because the attempt is reported as failed anyway.
			_ = response.Body.Close()
			if err != nil {
				return errors.Wrapf(err, "%s: unable to read response body", msg)
			}
			return decodeResponseToError(buffer, msg)
		}

		return nil
	}, backOff)

	return response, err
}
// httpDELETE sends a DELETE request. On errors httpDELETE expects a response body to contain
// an error message in the format of application/problem+json.
//
// The request is retried according to backOff whenever it cannot be sent or the server
// answers with a status code >= 500. Failing to build the request or to obtain a token is
// treated as permanent and aborts the retry loop. A final status other than 200 OK or
// 204 No Content is turned into an error by decodeResponseToError. msg is used as context in
// the returned errors.
func (c *Client) httpDELETE(backOff backoff.BackOff, url, msg string) error {
	var response *http.Response
	err := backoff.Retry(func() error {
		request, err := http.NewRequest("DELETE", url, nil)
		if err != nil {
			return backoff.Permanent(errors.Wrapf(err, "%s: unable to prepare request", msg))
		}

		if c.tokenProvider != nil {
			token, err := c.tokenProvider()
			if err != nil {
				return backoff.Permanent(errors.Wrapf(err, "%s: unable to prepare request", msg))
			}
			request.Header.Set("Authorization", "Bearer "+token)
		}

		response, err = c.httpClient.Do(request)
		if err != nil {
			return errors.Wrap(err, msg)
		}

		if response.StatusCode >= 500 {
			buffer, err := io.ReadAll(response.Body)
			// Always release the body of a failed attempt, even when reading it failed;
			// otherwise every such retry leaks a connection. The close error is ignored
			// because the attempt is reported as failed anyway.
			_ = response.Body.Close()
			if err != nil {
				return errors.Wrapf(err, "%s: unable to read response body", msg)
			}
			return decodeResponseToError(buffer, msg)
		}

		return nil
	}, backOff)
	if err != nil {
		return err
	}
	defer response.Body.Close()

	if response.StatusCode != http.StatusOK && response.StatusCode != http.StatusNoContent {
		buffer, err := io.ReadAll(response.Body)
		if err != nil {
			return errors.Wrapf(err, "%s: unable to read response body", msg)
		}
		return decodeResponseToError(buffer, msg)
	}

	return nil
}