Skip to content

Commit

Permalink
updated v2.send to use retryhttp client
Browse files Browse the repository at this point in the history
  • Loading branch information
maurafortino committed Dec 18, 2024
1 parent c97f205 commit b8e37f1
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 54 deletions.
83 changes: 31 additions & 52 deletions internal/sink/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func NewSink(c Config, logger *zap.Logger, listener ancla.Register) (Sink, error
deliveryInterval: c.DeliveryInterval,
deliveryRetries: c.DeliveryRetries,
},
client: &http.Client{},
client: &http.Client{}, //TODO: do we want expose configuration for this client via config?
}
if len(wh.ReceiverURLs) == 0 && len(wh.DNSSrvRecord.FQDNs) == 0 {
return nil, fmt.Errorf("either `ReceiverURLs` or `DNSSrvRecord` must be used")
Expand All @@ -116,7 +116,7 @@ func NewSink(c Config, logger *zap.Logger, listener ancla.Register) (Sink, error
v2.updateUrls(urlCount, wh.ReceiverURLs[0], wh.ReceiverURLs[1:])
}

transport, err := NewSRVRecordDailer(wh.DNSSrvRecord)
transport, err := NewSRVRecordDialer(wh.DNSSrvRecord)
if err != nil {
return nil, errors.Join(err, fmt.Errorf("error recevied parsing urls"))
}
Expand Down Expand Up @@ -215,29 +215,6 @@ func getUrls(urls []string) (int, error) {

}

func (whs WebhookSink) Send(secret, acceptType string, msg *wrp.Message) error {
var errs error
if len(*whs.Hash) == len(whs.webooks) {
//TODO: flush out the error handling for kafka
if v2, ok := whs.webooks[whs.Hash.Get(GetKey(whs.HashField, msg))]; ok {
err := v2.send(secret, acceptType, msg)
if err != nil {
errs = errors.Join(errs, err)
}
}
} else {
//TODO: discuss with wes and john the default hashing logic
//for now: when no hash is given we will just loop through all the kafkas
for _, v2 := range whs.webooks {
err := v2.send(secret, acceptType, msg)
if err != nil {
errs = errors.Join(errs, err)
}
}
}
return errs
}

// worker is the routine that actually takes the queued messages and delivers
// them to the listeners outside webpa
func (v1 *WebhookV1) Send(secret, acceptType string, msg *wrp.Message) error {
Expand Down Expand Up @@ -650,22 +627,6 @@ func AddMessageHeaders(kafkaMsg *sarama.ProducerMessage, m *wrp.Message) {
}
}

func (v2 *WebhookV2) Update(c Config, l *zap.Logger, altUrls []string, id, failureUrl, receiverUrl string) {
//TODO: do we need to return an error if not - we should get rid of the error return
v2.id = id
v2.failureUrl = failureUrl
v2.deliveryInterval = c.DeliveryInterval
v2.deliveryRetries = c.DeliveryRetries
v2.logger = l

urlCount, err := getUrls(altUrls)
if err != nil {
l.Error("error recevied parsing urls", zap.Error(err))
}
v2.updateUrls(urlCount, receiverUrl, altUrls)

}

func (v2 *WebhookV2) updateUrls(urlCount int, url string, urls []string) {
v2.mutex.Lock()
defer v2.mutex.Unlock()
Expand All @@ -691,6 +652,29 @@ func (v2 *WebhookV2) updateUrls(urlCount int, url string, urls []string) {
}
}

func (whs WebhookSink) Send(secret, acceptType string, msg *wrp.Message) error {
var errs error
if len(*whs.Hash) == len(whs.webooks) {
//TODO: flush out the error handling for kafka
if v2, ok := whs.webooks[whs.Hash.Get(GetKey(whs.HashField, msg))]; ok {
err := v2.send(secret, acceptType, msg)
if err != nil {
errs = errors.Join(errs, err)
}
}
} else {
//TODO: discuss with wes and john the default hashing logic
//for now: when no hash is given we will just loop through all the kafkas
for _, v2 := range whs.webooks {
err := v2.send(secret, acceptType, msg)
if err != nil {
errs = errors.Join(errs, err)
}
}
}
return errs
}

// worker is the routine that actually takes the queued messages and delivers
// them to the listeners outside webpa
func (v2 *WebhookV2) send(secret, acceptType string, msg *wrp.Message) error {
Expand Down Expand Up @@ -759,17 +743,12 @@ func (v2 *WebhookV2) send(secret, acceptType string, msg *wrp.Message) error {
// Send it
v2.logger.Debug("attempting to send event", zap.String("event.source", msg.Source), zap.String("event.destination", msg.Destination))

var resp *http.Response
if v2.client != nil {
resp, err = v2.client.Do(req)
} else {
client, _ := retryhttp.NewClient(
// retryhttp.WithHTTPClient(s.clientMiddleware(s.client)),
retryhttp.WithRunner(v2.addRunner(req, event)),
retryhttp.WithRequesters(v2.updateRequest(v2.urls)),
)
resp, err = client.Do(req)
}
client, _ := retryhttp.NewClient(
retryhttp.WithHTTPClient(v2.client),
retryhttp.WithRunner(v2.addRunner(req, event)),
retryhttp.WithRequesters(v2.updateRequest(v2.urls)),
)
resp, err := client.Do(req)

var deliveryCounterLabels []string
code := metrics.MessageDroppedCode
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
"github.com/xmidt-org/webhook-schema"
)

func NewSRVRecordDailer(dnsSrvRecord webhook.DNSSrvRecord) (http.RoundTripper, error) {
func NewSRVRecordDialer(dnsSrvRecord webhook.DNSSrvRecord) (http.RoundTripper, error) {
if len(dnsSrvRecord.FQDNs) == 0 {
return http.DefaultTransport, nil
}
Expand Down Expand Up @@ -68,7 +68,7 @@ func (d *SRVRecordDialer) DialContext(ctx context.Context, _, _ string) (net.Con
conn, err := net.Dial("", host)
if err != nil {
errs = errors.Join(errs,
fmt.Errorf("%v: host `%s` [weight: %s, priortiy: %s] from srv record `%v`",
fmt.Errorf("%v: host `%s` [weight: %d, priortiy: %d] from srv record `%v`",
err, host, addr.Weight, addr.Priority, d.dnsSrvRecord.FQDNs))
continue
}
Expand Down

0 comments on commit b8e37f1

Please sign in to comment.