Skip to content

Commit

Permalink
Aerospike: Make write and read max retries configurable (#116)
Browse files Browse the repository at this point in the history
  • Loading branch information
guscarreon authored Nov 1, 2022
1 parent 0f2ccc2 commit 4d4af0d
Show file tree
Hide file tree
Showing 3 changed files with 266 additions and 49 deletions.
14 changes: 14 additions & 0 deletions backends/aerospike.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,20 @@ func NewAerospikeBackend(cfg config.Aerospike, metrics *metrics.Metrics) *Aerosp
}
log.Infof("Connected to Aerospike host(s) %v on port %d", append(cfg.Hosts, cfg.Host), cfg.Port)

// client.DefaultPolicy.MaxRetries determines the maximum number of retries before aborting a transaction.
// Default for read: 2 (initial attempt + 2 retries = 3 attempts)
if cfg.MaxReadRetries > 2 {
client.DefaultPolicy.MaxRetries = cfg.MaxReadRetries
}

// client.DefaultWritePolicy.MaxRetries determines the maximum number of retries for write before aborting
// a transaction. Prebid Cache uses the Aerospike backend to do CREATE_ONLY writes, which are idempotent so
// it's safe to increase the maximum value of write retries.
// Default for write: 0 (no retries)
if cfg.MaxWriteRetries > 0 {
client.DefaultWritePolicy.MaxRetries = cfg.MaxWriteRetries
}

return &AerospikeBackend{
namespace: cfg.Namespace,
client: &AerospikeDBClient{client},
Expand Down
30 changes: 23 additions & 7 deletions config/backends.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,15 @@ const (
)

type Aerospike struct {
DefaultTTL int `mapstructure:"default_ttl_seconds"`
Host string `mapstructure:"host"`
Hosts []string `mapstructure:"hosts"`
Port int `mapstructure:"port"`
Namespace string `mapstructure:"namespace"`
User string `mapstructure:"user"`
Password string `mapstructure:"password"`
DefaultTTL int `mapstructure:"default_ttl_seconds"`
Host string `mapstructure:"host"`
Hosts []string `mapstructure:"hosts"`
Port int `mapstructure:"port"`
Namespace string `mapstructure:"namespace"`
User string `mapstructure:"user"`
Password string `mapstructure:"password"`
MaxReadRetries int `mapstructure:"max_read_retries"`
MaxWriteRetries int `mapstructure:"max_write_retries"`
}

func (cfg *Aerospike) validateAndLog() error {
Expand All @@ -71,6 +73,20 @@ func (cfg *Aerospike) validateAndLog() error {
log.Infof("config.backend.aerospike.namespace: %s", cfg.Namespace)
log.Infof("config.backend.aerospike.user: %s", cfg.User)

if cfg.MaxReadRetries < 2 {
log.Infof("config.backend.aerospike.max_read_retries value will default to 2")
cfg.MaxReadRetries = 2
} else if cfg.MaxReadRetries > 2 {
log.Infof("config.backend.aerospike.max_read_retries: %d.", cfg.MaxReadRetries)
}

if cfg.MaxWriteRetries < 0 {
log.Infof("config.backend.aerospike.max_write_retries value cannot be negative and will default to 0")
cfg.MaxWriteRetries = 0
} else if cfg.MaxWriteRetries > 0 {
log.Infof("config.backend.aerospike.max_write_retries: %d.", cfg.MaxWriteRetries)
}

return nil
}

Expand Down
271 changes: 229 additions & 42 deletions config/backends_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,66 +4,253 @@ import (
"fmt"
"testing"

"github.com/sirupsen/logrus"
testLogrus "github.com/sirupsen/logrus/hooks/test"
"github.com/stretchr/testify/assert"
)

func TestAerospikeValidateAndLog(t *testing.T) {
testCases := []struct {

type logComponents struct {
msg string
lvl logrus.Level
}

type testCase struct {
desc string
inCfg Aerospike
hasError bool
expectedError error
logEntries []logComponents
}
testGroups := []struct {
desc string
testCases []testCase
}{
{
desc: "aerospike.hosts passed in",
inCfg: Aerospike{
Hosts: []string{"foo.com", "bat.com"},
Port: 8888,
},
hasError: false,
},
{
desc: "aerospike.host passed in",
inCfg: Aerospike{
Host: "foo.com",
Port: 8888,
},
hasError: false,
},
{
desc: "aerospike.host aerospike.hosts passed in",
inCfg: Aerospike{
Host: "foo.com",
Hosts: []string{"foo.com", "bat.com"},
Port: 8888,
desc: "No errors expected",
testCases: []testCase{
{
desc: "aerospike.host passed in",
inCfg: Aerospike{
Host: "foo.com",
Port: 8888,
},
hasError: false,
logEntries: []logComponents{
{msg: "config.backend.aerospike.host: foo.com", lvl: logrus.InfoLevel},
{msg: fmt.Sprintf("config.backend.aerospike.hosts: %v", []string{}), lvl: logrus.InfoLevel},
{msg: "config.backend.aerospike.port: 8888", lvl: logrus.InfoLevel},
{msg: "config.backend.aerospike.namespace: ", lvl: logrus.InfoLevel},
{msg: "config.backend.aerospike.user: ", lvl: logrus.InfoLevel},
{msg: "config.backend.aerospike.max_read_retries value will default to 2", lvl: logrus.InfoLevel},
},
},
{
desc: "aerospike.host passed in",
inCfg: Aerospike{
Host: "foo.com",
Port: 8888,
Namespace: "prebid",
User: "prebid-user",
},
hasError: false,
logEntries: []logComponents{
{msg: "config.backend.aerospike.host: foo.com", lvl: logrus.InfoLevel},
{msg: fmt.Sprintf("config.backend.aerospike.hosts: %v", []string{}), lvl: logrus.InfoLevel},
{msg: "config.backend.aerospike.port: 8888", lvl: logrus.InfoLevel},
{msg: "config.backend.aerospike.namespace: prebid", lvl: logrus.InfoLevel},
{msg: "config.backend.aerospike.user: prebid-user", lvl: logrus.InfoLevel},
{msg: "config.backend.aerospike.max_read_retries value will default to 2", lvl: logrus.InfoLevel},
},
},
{
desc: "aerospike.hosts passed in",
inCfg: Aerospike{
Hosts: []string{"foo.com", "bat.com"},
Port: 8888,
},
hasError: false,
logEntries: []logComponents{
{msg: "config.backend.aerospike.host: ", lvl: logrus.InfoLevel},
{msg: fmt.Sprintf("config.backend.aerospike.hosts: %v", []string{"foo.com", "bat.com"}), lvl: logrus.InfoLevel},
{msg: "config.backend.aerospike.port: 8888", lvl: logrus.InfoLevel},
{msg: "config.backend.aerospike.namespace: ", lvl: logrus.InfoLevel},
{msg: "config.backend.aerospike.user: ", lvl: logrus.InfoLevel},
{msg: "config.backend.aerospike.max_read_retries value will default to 2", lvl: logrus.InfoLevel},
},
},
{
desc: "both aerospike.host aerospike.hosts passed in",
inCfg: Aerospike{
Host: "foo.com",
Hosts: []string{"foo.com", "bat.com"},
Port: 8888,
},
hasError: false,
logEntries: []logComponents{
{msg: "config.backend.aerospike.host: foo.com", lvl: logrus.InfoLevel},
{msg: fmt.Sprintf("config.backend.aerospike.hosts: %v", []string{"foo.com", "bat.com"}), lvl: logrus.InfoLevel},
{msg: "config.backend.aerospike.port: 8888", lvl: logrus.InfoLevel},
{msg: "config.backend.aerospike.namespace: ", lvl: logrus.InfoLevel},
{msg: "config.backend.aerospike.user: ", lvl: logrus.InfoLevel},
{msg: "config.backend.aerospike.max_read_retries value will default to 2", lvl: logrus.InfoLevel},
},
},
{
desc: "both aerospike.host, aerospike.hosts and aerospike.default_ttl_seconds set",
inCfg: Aerospike{
Host: "foo.com",
Hosts: []string{"foo.com", "bat.com"},
Port: 8888,
DefaultTTL: 3600,
},
hasError: false,
logEntries: []logComponents{
{msg: "config.backend.aerospike.default_ttl_seconds: 3600. Note that this configuration option is being deprecated in favor of config.request_limits.max_ttl_seconds", lvl: logrus.InfoLevel},
{msg: "config.backend.aerospike.host: foo.com", lvl: logrus.InfoLevel},
{msg: fmt.Sprintf("config.backend.aerospike.hosts: %v", []string{"foo.com", "bat.com"}), lvl: logrus.InfoLevel},
{msg: "config.backend.aerospike.port: 8888", lvl: logrus.InfoLevel},
{msg: "config.backend.aerospike.namespace: ", lvl: logrus.InfoLevel},
{msg: "config.backend.aerospike.user: ", lvl: logrus.InfoLevel},
{msg: "config.backend.aerospike.max_read_retries value will default to 2", lvl: logrus.InfoLevel},
},
},
{
desc: "both aerospike.host, aerospike.port and an aerospike.max_read_retries invalid value. Default to 2 retries",
inCfg: Aerospike{
Host: "foo.com",
Port: 8888,
MaxReadRetries: 1,
},
hasError: false,
logEntries: []logComponents{
{msg: "config.backend.aerospike.host: foo.com", lvl: logrus.InfoLevel},
{msg: fmt.Sprintf("config.backend.aerospike.hosts: %v", []string{}), lvl: logrus.InfoLevel},
{msg: "config.backend.aerospike.port: 8888", lvl: logrus.InfoLevel},
{msg: "config.backend.aerospike.namespace: ", lvl: logrus.InfoLevel},
{msg: "config.backend.aerospike.user: ", lvl: logrus.InfoLevel},
{msg: "config.backend.aerospike.max_read_retries value will default to 2", lvl: logrus.InfoLevel},
},
},
{
desc: "aerospike.max_read_retries valid value.",
inCfg: Aerospike{
Host: "foo.com",
Port: 8888,
MaxReadRetries: 3,
},
hasError: false,
logEntries: []logComponents{
{msg: "config.backend.aerospike.host: foo.com", lvl: logrus.InfoLevel},
{msg: fmt.Sprintf("config.backend.aerospike.hosts: %v", []string{}), lvl: logrus.InfoLevel},
{msg: "config.backend.aerospike.port: 8888", lvl: logrus.InfoLevel},
{msg: "config.backend.aerospike.namespace: ", lvl: logrus.InfoLevel},
{msg: "config.backend.aerospike.user: ", lvl: logrus.InfoLevel},
{msg: "config.backend.aerospike.max_read_retries: 3.", lvl: logrus.InfoLevel},
},
},
{
desc: "aerospike.max_write_retries invalid value. Default to 2 retries",
inCfg: Aerospike{
Host: "foo.com",
Port: 8888,
MaxReadRetries: 2,
MaxWriteRetries: -1,
},
hasError: false,
logEntries: []logComponents{
{msg: "config.backend.aerospike.host: foo.com", lvl: logrus.InfoLevel},
{msg: fmt.Sprintf("config.backend.aerospike.hosts: %v", []string{}), lvl: logrus.InfoLevel},
{msg: "config.backend.aerospike.port: 8888", lvl: logrus.InfoLevel},
{msg: "config.backend.aerospike.namespace: ", lvl: logrus.InfoLevel},
{msg: "config.backend.aerospike.user: ", lvl: logrus.InfoLevel},
{msg: "config.backend.aerospike.max_write_retries value cannot be negative and will default to 0", lvl: logrus.InfoLevel},
},
},
{
desc: "aerospike.max_read_retries valid value.",
inCfg: Aerospike{
Host: "foo.com",
Port: 8888,
MaxReadRetries: 2,
MaxWriteRetries: 1,
},
hasError: false,
logEntries: []logComponents{
{msg: "config.backend.aerospike.host: foo.com", lvl: logrus.InfoLevel},
{msg: fmt.Sprintf("config.backend.aerospike.hosts: %v", []string{}), lvl: logrus.InfoLevel},
{msg: "config.backend.aerospike.port: 8888", lvl: logrus.InfoLevel},
{msg: "config.backend.aerospike.namespace: ", lvl: logrus.InfoLevel},
{msg: "config.backend.aerospike.user: ", lvl: logrus.InfoLevel},
{msg: "config.backend.aerospike.max_write_retries: 1.", lvl: logrus.InfoLevel},
},
},
},
hasError: false,
},
{
desc: "aerospike.host and aerospike.hosts missing",
inCfg: Aerospike{
Port: 8888,
desc: "Expect error",
testCases: []testCase{
{
desc: "aerospike.host and aerospike.hosts missing",
inCfg: Aerospike{
Port: 8888,
},
hasError: true,
expectedError: fmt.Errorf("Cannot connect to empty Aerospike host(s)"),
},
{
desc: "aerospike.port config missing",
inCfg: Aerospike{
Host: "foo.com",
},
hasError: true,
expectedError: fmt.Errorf("Cannot connect to Aerospike host at port 0"),
},
{
desc: "aerospike.port config missing",
inCfg: Aerospike{
Host: "foo.com",
Hosts: []string{"bar.com"},
},
hasError: true,
expectedError: fmt.Errorf("Cannot connect to Aerospike host at port 0"),
},
},
hasError: true,
expectedError: fmt.Errorf("Cannot connect to empty Aerospike host(s)"),
},
{
desc: "aerospike.port config missing",
inCfg: Aerospike{
Host: "foo.com",
},
hasError: true,
expectedError: fmt.Errorf("Cannot connect to Aerospike host at port 0"),
},
}

for _, test := range testCases {
// logrus entries will be recorded to this `hook` object so we can compare and assert them
hook := testLogrus.NewGlobal()

//substitute logger exit function so execution doesn't get interrupted when log.Fatalf() call comes
defer func() { logrus.StandardLogger().ExitFunc = nil }()
var fatal bool
logrus.StandardLogger().ExitFunc = func(int) { fatal = true }

for _, group := range testGroups {
for _, test := range group.testCases {
fatal = false

//run test
if test.hasError {
assert.Equal(t, test.inCfg.validateAndLog(), test.expectedError, group.desc+" : "+test.desc)
} else {
assert.Nil(t, test.inCfg.validateAndLog(), group.desc+" : "+test.desc)
}

assert.False(t, fatal, group.desc+" : "+test.desc)

if assert.Len(t, hook.Entries, len(test.logEntries), "Incorrect number of entries were logged to logrus in test %s", group.desc+" : "+test.desc) {
for i := 0; i < len(test.logEntries); i++ {
assert.Equal(t, test.logEntries[i].msg, hook.Entries[i].Message, group.desc+" : "+test.desc)
assert.Equal(t, test.logEntries[i].lvl, hook.Entries[i].Level, group.desc+" : "+test.desc)
}
}

//run test
if test.hasError {
assert.Equal(t, test.inCfg.validateAndLog(), test.expectedError, test.desc)
} else {
assert.Nil(t, test.inCfg.validateAndLog(), test.desc)
//Reset log after every test and assert successful reset
hook.Reset()
assert.Nil(t, hook.LastEntry(), group.desc+" : "+test.desc)
}
}
}

0 comments on commit 4d4af0d

Please sign in to comment.