Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

APPS-1241 Add concurrent scans limiter #113

Merged
merged 21 commits into from
Aug 12, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 8 additions & 6 deletions .github/workflows/golangci-lint.yaml
Original file line number Diff line number Diff line change
@@ -1,32 +1,34 @@
name: golangci-lint
on:
push:
tags:
- v*
branches:
- '*'
pull_request:
branches:
- main

permissions:
contents: read

jobs:
golangci:
name: lint
runs-on: ubuntu-latest
steps:
- name: Checkout sources
uses: actions/checkout@v4

- name: Get go version from go.mod
run: |
echo "GO_VERSION=$(grep '^go ' go.mod | cut -d " " -f 2)" >> $GITHUB_ENV

- name: Setup-go
uses: actions/setup-go@v5
with:
go-version: ${{ env.GO_VERSION }}
- name: "generate-mocks"
run: make mocks-generate

- name: Run golangci-lint
uses: golangci/golangci-lint-action@v4
uses: golangci/golangci-lint-action@v6
with:
version: v1.56.2
args: --timeout=5m
args: --timeout=5m
56 changes: 42 additions & 14 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

a "github.com/aerospike/aerospike-client-go/v7"
"github.com/aerospike/backup-go/internal/logging"
"golang.org/x/sync/semaphore"
)

const (
Expand All @@ -35,7 +36,7 @@ const (

// AerospikeClient describes aerospike client interface for easy mocking.
//
// go:generate mockery --name AerospikeClient
//go:generate mockery --name AerospikeClient
type AerospikeClient interface {
GetDefaultScanPolicy() *a.ScanPolicy
GetDefaultInfoPolicy() *a.InfoPolicy
Expand Down Expand Up @@ -90,33 +91,60 @@ type AerospikeClient interface {
type Client struct {
aerospikeClient AerospikeClient
logger *slog.Logger
scanLimiter *semaphore.Weighted
id string
}

type Option func(*Client)
korotkov-aerospike marked this conversation as resolved.
Show resolved Hide resolved
reugn marked this conversation as resolved.
Show resolved Hide resolved

// WithID sets the ID for the Client.
korotkov-aerospike marked this conversation as resolved.
Show resolved Hide resolved
func WithID(id string) Option {
return func(c *Client) {
c.id = id
}
}

// WithLogger sets the logger for the Client.
korotkov-aerospike marked this conversation as resolved.
Show resolved Hide resolved
func WithLogger(logger *slog.Logger) Option {
return func(c *Client) {
c.logger = logger
}
}

// WithScanLimiter sets the scanLimiter for the Client.
korotkov-aerospike marked this conversation as resolved.
Show resolved Hide resolved
func WithScanLimiter(sem *semaphore.Weighted) Option {
return func(c *Client) {
c.scanLimiter = sem
}
}

// NewClient creates a new backup client.
// Options:
// - ac is the aerospike client to use for backup and restore operations.
// - id is an identifier for the client.
// - logger is the logger that this client will log to.
korotkov-aerospike marked this conversation as resolved.
Show resolved Hide resolved
func NewClient(ac AerospikeClient, id string, logger *slog.Logger) (*Client, error) {
// - scan limiter semaphore that is used to limit number of concurrent scans.
func NewClient(ac AerospikeClient, opts ...Option) (*Client, error) {
if ac == nil {
return nil, errors.New("aerospike client pointer is nil")
}

if logger == nil {
logger = slog.Default()
// Initialize the Client with default values
client := &Client{
aerospikeClient: ac,
logger: slog.Default(), // Default logger
}

// qualify the logger with a backup lib group
logger = logger.WithGroup("backup")
// Apply all options to the Client
for _, opt := range opts {
opt(client)
}

// add a client group to the logger
logger = logging.WithClient(logger, id)
// Further customization after applying options
client.logger = client.logger.WithGroup("backup")
client.logger = logging.WithClient(client.logger, client.id)

return &Client{
aerospikeClient: ac,
id: id,
logger: logger,
}, nil
return client, nil
}

func (c *Client) getUsableInfoPolicy(p *a.InfoPolicy) a.InfoPolicy {
Expand Down Expand Up @@ -167,7 +195,7 @@ func (c *Client) Backup(
return nil, err
}

handler := newBackupHandler(ctx, config, c.aerospikeClient, c.logger, writer)
handler := newBackupHandler(ctx, config, c.aerospikeClient, c.logger, writer, c.scanLimiter)
handler.run(ctx)

return handler, nil
Expand Down
6 changes: 2 additions & 4 deletions examples/aws/s3/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,10 @@ package main
import (
"context"
"fmt"
"log"
"log/slog"

"github.com/aerospike/aerospike-client-go/v7"
"github.com/aerospike/backup-go"
"github.com/aerospike/backup-go/io/aws/s3"
"log"
reugn marked this conversation as resolved.
Show resolved Hide resolved
)

const (
Expand Down Expand Up @@ -58,7 +56,7 @@ func initBackupClient() *backup.Client {
panic(aerr)
}

backupClient, err := backup.NewClient(aerospikeClient, "client_id", slog.Default())
backupClient, err := backup.NewClient(aerospikeClient, backup.WithID("client_id"))
if err != nil {
panic(err)
}
Expand Down
6 changes: 2 additions & 4 deletions examples/readme/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,9 @@ package main

import (
"context"
"log"
"log/slog"

"github.com/aerospike/aerospike-client-go/v7"
"github.com/aerospike/backup-go"
"log"
reugn marked this conversation as resolved.
Show resolved Hide resolved
)

func main() {
Expand All @@ -29,7 +27,7 @@ func main() {
panic(aerr)
}

backupClient, err := backup.NewClient(aerospikeClient, "client_id", slog.Default())
backupClient, err := backup.NewClient(aerospikeClient, backup.WithID("client_id"))
if err != nil {
panic(err)
}
Expand Down
8 changes: 4 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,17 @@ require (
github.com/aerospike/tools-common-go v0.0.0-20240701164814-36eec593d9c6
github.com/aws/aws-sdk-go-v2 v1.30.3
github.com/aws/aws-sdk-go-v2/config v1.27.27
github.com/aws/aws-sdk-go-v2/service/s3 v1.58.2
github.com/aws/aws-sdk-go-v2/service/s3 v1.58.3
github.com/aws/smithy-go v1.20.3
github.com/docker/docker v27.1.0+incompatible
github.com/docker/docker v27.1.1+incompatible
github.com/docker/go-connections v0.5.0
github.com/google/go-cmp v0.6.0
github.com/google/uuid v1.6.0
github.com/klauspost/compress v1.17.9
github.com/minio/minio-go/v7 v7.0.74
github.com/stretchr/testify v1.9.0
golang.org/x/time v0.5.0
golang.org/x/sync v0.8.0
golang.org/x/time v0.6.0
)

require (
Expand Down Expand Up @@ -70,7 +71,6 @@ require (
go.opentelemetry.io/proto/otlp v1.3.1 // indirect
golang.org/x/crypto v0.24.0 // indirect
golang.org/x/net v0.26.0 // indirect
golang.org/x/sync v0.7.0 // indirect
golang.org/x/sys v0.21.0 // indirect
golang.org/x/text v0.16.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240711142825-46eb208f015d // indirect
Expand Down
16 changes: 8 additions & 8 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.17 h1:HGErhhrx
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.17/go.mod h1:RkZEx4l0EHYDJpWppMJ3nD9wZJAa8/0lq9aVC+r2UII=
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.15 h1:246A4lSTXWJw/rmlQI+TT2OcqeDMKBdyjEQrafMaQdA=
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.15/go.mod h1:haVfg3761/WF7YPuJOER2MP0k4UAXyHaLclKXB6usDg=
github.com/aws/aws-sdk-go-v2/service/s3 v1.58.2 h1:sZXIzO38GZOU+O0C+INqbH7C2yALwfMWpd64tONS/NE=
github.com/aws/aws-sdk-go-v2/service/s3 v1.58.2/go.mod h1:Lcxzg5rojyVPU/0eFwLtcyTaek/6Mtic5B1gJo7e/zE=
github.com/aws/aws-sdk-go-v2/service/s3 v1.58.3 h1:hT8ZAZRIfqBqHbzKTII+CIiY8G2oC9OpLedkZ51DWl8=
github.com/aws/aws-sdk-go-v2/service/s3 v1.58.3/go.mod h1:Lcxzg5rojyVPU/0eFwLtcyTaek/6Mtic5B1gJo7e/zE=
github.com/aws/aws-sdk-go-v2/service/sso v1.22.4 h1:BXx0ZIxvrJdSgSvKTZ+yRBeSqqgPM89VPlulEcl37tM=
github.com/aws/aws-sdk-go-v2/service/sso v1.22.4/go.mod h1:ooyCOXjvJEsUw7x+ZDHeISPMhtwI3ZCB7ggFMcFfWLU=
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.26.4 h1:yiwVzJW2ZxZTurVbYWA7QOrAaCYQR72t0wrSBfoesUE=
Expand All @@ -53,8 +53,8 @@ github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/distribution/reference v0.6.0 h1:0IXCQ5g4/QMHHkarYzh5l+u8T3t73zM5QvfrDyIgxBk=
github.com/distribution/reference v0.6.0/go.mod h1:BbU0aIcezP1/5jX/8MP0YiH4SdvB5Y4f/wlDRiLyi3E=
github.com/docker/docker v27.1.0+incompatible h1:rEHVQc4GZ0MIQKifQPHSFGV/dVgaZafgRf8fCPtDYBs=
github.com/docker/docker v27.1.0+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk=
github.com/docker/docker v27.1.1+incompatible h1:hO/M4MtV36kzKldqnA37IWhebRA+LnqqcqDja6kVaKY=
github.com/docker/docker v27.1.1+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk=
github.com/docker/go-connections v0.5.0 h1:USnMq7hx7gwdVZq1L49hLXaFtUdTADjXGp+uj1Br63c=
github.com/docker/go-connections v0.5.0/go.mod h1:ov60Kzw0kKElRwhNs9UlUHAE/F9Fe6GLaXnqyDdmEXc=
github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4=
Expand Down Expand Up @@ -165,8 +165,8 @@ golang.org/x/net v0.26.0/go.mod h1:5YKkiSynbBIh3p6iOc/vibscux0x38BZDkn8sCUPxHE=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M=
golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ=
golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
Expand All @@ -178,8 +178,8 @@ golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4=
golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI=
golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk=
golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
golang.org/x/time v0.6.0 h1:eTDhh4ZXt5Qf0augr54TN6suAUudPcawVZeIAPU7D4U=
golang.org/x/time v0.6.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
Expand Down
8 changes: 6 additions & 2 deletions handler_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/aerospike/backup-go/models"
"github.com/aerospike/backup-go/pipeline"
"github.com/google/uuid"
"golang.org/x/sync/semaphore"
"golang.org/x/time/rate"
)

Expand Down Expand Up @@ -59,6 +60,7 @@ type BackupHandler struct {
limiter *rate.Limiter
errors chan error
infoClient *asinfo.InfoClient
scanLimiter *semaphore.Weighted
id string
stats models.BackupStats
}
Expand All @@ -70,6 +72,7 @@ func newBackupHandler(
ac AerospikeClient,
logger *slog.Logger,
writer Writer,
scanLimiter *semaphore.Weighted,
) *BackupHandler {
id := uuid.NewString()
logger = logging.WithHandler(logger, id, logging.HandlerTypeBackup, writer.GetType())
Expand All @@ -86,6 +89,7 @@ func newBackupHandler(
encoder: NewEncoder(config.EncoderType, config.Namespace),
limiter: limiter,
infoClient: asinfo.NewInfoClientFromAerospike(ac, config.InfoPolicy),
scanLimiter: scanLimiter,
}
}

Expand Down Expand Up @@ -125,9 +129,9 @@ func (bh *BackupHandler) backupSync(ctx context.Context) error {
}

writeWorkers := bh.makeWriteWorkers(backupWriters)
handler := newBackupRecordsHandler(bh.config, bh.aerospikeClient, bh.logger)
handler := newBackupRecordsHandler(bh.config, bh.aerospikeClient, bh.logger, bh.scanLimiter)

bh.stats.TotalRecords, err = handler.countRecords(bh.infoClient)
bh.stats.TotalRecords, err = handler.countRecords(ctx, bh.infoClient)
if err != nil {
return err
}
Expand Down
24 changes: 14 additions & 10 deletions handler_backup_records.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,25 +27,29 @@ import (
"github.com/aerospike/backup-go/io/aerospike"
"github.com/aerospike/backup-go/models"
"github.com/aerospike/backup-go/pipeline"
"golang.org/x/sync/semaphore"
)

type backupRecordsHandler struct {
config *BackupConfig
aerospikeClient AerospikeClient
logger *slog.Logger
scanLimiter *semaphore.Weighted
}

func newBackupRecordsHandler(
config *BackupConfig,
ac AerospikeClient,
logger *slog.Logger,
scanLimiter *semaphore.Weighted,
) *backupRecordsHandler {
logger.Debug("created new backup records handler")

return &backupRecordsHandler{
config: config,
aerospikeClient: ac,
logger: logger,
scanLimiter: scanLimiter,
}
}

Expand All @@ -54,7 +58,7 @@ func (bh *backupRecordsHandler) run(
writers []pipeline.Worker[*models.Token],
recordsReadTotal *atomic.Uint64,
) error {
readWorkers, err := bh.makeAerospikeReadWorkers(bh.config.Parallel)
readWorkers, err := bh.makeAerospikeReadWorkers(ctx, bh.config.Parallel)
if err != nil {
return err
}
Expand Down Expand Up @@ -82,25 +86,23 @@ func (bh *backupRecordsHandler) run(
return job.Run(ctx)
}

func (bh *backupRecordsHandler) countRecords(infoClient *asinfo.InfoClient) (uint64, error) {
func (bh *backupRecordsHandler) countRecords(ctx context.Context, infoClient *asinfo.InfoClient) (uint64, error) {
if bh.config.isFullBackup() {
return infoClient.GetRecordCount(bh.config.Namespace, bh.config.SetList)
}

return bh.countRecordsUsingScan()
return bh.countRecordsUsingScan(ctx)
}

func (bh *backupRecordsHandler) countRecordsUsingScan() (uint64, error) {
func (bh *backupRecordsHandler) countRecordsUsingScan(ctx context.Context) (uint64, error) {
scanPolicy := *bh.config.ScanPolicy

scanPolicy.IncludeBinData = false
scanPolicy.MaxRecords = 0

recordReader := aerospike.NewRecordReader(
bh.aerospikeClient,
bh.recordReaderConfigForPartition(PartitionRangeAll(), &scanPolicy),
bh.logger,
)
readerConfig := bh.recordReaderConfigForPartition(PartitionRangeAll(), &scanPolicy)
recordReader := aerospike.NewRecordReader(ctx, bh.aerospikeClient, readerConfig, bh.logger)

defer recordReader.Close()

var count uint64
Expand All @@ -121,7 +123,7 @@ func (bh *backupRecordsHandler) countRecordsUsingScan() (uint64, error) {
}

func (bh *backupRecordsHandler) makeAerospikeReadWorkers(
n int,
ctx context.Context, n int,
) ([]pipeline.Worker[*models.Token], error) {
partitionRanges, err := splitPartitions(
bh.config.Partitions.Begin,
Expand All @@ -141,6 +143,7 @@ func (bh *backupRecordsHandler) makeAerospikeReadWorkers(

for i := 0; i < n; i++ {
recordReader := aerospike.NewRecordReader(
ctx,
bh.aerospikeClient,
bh.recordReaderConfigForPartition(partitionRanges[i], &scanPolicy),
bh.logger,
Expand All @@ -166,5 +169,6 @@ func (bh *backupRecordsHandler) recordReaderConfigForPartition(
FromTime: bh.config.ModAfter,
ToTime: bh.config.ModBefore,
},
bh.scanLimiter,
)
}
Loading
Loading