Skip to content

Commit

Permalink
Add functional options to 'DeleteStreamed' and adjust for 'UpsertStre…
Browse files Browse the repository at this point in the history
…amed'
  • Loading branch information
jrauh01 committed May 24, 2024
1 parent 2d47d95 commit 48af1a6
Showing 1 changed file with 89 additions and 6 deletions.
95 changes: 89 additions & 6 deletions database/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -691,25 +691,67 @@ func (db *DB) CreateIgnoreStreamed(
)
}

func WithOnSuccessUpsert(onSuccess ...OnSuccess[Entity]) ExecOption {
return func(options *ExecOptions) {
options.onSuccess = onSuccess
}
}

func WithStatement(stmt string, placeholders int) ExecOption {
return func(options *ExecOptions) {
options.stmt = stmt
options.placeholders = placeholders
}
}

type ExecOption func(options *ExecOptions)

type ExecOptions struct {
onSuccess []OnSuccess[Entity]
stmt string
placeholders int
}

func NewExecOptions(execOpts ...ExecOption) *ExecOptions {
execOptions := &ExecOptions{}

for _, option := range execOpts {
option(execOptions)
}

return execOptions
}

// UpsertStreamed bulk upserts the specified entities via NamedBulkExec.
// The upsert statement is created using BuildUpsertStmt with the first entity from the entities stream.
// Bulk size is controlled via Options.MaxPlaceholdersPerStatement and
// concurrency is controlled via Options.MaxConnectionsPerTable.
// Entities for which the query ran successfully will be passed to onSuccess.
func (db *DB) UpsertStreamed(
ctx context.Context, entities <-chan Entity, onSuccess ...OnSuccess[Entity],
ctx context.Context, entities <-chan Entity, execOpts ...ExecOption,
) error {

execOptions := NewExecOptions(execOpts...)

first, forward, err := com.CopyFirst(ctx, entities)
if err != nil {
return errors.Wrap(err, "can't copy first entity")
}

sem := db.GetSemaphoreForTable(TableName(first))
stmt, placeholders := db.BuildUpsertStmt(first)
var stmt string
var placeholders int

if execOptions.stmt != "" {
stmt = execOptions.stmt
placeholders = execOptions.placeholders
} else {
stmt, placeholders = db.BuildUpsertStmt(first)
}

return db.NamedBulkExec(
ctx, stmt, db.BatchSizeByPlaceholders(placeholders), sem,
forward, SplitOnDupId[Entity], onSuccess...,
forward, SplitOnDupId[Entity], execOptions.onSuccess...,
)
}

Expand All @@ -728,17 +770,58 @@ func (db *DB) UpdateStreamed(ctx context.Context, entities <-chan Entity) error
return db.NamedBulkExecTx(ctx, stmt, db.Options.MaxRowsPerTransaction, sem, forward)
}

func WithOnSuccessDelete(onSuccess ...OnSuccess[any]) DeleteOption {
return func(options *DeleteOptions) {
options.onSuccess = onSuccess
}
}

func ByColumn(column string) DeleteOption {
return func(options *DeleteOptions) {
options.column = column
}
}

type DeleteOption func(options *DeleteOptions)

type DeleteOptions struct {
onSuccess []OnSuccess[any]
column string
}

func NewDeleteOptions(execOpts ...DeleteOption) *DeleteOptions {
deleteOptions := &DeleteOptions{}

for _, option := range execOpts {
option(deleteOptions)
}

return deleteOptions
}

// DeleteStreamed bulk deletes the specified ids via BulkExec.
// The delete statement is created using BuildDeleteStmt with the passed entityType.
// Bulk size is controlled via Options.MaxPlaceholdersPerStatement and
// concurrency is controlled via Options.MaxConnectionsPerTable.
// IDs for which the query ran successfully will be passed to onSuccess.
func (db *DB) DeleteStreamed(
ctx context.Context, entityType Entity, ids <-chan interface{}, onSuccess ...OnSuccess[any],
ctx context.Context, entityType Entity, ids <-chan interface{}, deleteOpts ...DeleteOption,
) error {

deleteOptions := NewDeleteOptions(deleteOpts...)

sem := db.GetSemaphoreForTable(TableName(entityType))

var stmt string

if deleteOptions.column != "" {
stmt = fmt.Sprintf("DELETE FROM %s WHERE %s IN (?)", TableName(entityType), deleteOptions.column)
} else {
stmt = db.BuildDeleteStmt(entityType)
}

return db.BulkExec(
ctx, db.BuildDeleteStmt(entityType), db.Options.MaxPlaceholdersPerStatement, sem, ids, onSuccess...,
ctx, stmt, db.Options.MaxPlaceholdersPerStatement, sem, ids, deleteOptions.onSuccess...,
)
}

Expand All @@ -754,7 +837,7 @@ func (db *DB) Delete(
}
close(idsCh)

return db.DeleteStreamed(ctx, entityType, idsCh, onSuccess...)
return db.DeleteStreamed(ctx, entityType, idsCh, WithOnSuccessDelete(onSuccess...))
}

func (db *DB) GetSemaphoreForTable(table string) *semaphore.Weighted {
Expand Down

0 comments on commit 48af1a6

Please sign in to comment.