Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

APPS-1241 Add concurrent scans limiter #113

Merged
merged 21 commits into from
Aug 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 8 additions & 6 deletions .github/workflows/golangci-lint.yaml
Original file line number Diff line number Diff line change
@@ -1,32 +1,34 @@
name: golangci-lint
on:
push:
tags:
- v*
branches:
- '*'
pull_request:
branches:
- main

permissions:
contents: read

jobs:
golangci:
name: lint
runs-on: ubuntu-latest
steps:
- name: Checkout sources
uses: actions/checkout@v4

- name: Get go version from go.mod
run: |
echo "GO_VERSION=$(grep '^go ' go.mod | cut -d " " -f 2)" >> $GITHUB_ENV

- name: Setup-go
uses: actions/setup-go@v5
with:
go-version: ${{ env.GO_VERSION }}
- name: "generate-mocks"
run: make mocks-generate

- name: Run golangci-lint
uses: golangci/golangci-lint-action@v4
uses: golangci/golangci-lint-action@v6
with:
version: v1.56.2
args: --timeout=5m
args: --timeout=5m
3 changes: 1 addition & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ package main
import (
"context"
"log"
"log/slog"

"github.com/aerospike/aerospike-client-go/v7"
"github.com/aerospike/backup-go"
Expand All @@ -35,7 +34,7 @@ func main() {
panic(aerr)
}

backupClient, err := backup.NewClient(aerospikeClient, "client_id", slog.Default())
backupClient, err := backup.NewClient(aerospikeClient, backup.WithID("client_id"))
if err != nil {
panic(err)
}
Expand Down
76 changes: 59 additions & 17 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,12 @@ import (
"errors"
"fmt"
"log/slog"
"math/rand"
"strconv"

a "github.com/aerospike/aerospike-client-go/v7"
"github.com/aerospike/backup-go/internal/logging"
"golang.org/x/sync/semaphore"
)

const (
Expand All @@ -35,7 +38,7 @@ const (

// AerospikeClient describes aerospike client interface for easy mocking.
//
// go:generate mockery --name AerospikeClient
//go:generate mockery --name AerospikeClient
type AerospikeClient interface {
GetDefaultScanPolicy() *a.ScanPolicy
GetDefaultInfoPolicy() *a.InfoPolicy
Expand All @@ -51,6 +54,8 @@ type AerospikeClient interface {
Cluster() *a.Cluster
ScanPartitions(scanPolicy *a.ScanPolicy, partitionFilter *a.PartitionFilter, namespace string,
setName string, binNames ...string) (*a.Recordset, a.Error)
Close()
GetNodes() []*a.Node
}

// Client is the main entry point for the backup package.
Expand All @@ -62,7 +67,7 @@ type AerospikeClient interface {
// // handle error
// }
//
// backupClient, err := backup.NewClient(asc, "id", nil) // create a backup client
// backupClient, err := backup.NewClient(asc, backup.WithID("id")) // create a backup client
// if err != nil {
// // handle error
// }
Expand Down Expand Up @@ -90,33 +95,65 @@ type AerospikeClient interface {
type Client struct {
aerospikeClient AerospikeClient
logger *slog.Logger
scanLimiter *semaphore.Weighted
id string
}

// ClientOpt is a functional option that allows configuring the [Client].
type ClientOpt func(*Client)

// WithID sets the ID for the [Client].
func WithID(id string) ClientOpt {
return func(c *Client) {
c.id = id
}
}

// WithLogger sets the logger for the [Client].
func WithLogger(logger *slog.Logger) ClientOpt {
return func(c *Client) {
c.logger = logger
}
}

// WithScanLimiter sets the scan limiter for the [Client].
func WithScanLimiter(sem *semaphore.Weighted) ClientOpt {
return func(c *Client) {
c.scanLimiter = sem
}
}

// NewClient creates a new backup client.
// - ac is the aerospike client to use for backup and restore operations.
// - id is an identifier for the client.
// - logger is the logger that this client will log to.
func NewClient(ac AerospikeClient, id string, logger *slog.Logger) (*Client, error) {
//
// Options:
// - [WithID] to set an identifier for the client.
// - [WithLogger] to set a logger that this client will log to.
// - [WithScanLimiter] to set a semaphore that is used to limit number of
// concurrent scans.
func NewClient(ac AerospikeClient, opts ...ClientOpt) (*Client, error) {
if ac == nil {
return nil, errors.New("aerospike client pointer is nil")
}

if logger == nil {
logger = slog.Default()
// Initialize the Client with default values
client := &Client{
aerospikeClient: ac,
logger: slog.Default(),
// #nosec G404
id: strconv.Itoa(rand.Intn(1000)),
}

// qualify the logger with a backup lib group
logger = logger.WithGroup("backup")
// Apply all options to the Client
for _, opt := range opts {
opt(client)
}

// add a client group to the logger
logger = logging.WithClient(logger, id)
// Further customization after applying options
client.logger = client.logger.WithGroup("backup")
client.logger = logging.WithClient(client.logger, client.id)

return &Client{
aerospikeClient: ac,
id: id,
logger: logger,
}, nil
return client, nil
}

func (c *Client) getUsableInfoPolicy(p *a.InfoPolicy) a.InfoPolicy {
Expand Down Expand Up @@ -167,7 +204,7 @@ func (c *Client) Backup(
return nil, err
}

handler := newBackupHandler(ctx, config, c.aerospikeClient, c.logger, writer)
handler := newBackupHandler(ctx, config, c.aerospikeClient, c.logger, writer, c.scanLimiter)
handler.run(ctx)

return handler, nil
Expand Down Expand Up @@ -203,3 +240,8 @@ func (c *Client) Restore(

return handler, nil
}

// AerospikeClient returns the underlying aerospike client.
func (c *Client) AerospikeClient() AerospikeClient {
return c.aerospikeClient
}
31 changes: 31 additions & 0 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,13 @@
package backup

import (
"log/slog"
"strings"
"testing"

"github.com/aerospike/backup-go/mocks"
"github.com/stretchr/testify/assert"
"golang.org/x/sync/semaphore"
)

func TestPartitionRange_validate(t *testing.T) {
Expand Down Expand Up @@ -81,3 +87,28 @@ func TestPartitionRange_validate(t *testing.T) {
})
}
}

func TestNilClient(t *testing.T) {
_, err := NewClient(nil)
assert.Error(t, err, "aerospike client is required")
}

func TestClientOptions(t *testing.T) {
var logBuffer strings.Builder
logger := slog.New(slog.NewTextHandler(&logBuffer, nil))
sem := semaphore.NewWeighted(10)
id := "ID"

client, err := NewClient(&mocks.MockAerospikeClient{},
WithID(id),
WithLogger(logger),
WithScanLimiter(sem),
)

assert.NoError(t, err)
assert.Equal(t, id, client.id)
assert.Equal(t, sem, client.scanLimiter)

client.logger.Info("test")
assert.Contains(t, logBuffer.String(), "level=INFO msg=test backup.client.id=ID")
}
3 changes: 1 addition & 2 deletions examples/aws/s3/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"context"
"fmt"
"log"
"log/slog"

"github.com/aerospike/aerospike-client-go/v7"
"github.com/aerospike/backup-go"
Expand Down Expand Up @@ -58,7 +57,7 @@ func initBackupClient() *backup.Client {
panic(aerr)
}

backupClient, err := backup.NewClient(aerospikeClient, "client_id", slog.Default())
backupClient, err := backup.NewClient(aerospikeClient, backup.WithID("client_id"))
if err != nil {
panic(err)
}
Expand Down
3 changes: 1 addition & 2 deletions examples/readme/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package main
import (
"context"
"log"
"log/slog"

"github.com/aerospike/aerospike-client-go/v7"
"github.com/aerospike/backup-go"
Expand All @@ -29,7 +28,7 @@ func main() {
panic(aerr)
}

backupClient, err := backup.NewClient(aerospikeClient, "client_id", slog.Default())
backupClient, err := backup.NewClient(aerospikeClient, backup.WithID("client_id"))
if err != nil {
panic(err)
}
Expand Down
8 changes: 4 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,17 @@ require (
github.com/aerospike/tools-common-go v0.0.0-20240701164814-36eec593d9c6
github.com/aws/aws-sdk-go-v2 v1.30.3
github.com/aws/aws-sdk-go-v2/config v1.27.27
github.com/aws/aws-sdk-go-v2/service/s3 v1.58.2
github.com/aws/aws-sdk-go-v2/service/s3 v1.58.3
github.com/aws/smithy-go v1.20.3
github.com/docker/docker v27.1.0+incompatible
github.com/docker/docker v27.1.1+incompatible
github.com/docker/go-connections v0.5.0
github.com/google/go-cmp v0.6.0
github.com/google/uuid v1.6.0
github.com/klauspost/compress v1.17.9
github.com/minio/minio-go/v7 v7.0.74
github.com/stretchr/testify v1.9.0
golang.org/x/time v0.5.0
golang.org/x/sync v0.8.0
golang.org/x/time v0.6.0
)

require (
Expand Down Expand Up @@ -70,7 +71,6 @@ require (
go.opentelemetry.io/proto/otlp v1.3.1 // indirect
golang.org/x/crypto v0.24.0 // indirect
golang.org/x/net v0.26.0 // indirect
golang.org/x/sync v0.7.0 // indirect
golang.org/x/sys v0.21.0 // indirect
golang.org/x/text v0.16.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240711142825-46eb208f015d // indirect
Expand Down
16 changes: 8 additions & 8 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.17 h1:HGErhhrx
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.17/go.mod h1:RkZEx4l0EHYDJpWppMJ3nD9wZJAa8/0lq9aVC+r2UII=
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.15 h1:246A4lSTXWJw/rmlQI+TT2OcqeDMKBdyjEQrafMaQdA=
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.15/go.mod h1:haVfg3761/WF7YPuJOER2MP0k4UAXyHaLclKXB6usDg=
github.com/aws/aws-sdk-go-v2/service/s3 v1.58.2 h1:sZXIzO38GZOU+O0C+INqbH7C2yALwfMWpd64tONS/NE=
github.com/aws/aws-sdk-go-v2/service/s3 v1.58.2/go.mod h1:Lcxzg5rojyVPU/0eFwLtcyTaek/6Mtic5B1gJo7e/zE=
github.com/aws/aws-sdk-go-v2/service/s3 v1.58.3 h1:hT8ZAZRIfqBqHbzKTII+CIiY8G2oC9OpLedkZ51DWl8=
github.com/aws/aws-sdk-go-v2/service/s3 v1.58.3/go.mod h1:Lcxzg5rojyVPU/0eFwLtcyTaek/6Mtic5B1gJo7e/zE=
github.com/aws/aws-sdk-go-v2/service/sso v1.22.4 h1:BXx0ZIxvrJdSgSvKTZ+yRBeSqqgPM89VPlulEcl37tM=
github.com/aws/aws-sdk-go-v2/service/sso v1.22.4/go.mod h1:ooyCOXjvJEsUw7x+ZDHeISPMhtwI3ZCB7ggFMcFfWLU=
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.26.4 h1:yiwVzJW2ZxZTurVbYWA7QOrAaCYQR72t0wrSBfoesUE=
Expand All @@ -53,8 +53,8 @@ github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/distribution/reference v0.6.0 h1:0IXCQ5g4/QMHHkarYzh5l+u8T3t73zM5QvfrDyIgxBk=
github.com/distribution/reference v0.6.0/go.mod h1:BbU0aIcezP1/5jX/8MP0YiH4SdvB5Y4f/wlDRiLyi3E=
github.com/docker/docker v27.1.0+incompatible h1:rEHVQc4GZ0MIQKifQPHSFGV/dVgaZafgRf8fCPtDYBs=
github.com/docker/docker v27.1.0+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk=
github.com/docker/docker v27.1.1+incompatible h1:hO/M4MtV36kzKldqnA37IWhebRA+LnqqcqDja6kVaKY=
github.com/docker/docker v27.1.1+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk=
github.com/docker/go-connections v0.5.0 h1:USnMq7hx7gwdVZq1L49hLXaFtUdTADjXGp+uj1Br63c=
github.com/docker/go-connections v0.5.0/go.mod h1:ov60Kzw0kKElRwhNs9UlUHAE/F9Fe6GLaXnqyDdmEXc=
github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4=
Expand Down Expand Up @@ -165,8 +165,8 @@ golang.org/x/net v0.26.0/go.mod h1:5YKkiSynbBIh3p6iOc/vibscux0x38BZDkn8sCUPxHE=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M=
golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ=
golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
Expand All @@ -178,8 +178,8 @@ golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4=
golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI=
golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk=
golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
golang.org/x/time v0.6.0 h1:eTDhh4ZXt5Qf0augr54TN6suAUudPcawVZeIAPU7D4U=
golang.org/x/time v0.6.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
Expand Down
8 changes: 6 additions & 2 deletions handler_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/aerospike/backup-go/models"
"github.com/aerospike/backup-go/pipeline"
"github.com/google/uuid"
"golang.org/x/sync/semaphore"
"golang.org/x/time/rate"
)

Expand Down Expand Up @@ -59,6 +60,7 @@ type BackupHandler struct {
limiter *rate.Limiter
errors chan error
infoClient *asinfo.InfoClient
scanLimiter *semaphore.Weighted
id string
stats models.BackupStats
}
Expand All @@ -70,6 +72,7 @@ func newBackupHandler(
ac AerospikeClient,
logger *slog.Logger,
writer Writer,
scanLimiter *semaphore.Weighted,
) *BackupHandler {
id := uuid.NewString()
logger = logging.WithHandler(logger, id, logging.HandlerTypeBackup, writer.GetType())
Expand All @@ -86,6 +89,7 @@ func newBackupHandler(
encoder: NewEncoder(config.EncoderType, config.Namespace),
limiter: limiter,
infoClient: asinfo.NewInfoClientFromAerospike(ac, config.InfoPolicy),
scanLimiter: scanLimiter,
}
}

Expand Down Expand Up @@ -125,9 +129,9 @@ func (bh *BackupHandler) backupSync(ctx context.Context) error {
}

writeWorkers := bh.makeWriteWorkers(backupWriters)
handler := newBackupRecordsHandler(bh.config, bh.aerospikeClient, bh.logger)
handler := newBackupRecordsHandler(bh.config, bh.aerospikeClient, bh.logger, bh.scanLimiter)

bh.stats.TotalRecords, err = handler.countRecords(bh.infoClient)
bh.stats.TotalRecords, err = handler.countRecords(ctx, bh.infoClient)
if err != nil {
return err
}
Expand Down
Loading
Loading