From 4d4af0d327fdcc53ef5df8c45cc36f4afd006bf6 Mon Sep 17 00:00:00 2001 From: guscarreon Date: Tue, 1 Nov 2022 11:12:35 -0400 Subject: [PATCH] Aerospike: Make write and read max retries configurable (#116) --- backends/aerospike.go | 14 +++ config/backends.go | 30 +++-- config/backends_test.go | 271 +++++++++++++++++++++++++++++++++------- 3 files changed, 266 insertions(+), 49 deletions(-) diff --git a/backends/aerospike.go b/backends/aerospike.go index f501a572..9b37c4d4 100644 --- a/backends/aerospike.go +++ b/backends/aerospike.go @@ -76,6 +76,20 @@ func NewAerospikeBackend(cfg config.Aerospike, metrics *metrics.Metrics) *Aerosp } log.Infof("Connected to Aerospike host(s) %v on port %d", append(cfg.Hosts, cfg.Host), cfg.Port) + // client.DefaultPolicy.MaxRetries determines the maximum number of retries before aborting a transaction. + // Default for read: 2 (initial attempt + 2 retries = 3 attempts) + if cfg.MaxReadRetries > 2 { + client.DefaultPolicy.MaxRetries = cfg.MaxReadRetries + } + + // client.DefaultWritePolicy.MaxRetries determines the maximum number of retries for write before aborting + // a transaction. Prebid Cache uses the Aerospike backend to do CREATE_ONLY writes, which are idempotent so + // it's safe to increase the maximum value of write retries. + // Default for write: 0 (no retries) + if cfg.MaxWriteRetries > 0 { + client.DefaultWritePolicy.MaxRetries = cfg.MaxWriteRetries + } + return &AerospikeBackend{ namespace: cfg.Namespace, client: &AerospikeDBClient{client}, diff --git a/config/backends.go b/config/backends.go index baa98f76..b7969029 100644 --- a/config/backends.go +++ b/config/backends.go @@ -45,13 +45,15 @@ const ( ) type Aerospike struct { - DefaultTTL int `mapstructure:"default_ttl_seconds"` - Host string `mapstructure:"host"` - Hosts []string `mapstructure:"hosts"` - Port int `mapstructure:"port"` - Namespace string `mapstructure:"namespace"` - User string `mapstructure:"user"` - Password string `mapstructure:"password"` + DefaultTTL int `mapstructure:"default_ttl_seconds"` + Host string `mapstructure:"host"` + Hosts []string `mapstructure:"hosts"` + Port int `mapstructure:"port"` + Namespace string `mapstructure:"namespace"` + User string `mapstructure:"user"` + Password string `mapstructure:"password"` + MaxReadRetries int `mapstructure:"max_read_retries"` + MaxWriteRetries int `mapstructure:"max_write_retries"` } func (cfg *Aerospike) validateAndLog() error { @@ -71,6 +73,20 @@ func (cfg *Aerospike) validateAndLog() error { log.Infof("config.backend.aerospike.namespace: %s", cfg.Namespace) log.Infof("config.backend.aerospike.user: %s", cfg.User) + if cfg.MaxReadRetries < 2 { + log.Infof("config.backend.aerospike.max_read_retries value will default to 2") + cfg.MaxReadRetries = 2 + } else if cfg.MaxReadRetries > 2 { + log.Infof("config.backend.aerospike.max_read_retries: %d.", cfg.MaxReadRetries) + } + + if cfg.MaxWriteRetries < 0 { + log.Infof("config.backend.aerospike.max_write_retries value cannot be negative and will default to 0") + cfg.MaxWriteRetries = 0 + } else if cfg.MaxWriteRetries > 0 { + log.Infof("config.backend.aerospike.max_write_retries: %d.", cfg.MaxWriteRetries) + } + return nil } diff --git a/config/backends_test.go b/config/backends_test.go index 5031d9a4..57c1b3ba 100644 --- a/config/backends_test.go +++ b/config/backends_test.go @@ -4,66 +4,253 @@ import ( "fmt" "testing" + "github.com/sirupsen/logrus" + testLogrus "github.com/sirupsen/logrus/hooks/test" "github.com/stretchr/testify/assert" ) func TestAerospikeValidateAndLog(t *testing.T) { - testCases := []struct { + + type logComponents struct { + msg string + lvl logrus.Level + } + + type testCase struct { desc string inCfg Aerospike hasError bool expectedError error + logEntries []logComponents + } + testGroups := []struct { + desc string + testCases []testCase }{ { - desc: "aerospike.hosts passed in", - inCfg: Aerospike{ - Hosts: []string{"foo.com", "bat.com"}, - Port: 8888, - }, - hasError: false, - }, - { - desc: "aerospike.host passed in", - inCfg: Aerospike{ - Host: "foo.com", - Port: 8888, - }, - hasError: false, - }, - { - desc: "aerospike.host aerospike.hosts passed in", - inCfg: Aerospike{ - Host: "foo.com", - Hosts: []string{"foo.com", "bat.com"}, - Port: 8888, + desc: "No errors expected", + testCases: []testCase{ + { + desc: "aerospike.host passed in", + inCfg: Aerospike{ + Host: "foo.com", + Port: 8888, + }, + hasError: false, + logEntries: []logComponents{ + {msg: "config.backend.aerospike.host: foo.com", lvl: logrus.InfoLevel}, + {msg: fmt.Sprintf("config.backend.aerospike.hosts: %v", []string{}), lvl: logrus.InfoLevel}, + {msg: "config.backend.aerospike.port: 8888", lvl: logrus.InfoLevel}, + {msg: "config.backend.aerospike.namespace: ", lvl: logrus.InfoLevel}, + {msg: "config.backend.aerospike.user: ", lvl: logrus.InfoLevel}, + {msg: "config.backend.aerospike.max_read_retries value will default to 2", lvl: logrus.InfoLevel}, + }, + }, + { + desc: "aerospike.host passed in", + inCfg: Aerospike{ + Host: "foo.com", + Port: 8888, + Namespace: "prebid", + User: "prebid-user", + }, + hasError: false, + logEntries: []logComponents{ + {msg: "config.backend.aerospike.host: foo.com", lvl: logrus.InfoLevel}, + {msg: fmt.Sprintf("config.backend.aerospike.hosts: %v", []string{}), lvl: logrus.InfoLevel}, + {msg: "config.backend.aerospike.port: 8888", lvl: logrus.InfoLevel}, + {msg: "config.backend.aerospike.namespace: prebid", lvl: logrus.InfoLevel}, + {msg: "config.backend.aerospike.user: prebid-user", lvl: logrus.InfoLevel}, + {msg: "config.backend.aerospike.max_read_retries value will default to 2", lvl: logrus.InfoLevel}, + }, + }, + { + desc: "aerospike.hosts passed in", + inCfg: Aerospike{ + Hosts: []string{"foo.com", "bat.com"}, + Port: 8888, + }, + hasError: false, + logEntries: []logComponents{ + {msg: "config.backend.aerospike.host: ", lvl: logrus.InfoLevel}, + {msg: fmt.Sprintf("config.backend.aerospike.hosts: %v", []string{"foo.com", "bat.com"}), lvl: logrus.InfoLevel}, + {msg: "config.backend.aerospike.port: 8888", lvl: logrus.InfoLevel}, + {msg: "config.backend.aerospike.namespace: ", lvl: logrus.InfoLevel}, + {msg: "config.backend.aerospike.user: ", lvl: logrus.InfoLevel}, + {msg: "config.backend.aerospike.max_read_retries value will default to 2", lvl: logrus.InfoLevel}, + }, + }, + { + desc: "both aerospike.host aerospike.hosts passed in", + inCfg: Aerospike{ + Host: "foo.com", + Hosts: []string{"foo.com", "bat.com"}, + Port: 8888, + }, + hasError: false, + logEntries: []logComponents{ + {msg: "config.backend.aerospike.host: foo.com", lvl: logrus.InfoLevel}, + {msg: fmt.Sprintf("config.backend.aerospike.hosts: %v", []string{"foo.com", "bat.com"}), lvl: logrus.InfoLevel}, + {msg: "config.backend.aerospike.port: 8888", lvl: logrus.InfoLevel}, + {msg: "config.backend.aerospike.namespace: ", lvl: logrus.InfoLevel}, + {msg: "config.backend.aerospike.user: ", lvl: logrus.InfoLevel}, + {msg: "config.backend.aerospike.max_read_retries value will default to 2", lvl: logrus.InfoLevel}, + }, + }, + { + desc: "both aerospike.host, aerospike.hosts and aerospike.default_ttl_seconds set", + inCfg: Aerospike{ + Host: "foo.com", + Hosts: []string{"foo.com", "bat.com"}, + Port: 8888, + DefaultTTL: 3600, + }, + hasError: false, + logEntries: []logComponents{ + {msg: "config.backend.aerospike.default_ttl_seconds: 3600. Note that this configuration option is being deprecated in favor of config.request_limits.max_ttl_seconds", lvl: logrus.InfoLevel}, + {msg: "config.backend.aerospike.host: foo.com", lvl: logrus.InfoLevel}, + {msg: fmt.Sprintf("config.backend.aerospike.hosts: %v", []string{"foo.com", "bat.com"}), lvl: logrus.InfoLevel}, + {msg: "config.backend.aerospike.port: 8888", lvl: logrus.InfoLevel}, + {msg: "config.backend.aerospike.namespace: ", lvl: logrus.InfoLevel}, + {msg: "config.backend.aerospike.user: ", lvl: logrus.InfoLevel}, + {msg: "config.backend.aerospike.max_read_retries value will default to 2", lvl: logrus.InfoLevel}, + }, + }, + { + desc: "both aerospike.host, aerospike.port and an aerospike.max_read_retries invalid value. Default to 2 retries", + inCfg: Aerospike{ + Host: "foo.com", + Port: 8888, + MaxReadRetries: 1, + }, + hasError: false, + logEntries: []logComponents{ + {msg: "config.backend.aerospike.host: foo.com", lvl: logrus.InfoLevel}, + {msg: fmt.Sprintf("config.backend.aerospike.hosts: %v", []string{}), lvl: logrus.InfoLevel}, + {msg: "config.backend.aerospike.port: 8888", lvl: logrus.InfoLevel}, + {msg: "config.backend.aerospike.namespace: ", lvl: logrus.InfoLevel}, + {msg: "config.backend.aerospike.user: ", lvl: logrus.InfoLevel}, + {msg: "config.backend.aerospike.max_read_retries value will default to 2", lvl: logrus.InfoLevel}, + }, + }, + { + desc: "aerospike.max_read_retries valid value.", + inCfg: Aerospike{ + Host: "foo.com", + Port: 8888, + MaxReadRetries: 3, + }, + hasError: false, + logEntries: []logComponents{ + {msg: "config.backend.aerospike.host: foo.com", lvl: logrus.InfoLevel}, + {msg: fmt.Sprintf("config.backend.aerospike.hosts: %v", []string{}), lvl: logrus.InfoLevel}, + {msg: "config.backend.aerospike.port: 8888", lvl: logrus.InfoLevel}, + {msg: "config.backend.aerospike.namespace: ", lvl: logrus.InfoLevel}, + {msg: "config.backend.aerospike.user: ", lvl: logrus.InfoLevel}, + {msg: "config.backend.aerospike.max_read_retries: 3.", lvl: logrus.InfoLevel}, + }, + }, + { + desc: "aerospike.max_write_retries invalid value. Default to 2 retries", + inCfg: Aerospike{ + Host: "foo.com", + Port: 8888, + MaxReadRetries: 2, + MaxWriteRetries: -1, + }, + hasError: false, + logEntries: []logComponents{ + {msg: "config.backend.aerospike.host: foo.com", lvl: logrus.InfoLevel}, + {msg: fmt.Sprintf("config.backend.aerospike.hosts: %v", []string{}), lvl: logrus.InfoLevel}, + {msg: "config.backend.aerospike.port: 8888", lvl: logrus.InfoLevel}, + {msg: "config.backend.aerospike.namespace: ", lvl: logrus.InfoLevel}, + {msg: "config.backend.aerospike.user: ", lvl: logrus.InfoLevel}, + {msg: "config.backend.aerospike.max_write_retries value cannot be negative and will default to 0", lvl: logrus.InfoLevel}, + }, + }, + { + desc: "aerospike.max_read_retries valid value.", + inCfg: Aerospike{ + Host: "foo.com", + Port: 8888, + MaxReadRetries: 2, + MaxWriteRetries: 1, + }, + hasError: false, + logEntries: []logComponents{ + {msg: "config.backend.aerospike.host: foo.com", lvl: logrus.InfoLevel}, + {msg: fmt.Sprintf("config.backend.aerospike.hosts: %v", []string{}), lvl: logrus.InfoLevel}, + {msg: "config.backend.aerospike.port: 8888", lvl: logrus.InfoLevel}, + {msg: "config.backend.aerospike.namespace: ", lvl: logrus.InfoLevel}, + {msg: "config.backend.aerospike.user: ", lvl: logrus.InfoLevel}, + {msg: "config.backend.aerospike.max_write_retries: 1.", lvl: logrus.InfoLevel}, + }, + }, }, - hasError: false, }, { - desc: "aerospike.host and aerospike.hosts missing", - inCfg: Aerospike{ - Port: 8888, + desc: "Expect error", + testCases: []testCase{ + { + desc: "aerospike.host and aerospike.hosts missing", + inCfg: Aerospike{ + Port: 8888, + }, + hasError: true, + expectedError: fmt.Errorf("Cannot connect to empty Aerospike host(s)"), + }, + { + desc: "aerospike.port config missing", + inCfg: Aerospike{ + Host: "foo.com", + }, + hasError: true, + expectedError: fmt.Errorf("Cannot connect to Aerospike host at port 0"), + }, + { + desc: "aerospike.port config missing", + inCfg: Aerospike{ + Host: "foo.com", + Hosts: []string{"bar.com"}, + }, + hasError: true, + expectedError: fmt.Errorf("Cannot connect to Aerospike host at port 0"), + }, }, - hasError: true, - expectedError: fmt.Errorf("Cannot connect to empty Aerospike host(s)"), - }, - { - desc: "aerospike.port config missing", - inCfg: Aerospike{ - Host: "foo.com", - }, - hasError: true, - expectedError: fmt.Errorf("Cannot connect to Aerospike host at port 0"), }, } - for _, test := range testCases { + // logrus entries will be recorded to this `hook` object so we can compare and assert them + hook := testLogrus.NewGlobal() + + //substitute logger exit function so execution doesn't get interrupted when log.Fatalf() call comes + defer func() { logrus.StandardLogger().ExitFunc = nil }() + var fatal bool + logrus.StandardLogger().ExitFunc = func(int) { fatal = true } + + for _, group := range testGroups { + for _, test := range group.testCases { + fatal = false + + //run test + if test.hasError { + assert.Equal(t, test.inCfg.validateAndLog(), test.expectedError, group.desc+" : "+test.desc) + } else { + assert.Nil(t, test.inCfg.validateAndLog(), group.desc+" : "+test.desc) + } + + assert.False(t, fatal, group.desc+" : "+test.desc) + + if assert.Len(t, hook.Entries, len(test.logEntries), "Incorrect number of entries were logged to logrus in test %s", group.desc+" : "+test.desc) { + for i := 0; i < len(test.logEntries); i++ { + assert.Equal(t, test.logEntries[i].msg, hook.Entries[i].Message, group.desc+" : "+test.desc) + assert.Equal(t, test.logEntries[i].lvl, hook.Entries[i].Level, group.desc+" : "+test.desc) + } + } - //run test - if test.hasError { - assert.Equal(t, test.inCfg.validateAndLog(), test.expectedError, test.desc) - } else { - assert.Nil(t, test.inCfg.validateAndLog(), test.desc) + //Reset log after every test and assert successful reset + hook.Reset() + assert.Nil(t, hook.LastEntry(), group.desc+" : "+test.desc) } } }