Skip to content

Commit

Permalink
use message headers from rabbitMQ on runners (#62)
Browse files Browse the repository at this point in the history
* use message headers from rabbitMQ on runners

fixes #61

* fix lint issues
  • Loading branch information
leandro-lugaresi authored Feb 27, 2019
1 parent 455b376 commit b5e651c
Show file tree
Hide file tree
Showing 11 changed files with 267 additions and 194 deletions.
6 changes: 3 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ TEST_PATTERN?=./...
TEST_OPTIONS?=-race

setup: ## Install all the build and lint dependencies
go get github.com/golangci/golangci-lint/cmd/golangci-lint
go get github.com/mfridman/tparse
go get golang.org/x/tools/cmd/cover
GO111MODULE=off go get github.com/golangci/golangci-lint/cmd/golangci-lint
GO111MODULE=off go get github.com/mfridman/tparse
GO111MODULE=off go get golang.org/x/tools/cmd/cover
go get ./...

test: ## Run all the tests
Expand Down
2 changes: 1 addition & 1 deletion cmd/launch.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ var launchCmd = &cobra.Command{
Use: "launch",
Short: "Launch will start all the consumers from the config file",
Long: `Launch will start all the consumers from the config file `,
RunE: func(cmd *cobra.Command, args []string) error {
RunE: func(cmd *cobra.Command, _ []string) error {
err := initConfig()
if err != nil {
return errors.Wrap(err, "failed initializing the config")
Expand Down
14 changes: 11 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,17 +1,24 @@
module github.com/leandro-lugaresi/message-cannon

require (
github.com/Azure/go-ansiterm v0.0.0-20170929234023-d6e3b3328b78 // indirect
github.com/BurntSushi/toml v0.3.1 // indirect
github.com/Microsoft/go-winio v0.4.12 // indirect
github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5 // indirect
github.com/a8m/envsubst v1.1.0
github.com/cenkalti/backoff v2.1.1+incompatible // indirect
github.com/containerd/continuity v0.0.0-20181203112020-004b46473808 // indirect
github.com/creasty/defaults v1.2.1
github.com/docker/go-connections v0.4.0 // indirect
github.com/docker/go-units v0.3.3 // indirect
github.com/golangci/golangci-lint v1.12.5 // indirect
github.com/google/go-cmp v0.2.0 // indirect
github.com/gotestyourself/gotestyourself v2.2.0+incompatible // indirect
github.com/inconshreveable/mousetrap v1.0.0 // indirect
github.com/kr/pretty v0.1.0 // indirect
github.com/leandro-lugaresi/hub v1.1.0
github.com/mfridman/tparse v0.7.2 // indirect
github.com/lib/pq v1.0.0 // indirect
github.com/michaelklishin/rabbit-hole v1.4.0
github.com/onsi/gomega v1.4.2 // indirect
github.com/opencontainers/go-digest v1.0.0-rc1 // indirect
github.com/opencontainers/image-spec v1.0.1 // indirect
github.com/opencontainers/runc v0.1.1 // indirect
Expand All @@ -25,7 +32,8 @@ require (
github.com/streadway/amqp v0.0.0-20181205114330-a314942b2fd9
github.com/stretchr/testify v1.2.2
golang.org/x/net v0.0.0-20190110200230-915654e7eabc // indirect
golang.org/x/tools v0.0.0-20190111214448-fc1d57b08d7b // indirect
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 // indirect
gopkg.in/ory-am/dockertest.v3 v3.3.3
gopkg.in/tomb.v2 v2.0.0-20161208151619-d5d1b5820637
gotest.tools v2.2.0+incompatible // indirect
)
158 changes: 28 additions & 130 deletions go.sum

Large diffs are not rendered by default.

30 changes: 0 additions & 30 deletions rabbit/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package rabbit
import (
"context"
"errors"
"strconv"
"time"

"gopkg.in/tomb.v2"
Expand Down Expand Up @@ -164,32 +163,3 @@ func (c *consumer) processMessage(ctx context.Context, msg amqp.Delivery) {
})
}
}

func getHeaders(msg amqp.Delivery) map[string]string {
headers := map[string]string{
"Content-Type": msg.ContentType,
"Content-Encoding": msg.ContentEncoding,
"Correlation-Id": msg.CorrelationId,
"Message-Id": msg.MessageId,
}
xdeaths, ok := msg.Headers["x-death"].([]interface{})
if !ok {
return headers
}
var (
count, deathCount int64
xdeath amqp.Table
)
for _, ideath := range xdeaths {
xdeath, ok = ideath.(amqp.Table)
if !ok {
continue
}
if xdeath["reason"] != "expired" {
count, _ = xdeath["count"].(int64)
deathCount += count
}
}
headers["Message-Deaths"] = strconv.FormatInt(deathCount, 10)
return headers
}
47 changes: 47 additions & 0 deletions rabbit/header.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package rabbit

import (
"strconv"
"time"

"github.com/leandro-lugaresi/message-cannon/runner"
"github.com/streadway/amqp"
)

func getHeaders(msg amqp.Delivery) runner.Headers {
headers := runner.Headers{
"Content-Type": msg.ContentType,
"Content-Encoding": msg.ContentEncoding,
"Correlation-Id": msg.CorrelationId,
"Message-Id": msg.MessageId,
}
for k, v := range msg.Headers {
switch vt := v.(type) {
case int, int16, int32, int64, float32, float64, string, []byte, time.Time, bool:
headers[k] = vt
}
}
xdeaths, ok := msg.Headers["x-death"].([]interface{})
if ok {
headers["Message-Deaths"] = processDeaths(xdeaths)
}

return headers
}

func processDeaths(xdeaths []interface{}) string {
var (
count, deathCount int64
)
for _, ideath := range xdeaths {
xdeath, ok := ideath.(amqp.Table)
if !ok {
continue
}
if xdeath["reason"] != "expired" {
count, _ = xdeath["count"].(int64)
deathCount += count
}
}
return strconv.FormatInt(deathCount, 10)
}
29 changes: 24 additions & 5 deletions rabbit/consumer_test.go → rabbit/header_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package rabbit
import (
"testing"

"github.com/leandro-lugaresi/message-cannon/runner"
"github.com/streadway/amqp"
"github.com/stretchr/testify/require"
)
Expand All @@ -11,12 +12,12 @@ func Test_getHeaders(t *testing.T) {
tests := []struct {
name string
args amqp.Delivery
want map[string]string
want runner.Headers
}{
{
"with empty headers",
amqp.Delivery{Body: []byte(`foooo`)},
map[string]string{
runner.Headers{
"Content-Encoding": "",
"Content-Type": "",
"Correlation-Id": "",
Expand All @@ -32,7 +33,7 @@ func Test_getHeaders(t *testing.T) {
MessageId: "12345566",
Body: []byte(`foooo`),
},
map[string]string{
runner.Headers{
"Content-Encoding": "compress, gzip",
"Content-Type": "application/json",
"Correlation-Id": "id-12334455",
Expand All @@ -47,7 +48,7 @@ func Test_getHeaders(t *testing.T) {
"x-death": []amqp.Table{},
},
},
map[string]string{
runner.Headers{
"Content-Encoding": "",
"Content-Type": "",
"Correlation-Id": "",
Expand Down Expand Up @@ -81,14 +82,32 @@ func Test_getHeaders(t *testing.T) {
},
},
},
map[string]string{
runner.Headers{
"Content-Encoding": "",
"Content-Type": "",
"Correlation-Id": "",
"Message-Id": "",
"Message-Deaths": "6",
},
},
{
"with custom headers",
amqp.Delivery{
Body: []byte(`foooo`),
Headers: amqp.Table{
"Authorization": "Basic YWxhZGRpbjpvcGVuc2VzYW1l",
"X-Forwarded-For": "203.0.113.195, 70.41.3.18, 150.172.238.178",
},
},
runner.Headers{
"Content-Encoding": "",
"Content-Type": "",
"Correlation-Id": "",
"Message-Id": "",
"Authorization": "Basic YWxhZGRpbjpvcGVuc2VzYW1l",
"X-Forwarded-For": "203.0.113.195, 70.41.3.18, 150.172.238.178",
},
},
}
for _, tt := range tests {
ctt := tt
Expand Down
2 changes: 1 addition & 1 deletion runner/cmd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func Test_command_Process(t *testing.T) {
ctx, cancel = context.WithTimeout(ctx, 100*time.Millisecond)
defer cancel()
}
exitCode, err := c.Process(ctx, Message{Body: ctt.args.b, Headers: map[string]string{}})
exitCode, err := c.Process(ctx, Message{Body: ctt.args.b, Headers: Headers{}})
if len(ctt.wants.err) > 0 {
require.Contains(t, err.Error(), ctt.wants.err)
} else {
Expand Down
25 changes: 21 additions & 4 deletions runner/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@ import (
"bytes"
"context"
"encoding/json"
"fmt"
"io/ioutil"
"net"
"net/http"
"strconv"
"time"

"github.com/leandro-lugaresi/hub"
Expand Down Expand Up @@ -72,13 +74,28 @@ func (p *httpRunner) prepareRequest(msg Message) (*http.Request, error) {
if err != nil {
return req, err
}
for k, v := range msg.Headers {
req.Header.Set(k, v)
}
p.setHeaders(req, msg)
return req, nil
}

func (p *httpRunner) setHeaders(req *http.Request, msg Message) {
for k, v := range p.headers {
req.Header.Set(k, v)
}
return req, nil
for k, v := range msg.Headers {
switch vt := v.(type) {
case int, int16, int32, int64, float32, float64:
req.Header.Set(k, fmt.Sprint(vt))
case string:
req.Header.Set(k, vt)
case []byte:
req.Header.Set(k, string(vt))
case time.Time:
req.Header.Set(k, vt.Format(http.TimeFormat))
case bool:
req.Header.Set(k, strconv.FormatBool(vt))
}
}
}

func (p *httpRunner) executeRequest(req *http.Request) (*http.Response, []byte, error) {
Expand Down
Loading

0 comments on commit b5e651c

Please sign in to comment.