diff --git a/pkg/sync/logs.go b/pkg/sync/logs.go index 77d5cc4f..f1715efd 100644 --- a/pkg/sync/logs.go +++ b/pkg/sync/logs.go @@ -161,7 +161,7 @@ func (ls *LogSync) MaintainList(ctx context.Context, addChannel <-chan contracts }) g.Go(func() error { - return ls.db.DeleteStreamed(ctx, &schema.Log{}, deletes, database.ByColumn("reference_id")) + return database.NewDelete(ls.db).ByColumn("reference_id").Stream(ctx, &schema.Log{}, deletes) }) return g.Wait() @@ -244,7 +244,7 @@ func (ls *LogSync) Run(ctx context.Context) error { }) g.Go(func() error { - return ls.db.UpsertStreamed(ctx, upserts, database.WithStatement(upsertStmt, 5)) + return database.NewUpsert(ls.db).WithStatement(upsertStmt, 5).Stream(ctx, upserts) }) return g.Wait() diff --git a/pkg/sync/sync.go b/pkg/sync/sync.go index acecbf30..5c587010 100644 --- a/pkg/sync/sync.go +++ b/pkg/sync/sync.go @@ -171,7 +171,7 @@ func (s *sync) Run(ctx context.Context, execOptions ...SyncOption) error { } g.Go(func() error { - return s.db.UpsertStreamed(ctx, upsertToStream) + return database.NewUpsert(s.db).Stream(ctx, upsertToStream) }) // init delete channel spreader @@ -235,7 +235,7 @@ func (s *sync) Run(ctx context.Context, execOptions ...SyncOption) error { }) g.Go(func() error { - return s.db.DeleteStreamed(ctx, s.factory(), deleteToStream) + return database.NewDelete(s.db).Stream(ctx, s.factory(), deleteToStream) }) g.Go(func() error {