Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upsert delete streamed optional functions #16

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
135 changes: 135 additions & 0 deletions database/optionally.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
package database

import (
"context"
"fmt"
"github.com/icinga/icinga-go-library/com"
"github.com/pkg/errors"
)

// Upsert inserts new rows into a table or updates rows of a table if the primary key already exists.
type Upsert interface {
// Stream bulk upserts the specified entities via NamedBulkExec.
// If not explicitly specified, the upsert statement is created using
// BuildUpsertStmt with the first entity from the entities stream.
Stream(ctx context.Context, entities <-chan Entity) error
}

// UpsertOption is a functional option for NewUpsert.
type UpsertOption func(u *upsert)

// WithOnUpsert adds callback(s) to bulk upserts. Entities for which the
// operation was performed successfully are passed to the callbacks.
func WithOnUpsert(onUpsert ...OnSuccess[Entity]) UpsertOption {
return func(u *upsert) {
u.onUpsert = onUpsert
}
}

// WithStatement uses the specified statement for bulk upserts instead of automatically creating one.
func WithStatement(stmt string, placeholders int) UpsertOption {
return func(u *upsert) {
u.stmt = stmt
u.placeholders = placeholders
}
}

// NewUpsert creates a new Upsert initalized with a database.
func NewUpsert(db *DB, options ...UpsertOption) Upsert {
u := &upsert{db: db}

for _, option := range options {
option(u)
}

return u
}

type upsert struct {
db *DB
onUpsert []OnSuccess[Entity]
stmt string
placeholders int
}

func (u *upsert) Stream(ctx context.Context, entities <-chan Entity) error {
first, forward, err := com.CopyFirst(ctx, entities)
if err != nil {
return errors.Wrap(err, "can't copy first entity")
}

sem := u.db.GetSemaphoreForTable(TableName(first))
var stmt string
var placeholders int

if u.stmt != "" {
stmt = u.stmt
placeholders = u.placeholders
} else {
stmt, placeholders = u.db.BuildUpsertStmt(first)
}

return u.db.NamedBulkExec(
ctx, stmt, u.db.BatchSizeByPlaceholders(placeholders), sem,
forward, SplitOnDupId[Entity], u.onUpsert...,
)
}

// Delete deletes rows of a table.
type Delete interface {
// Stream bulk deletes rows from the table specified in from using the given args stream via BulkExec.
// Unless explicitly specified, the DELETE statement is created using BuildDeleteStmt.
Stream(ctx context.Context, from any, args <-chan any) error
}

// DeleteOption is a functional option for NewDelete.
type DeleteOption func(options *delete)

// WithOnDelete adds callback(s) to bulk deletes. Arguments for which the
// operation was performed successfully are passed to the callbacks.
func WithOnDelete(onDelete ...OnSuccess[any]) DeleteOption {
return func(d *delete) {
d.onDelete = onDelete
}
}

// ByColumn uses the given column for the WHERE clause that the rows must
// satisfy in order to be deleted, instead of automatically using ID.
func ByColumn(column string) DeleteOption {
return func(d *delete) {
d.column = column
}
}

// NewDelete creates a new Delete initalized with a database.
func NewDelete(db *DB, options ...DeleteOption) Delete {
d := &delete{db: db}

for _, option := range options {
option(d)
}

return d
}

type delete struct {
db *DB
column string
onDelete []OnSuccess[any]
}

func (d *delete) Stream(ctx context.Context, from any, args <-chan any) error {
var stmt string

if d.column != "" {
stmt = fmt.Sprintf(`DELETE FROM "%s" WHERE %s IN (?)`, TableName(from), d.column)
} else {
stmt = d.db.BuildDeleteStmt(from)
}

sem := d.db.GetSemaphoreForTable(TableName(from))

return d.db.BulkExec(
ctx, stmt, d.db.Options.MaxPlaceholdersPerStatement, sem, args, d.onDelete...,
)
}
Loading