Skip to content

Commit

Permalink
patch: update legacy eventType
Browse files Browse the repository at this point in the history
patch: update legacy eventType
  • Loading branch information
denopink committed Mar 12, 2024
1 parent 72852aa commit 85569a3
Show file tree
Hide file tree
Showing 7 changed files with 86 additions and 28 deletions.
11 changes: 7 additions & 4 deletions eventDispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,10 +132,13 @@ func (d *eventDispatcher) OnDeviceEvent(event *device.Event) {
destination := routable.To()
contentType := event.Format.ContentType()
if strings.HasPrefix(destination, EventPrefix) {
eventType = destination[len(EventPrefix):]
url, err = d.dispatchEvent(eventType, contentType, event.Contents)
if err != nil {
d.logger.Error("Error dispatching event", zap.Any("eventType", eventType), zap.Any("destination", destination), zap.Error(err))
var l *wrp.Locator
if l, err = wrp.ParseLocator(destination); err == nil {
eventType = l.Authority
url, err = d.dispatchEvent(eventType, contentType, event.Contents)
if err != nil {
d.logger.Error("Error dispatching event", zap.Any("eventType", eventType), zap.Any("destination", destination), zap.Error(err))
}
}
} else if strings.HasPrefix(destination, DNSPrefix) {
eventType = event.Type.String()
Expand Down
72 changes: 64 additions & 8 deletions eventDispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package main
import (
"bytes"
"errors"
"fmt"
"io"
"testing"
"time"
Expand Down Expand Up @@ -234,9 +235,11 @@ func testEventDispatcherOnDeviceEventDispatchEvent(t *testing.T) {

func testEventDispatcherOnDeviceEventFullQueue(t *testing.T) {
var (
b bytes.Buffer
assert = assert.New(t)
require = require.New(t)
b bytes.Buffer
assert = assert.New(t)
require = require.New(t)
expectedEventType = "node-change"

outbounder = &Outbounder{
RequestTimeout: 100 * time.Millisecond,
EventEndpoints: map[string]interface{}{"default": []string{"nowhere.com"}},
Expand All @@ -257,24 +260,76 @@ func testEventDispatcherOnDeviceEventFullQueue(t *testing.T) {
require.NoError(err)

d.(*eventDispatcher).outbounds = make(chan outboundEnvelope)
dm.On("With", prometheus.Labels{eventLabel: "iot", codeLabel: messageDroppedCode, reasonLabel: fullQueueReason, urlLabel: "nowhere.com"}).Return().Once()
dm.On("With", prometheus.Labels{eventLabel: expectedEventType, codeLabel: messageDroppedCode, reasonLabel: fullQueueReason, urlLabel: "nowhere.com"}).Return().Once()
dm.On("Add", 1.).Return().Once()
d.OnDeviceEvent(&device.Event{
Type: device.MessageReceived,
Message: &wrp.Message{Destination: "event:iot"},
Message: &wrp.Message{Destination: fmt.Sprintf("event:%s/mac:11:22:33:44:55:66/Online/unknown/deb2eb69999", expectedEventType)},
Contents: []byte{1, 2},
})
assert.Greater(b.Len(), 0)
dm.AssertExpectations(t)
}

func testEventDispatcherOnDeviceEventMessageReceived(t *testing.T) {
var (
assert = assert.New(t)
require = require.New(t)
b bytes.Buffer
expectedEventType = "node-change"
m = wrp.Message{Destination: fmt.Sprintf("event:%s/mac:11:22:33:44:55:66/Online/unknown/deb2eb69999", expectedEventType)}
o = Outbounder{
Method: "PATCH",
EventEndpoints: map[string]interface{}{"default": []string{"nowhere.com"}},
Logger: zap.New(
zapcore.NewCore(zapcore.NewJSONEncoder(
zapcore.EncoderConfig{
MessageKey: "message",
}), zapcore.AddSync(&b), zapcore.ErrorLevel),
),
}
dispatcher, outbounds, err = NewEventDispatcher(NewTestOutboundMeasures(), &o, nil)
)

require.NotNil(dispatcher)
require.NotNil(outbounds)
require.NoError(err)

dispatcher.OnDeviceEvent(&device.Event{
Type: device.MessageReceived,
Message: &m,
})

require.Equal(1, len(outbounds))
e := <-outbounds
e.cancel()
<-e.request.Context().Done()

assert.Equal(o.method(), e.request.Method)
assert.Zero(b)
eventType, ok := e.request.Context().Value(eventTypeContextKey{}).(string)
require.True(ok)
assert.Equal(expectedEventType, eventType)
}

func testEventDispatcherOnDeviceEventFilterError(t *testing.T) {
var (
assert = assert.New(t)
require = require.New(t)
urlFilter = new(mockURLFilter)
expectedError = errors.New("expected")

dispatcher, outbounds, err = NewEventDispatcher(NewTestOutboundMeasures(), nil, urlFilter)
b bytes.Buffer
o = Outbounder{
Method: "PATCH",
EventEndpoints: map[string]interface{}{"default": []string{"nowhere.com"}},
Logger: zap.New(
zapcore.NewCore(zapcore.NewJSONEncoder(
zapcore.EncoderConfig{
MessageKey: "message",
}), zapcore.AddSync(&b), zapcore.ErrorLevel),
),
}
dispatcher, outbounds, err = NewEventDispatcher(NewTestOutboundMeasures(), &o, urlFilter)
)

require.NotNil(dispatcher)
Expand All @@ -289,8 +344,8 @@ func testEventDispatcherOnDeviceEventFilterError(t *testing.T) {
Message: &wrp.Message{Destination: "dns:doesnotmatter.com"},
})

// TODO verify logger's buffer isn't empty
assert.Equal(0, len(outbounds))
assert.NotZero(b)
urlFilter.AssertExpectations(t)
}

Expand Down Expand Up @@ -438,6 +493,7 @@ func TestEventDispatcherOnDeviceEvent(t *testing.T) {
test func(*testing.T)
}{
{"ConnectEvent", testEventDispatcherOnDeviceEventConnectEvent},
{"CorrectEventType", testEventDispatcherOnDeviceEventMessageReceived},
{"DisconnectEvent", testEventDispatcherOnDeviceEventDisconnectEvent},
{"Unroutable", testEventDispatcherOnDeviceEventUnroutable},
{"BadURLFilter", testEventDispatcherOnDeviceEventBadURLFilter},
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ require (
github.com/xmidt-org/sallust v0.2.2
github.com/xmidt-org/touchstone v0.1.5
github.com/xmidt-org/webpa-common/v2 v2.3.2
github.com/xmidt-org/wrp-go/v3 v3.4.0
github.com/xmidt-org/wrp-go/v3 v3.4.1
go.opentelemetry.io/contrib/instrumentation/github.com/gorilla/mux/otelmux v0.40.0
go.uber.org/zap v1.27.0
)
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -566,8 +566,8 @@ github.com/xmidt-org/touchstone v0.1.5 h1:Afm3P0EzCOWD1ITyVLsEDPVQkSE0t2ZhHyV+kO
github.com/xmidt-org/touchstone v0.1.5/go.mod h1:Dz0fA1eWjm/2WrsdEeaQZMevkmfdYTsAbQfLaTrB8Eo=
github.com/xmidt-org/webpa-common/v2 v2.3.2 h1:66RUmlkltI3iet55WLsSVUW6D5Z7+JpxWyrn84ScQc4=
github.com/xmidt-org/webpa-common/v2 v2.3.2/go.mod h1:WnMf2dLIZOQ5Gvje9Ges/ovHl2pqERFpfP+ST49v6bw=
github.com/xmidt-org/wrp-go/v3 v3.4.0 h1:CZ11X4LdPPSpk76bddl8PdNyW0TiCaIXtbUmKGhp9HQ=
github.com/xmidt-org/wrp-go/v3 v3.4.0/go.mod h1:jVMp/NDHgLnteXjVKryCVpqAaEs8HQun8bb19et8XUU=
github.com/xmidt-org/wrp-go/v3 v3.4.1 h1:WJUBwQPDja/v/nwtH14wJWLh22FEXiAGRuBMfxKPMh8=
github.com/xmidt-org/wrp-go/v3 v3.4.1/go.mod h1:j1kLLoPJmKkMFz/vlwP238WBoFhJgbPyJDN9W2V1TxY=
github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
Expand Down
9 changes: 4 additions & 5 deletions metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,10 +89,9 @@ const (
connectionUnexpectedlyClosedEOFReason = "connection_unexpectedly_closed_eof"
noErrReason = "no_err"
expectedCodeReason = "expected_code"
non202CodeReason = "non202"

// dropped message codes
non202Code = "non202"
expected202Code = "202"
messageDroppedCode = "message_dropped"

// outbound event delivery outcomes
Expand Down Expand Up @@ -268,7 +267,7 @@ func InstrumentOutboundSize(obs HistogramVec, next http.RoundTripper) promhttp.R
} else {
labels = prometheus.Labels{eventLabel: eventType, codeLabel: strconv.Itoa(response.StatusCode), reasonLabel: expectedCodeReason, urlLabel: request.URL.String()}
if response.StatusCode != http.StatusAccepted {
labels[reasonLabel] = non202Code
labels[reasonLabel] = non202CodeReason
}
}

Expand Down Expand Up @@ -299,7 +298,7 @@ func InstrumentOutboundDuration(obs HistogramVec, next http.RoundTripper) promht
} else {
labels = prometheus.Labels{eventLabel: eventType, codeLabel: strconv.Itoa(response.StatusCode), reasonLabel: expectedCodeReason, urlLabel: request.URL.String()}
if response.StatusCode != http.StatusAccepted {
labels[reasonLabel] = non202Code
labels[reasonLabel] = non202CodeReason
}
}

Expand Down Expand Up @@ -329,7 +328,7 @@ func InstrumentOutboundCounter(counter CounterVec, next http.RoundTripper) promh
} else {
labels = prometheus.Labels{eventLabel: eventType, codeLabel: strconv.Itoa(response.StatusCode), reasonLabel: noErrReason, urlLabel: request.URL.String()}
if response.StatusCode != http.StatusAccepted {
labels[reasonLabel] = non202Code
labels[reasonLabel] = non202CodeReason
}
}

Expand Down
6 changes: 3 additions & 3 deletions workerPool.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,10 +95,10 @@ func (wp *WorkerPool) transact(e outboundEnvelope) {
url := e.request.URL.String()
switch response.StatusCode {
case http.StatusAccepted:
wp.logger.Debug("HTTP response", zap.String("status", response.Status), zap.String(eventLabel, eventType), zap.String(codeLabel, code), zap.String(reasonLabel, expected202Code), zap.String(urlLabel, url))
wp.logger.Debug("HTTP response", zap.String("status", response.Status), zap.String(eventLabel, eventType), zap.String(codeLabel, code), zap.String(reasonLabel, expectedCodeReason), zap.String(urlLabel, url))
default:
wp.droppedMessages.With(prometheus.Labels{eventLabel: eventType, codeLabel: code, reasonLabel: non202Code, urlLabel: url}).Add(1)
wp.logger.Warn("HTTP response", zap.String(eventLabel, eventType), zap.String(codeLabel, code), zap.String(reasonLabel, non202Code), zap.String(urlLabel, url))
wp.droppedMessages.With(prometheus.Labels{eventLabel: eventType, codeLabel: code, reasonLabel: non202CodeReason, urlLabel: url}).Add(1)
wp.logger.Warn("HTTP response", zap.String(eventLabel, eventType), zap.String(codeLabel, code), zap.String(reasonLabel, non202CodeReason), zap.String(urlLabel, url))
}

io.Copy(io.Discard, response.Body)
Expand Down
10 changes: 5 additions & 5 deletions workerPool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func testWorkerPoolTransactHTTPSuccess(t *testing.T) {
}
)

dm.On("With", prometheus.Labels{eventLabel: EventPrefix, codeLabel: strconv.Itoa(http.StatusAccepted), reasonLabel: non202Code, urlLabel: target}).Panic("Func dm.With should have not been called")
dm.On("With", prometheus.Labels{eventLabel: EventPrefix, codeLabel: strconv.Itoa(http.StatusAccepted), reasonLabel: non202CodeReason, urlLabel: target}).Panic("Func dm.With should have not been called")
dm.On("Add", 1.).Panic("Func dm.Add should have not been called")
require.NotPanics(func() { wp.transact(envelope) })
assert.Equal(b.Len(), 0)
Expand Down Expand Up @@ -80,7 +80,7 @@ func testWorkerPoolTransactHTTPError(t *testing.T) {
},
},
expectedCode: http.StatusInternalServerError,
expectedReason: non202Code,
expectedReason: non202CodeReason,
},
{
description: "failure 415, caduceus /notify response case",
Expand All @@ -96,7 +96,7 @@ func testWorkerPoolTransactHTTPError(t *testing.T) {
},
},
expectedCode: http.StatusUnsupportedMediaType,
expectedReason: non202Code,
expectedReason: non202CodeReason,
},
{
description: "failure 503, caduceus /notify response case",
Expand All @@ -112,7 +112,7 @@ func testWorkerPoolTransactHTTPError(t *testing.T) {
},
},
expectedCode: http.StatusServiceUnavailable,
expectedReason: non202Code,
expectedReason: non202CodeReason,
},
{
description: "failure 400, caduceus /notify response case",
Expand All @@ -128,7 +128,7 @@ func testWorkerPoolTransactHTTPError(t *testing.T) {
},
},
expectedCode: http.StatusBadRequest,
expectedReason: non202Code,
expectedReason: non202CodeReason,
},
{
description: "failure 408 timeout, caduceus /notify response case",
Expand Down

0 comments on commit 85569a3

Please sign in to comment.