From 396b3ad42e10070b0b17fbc3486f30d1e5ee15c1 Mon Sep 17 00:00:00 2001 From: Johannes Rauh Date: Thu, 30 Nov 2023 10:15:07 +0100 Subject: [PATCH] Fix review --- cmd/icinga-kubernetes/main.go | 9 +- pkg/sync/channel-mux.go | 68 +++++++------- pkg/sync/channel-mux_test.go | 20 ++-- pkg/sync/logs.go | 148 +++++++++++++++--------------- pkg/sync/options.go | 37 ++++++++ pkg/sync/sync.go | 168 +++++++++------------------------- 6 files changed, 206 insertions(+), 244 deletions(-) create mode 100644 pkg/sync/options.go diff --git a/cmd/icinga-kubernetes/main.go b/cmd/icinga-kubernetes/main.go index 9ac8e6bd..f56e5222 100644 --- a/cmd/icinga-kubernetes/main.go +++ b/cmd/icinga-kubernetes/main.go @@ -16,6 +16,7 @@ import ( kinformers "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" kclientcmd "k8s.io/client-go/tools/clientcmd" + "time" ) func main() { @@ -96,10 +97,12 @@ func main() { ) }) - logSync := sync.NewContainerLogSync(k, db, logs.GetChildLogger("ContainerLogs")) - g.Go(func() error { - return logSync.Run(ctx, podUpserts, podDeletes) + return sync.NewContainerLogSync( + k, db, logs.GetChildLogger("ContainerLogs"), time.Second*15, + ).Run( + ctx, podUpserts, podDeletes, + ) }) if err := g.Wait(); err != nil { diff --git a/pkg/sync/channel-mux.go b/pkg/sync/channel-mux.go index 69635ba9..2e05f73f 100644 --- a/pkg/sync/channel-mux.go +++ b/pkg/sync/channel-mux.go @@ -7,79 +7,79 @@ import ( ) // ChannelMux is a multiplexer for channels of variable types. -// It fans all input channels to all output channels. +// It fans out all input channels to all output channels. type ChannelMux[T any] interface { + // In adds the given input channel reading. + In(<-chan T) - // AddInChannel adds given input channel to the list of input channels. - AddInChannel(<-chan T) + // Out returns a new output channel that receives from all input channels. + Out() <-chan T - // NewOutChannel returns and adds new output channel to the pods of created addedOutChannels. - NewOutChannel() <-chan T + // AddOut registers the given output channel to receive from all input channels. + AddOut(chan<- T) - // AddOutChannel adds given output channel to the list of added addedOutChannels. - AddOutChannel(chan<- T) - - // Run combines output channel lists and starts multiplexing. + // Run starts multiplexing of all input channels to all output channels. Run(context.Context) error } type channelMux[T any] struct { - inChannels []<-chan T - createdOutChannels []chan<- T - addedOutChannels []chan<- T - started atomic.Bool + in []<-chan T + out []chan<- T + outAdded []chan<- T + started atomic.Bool } -// NewChannelMux creates new ChannelMux initialized with at least one input channel -func NewChannelMux[T any](initInChannel <-chan T, inChannels ...<-chan T) ChannelMux[T] { +// NewChannelMux returns a new ChannelMux initialized with at least one input channel. +func NewChannelMux[T any](inChannel <-chan T, inChannels ...<-chan T) ChannelMux[T] { return &channelMux[T]{ - inChannels: append(make([]<-chan T, 0), append(inChannels, initInChannel)...), + in: append(inChannels, inChannel), } } -func (mux *channelMux[T]) AddInChannel(channel <-chan T) { +func (mux *channelMux[T]) In(channel <-chan T) { if mux.started.Load() { panic("channelMux already started") } - mux.inChannels = append(mux.inChannels, channel) + mux.in = append(mux.in, channel) } -func (mux *channelMux[T]) NewOutChannel() <-chan T { +func (mux *channelMux[T]) Out() <-chan T { if mux.started.Load() { panic("channelMux already started") } channel := make(chan T) - mux.createdOutChannels = append(mux.createdOutChannels, channel) + mux.out = append(mux.out, channel) return channel } -func (mux *channelMux[T]) AddOutChannel(channel chan<- T) { +func (mux *channelMux[T]) AddOut(channel chan<- T) { if mux.started.Load() { panic("channelMux already started") } - mux.addedOutChannels = append(mux.addedOutChannels, channel) + mux.outAdded = append(mux.outAdded, channel) } func (mux *channelMux[T]) Run(ctx context.Context) error { - mux.started.Store(true) + if mux.started.Swap(true) { + panic("channelMux already started") + } defer func() { - for _, channelToClose := range mux.createdOutChannels { + for _, channelToClose := range mux.out { close(channelToClose) } }() - outChannels := append(mux.addedOutChannels, mux.createdOutChannels...) + g, ctx := errgroup.WithContext(ctx) sink := make(chan T) + defer close(sink) - g, ctx := errgroup.WithContext(ctx) - - for _, ch := range mux.inChannels { + for _, ch := range mux.in { ch := ch g.Go(func() error { @@ -89,7 +89,12 @@ func (mux *channelMux[T]) Run(ctx context.Context) error { if !more { return nil } - sink <- spread + select { + case sink <- spread: + case <-ctx.Done(): + return ctx.Err() + } + case <-ctx.Done(): return ctx.Err() } @@ -97,6 +102,7 @@ func (mux *channelMux[T]) Run(ctx context.Context) error { }) } + outs := append(mux.outAdded, mux.out...) g.Go(func() error { for { select { @@ -105,9 +111,9 @@ func (mux *channelMux[T]) Run(ctx context.Context) error { return nil } - for _, outChannel := range outChannels { + for _, ch := range outs { select { - case outChannel <- spread: + case ch <- spread: case <-ctx.Done(): return ctx.Err() } diff --git a/pkg/sync/channel-mux_test.go b/pkg/sync/channel-mux_test.go index 37e90b6e..467c527d 100644 --- a/pkg/sync/channel-mux_test.go +++ b/pkg/sync/channel-mux_test.go @@ -27,9 +27,9 @@ func TestAddedOutputChannels(t *testing.T) { outputChannel1 := make(chan int) outputChannel2 := make(chan int) outputChannel3 := make(chan int) - multiplexer.AddOutChannel(outputChannel1) - multiplexer.AddOutChannel(outputChannel2) - multiplexer.AddOutChannel(outputChannel3) + multiplexer.AddOut(outputChannel1) + multiplexer.AddOut(outputChannel2) + multiplexer.AddOut(outputChannel3) g, ctx := errgroup.WithContext(context.Background()) @@ -56,9 +56,9 @@ func TestCreatedOutputChannels(t *testing.T) { multiplexChannel := make(chan int) multiplexer := NewChannelMux(multiplexChannel) - outputChannel1 := multiplexer.NewOutChannel() - outputChannel2 := multiplexer.NewOutChannel() - outputChannel3 := multiplexer.NewOutChannel() + outputChannel1 := multiplexer.Out() + outputChannel2 := multiplexer.Out() + outputChannel3 := multiplexer.Out() g, ctx := errgroup.WithContext(context.Background()) @@ -100,7 +100,7 @@ func TestAddedInputChannels(t *testing.T) { multiplexer := NewChannelMux(multiplexChannel1, multiplexChannel2, multiplexChannel3) - outputChannel := multiplexer.NewOutChannel() + outputChannel := multiplexer.Out() ctx, cancel := context.WithCancel(context.Background()) g, ctx := errgroup.WithContext(ctx) @@ -173,9 +173,9 @@ func TestClosedChannels(t *testing.T) { multiplexChannel := make(chan int) multiplexer := NewChannelMux(multiplexChannel) - outputChannel1 := multiplexer.NewOutChannel() - outputChannel2 := multiplexer.NewOutChannel() - outputChannel3 := multiplexer.NewOutChannel() + outputChannel1 := multiplexer.Out() + outputChannel2 := multiplexer.Out() + outputChannel3 := multiplexer.Out() ctx, cancel := context.WithCancel(context.Background()) g, ctx := errgroup.WithContext(ctx) diff --git a/pkg/sync/logs.go b/pkg/sync/logs.go index ef0935d5..07d9d96f 100644 --- a/pkg/sync/logs.go +++ b/pkg/sync/logs.go @@ -3,7 +3,6 @@ package sync import ( "bufio" "context" - "crypto/sha1" "fmt" "github.com/icinga/icinga-go-library/database" "github.com/icinga/icinga-go-library/logging" @@ -16,42 +15,46 @@ import ( kcorev1 "k8s.io/api/core/v1" kmetav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" + "strconv" "strings" - msync "sync" + gosync "sync" "time" ) -type podListItem struct { - pod *kcorev1.Pod - lastTimestamps map[[20]byte]*kmetav1.Time -} - -// ContainerLogSync reacts to pod changes and syncs container logs to database. -// On pod add/updates ContainerLogSync starts syncing. On pod deletes syncing stops. -// Container logs are periodic fetched from Kubernetes API. +// ContainerLogSync reacts to pod changes and synchronizes container logs +// with the database. When a pod is added/updated, ContainerLogSync starts +// synchronizing its containers. When a pod is deleted, synchronization stops. +// Container logs are periodically fetched from the Kubernetes API. type ContainerLogSync interface { // Run starts the ContainerLogSync. Run(context.Context, <-chan contracts.KUpsert, <-chan contracts.KDelete) error } +// NewContainerLogSync creates new ContainerLogSync initialized with clientset, database and logger. +func NewContainerLogSync(clientset *kubernetes.Clientset, db *database.DB, logger *logging.Logger, period time.Duration) ContainerLogSync { + return &containerLogSync{ + pods: make(map[string]podListItem), + mutex: &gosync.RWMutex{}, + clientset: clientset, + db: db, + logger: logger, + period: period, + } +} + // containerLogSync syncs container logs to database. type containerLogSync struct { - pods map[[20]byte]podListItem - mutex *msync.RWMutex + pods map[string]podListItem + mutex *gosync.RWMutex clientset *kubernetes.Clientset db *database.DB logger *logging.Logger + period time.Duration } -// NewContainerLogSync creates new containerLogSync initialized with clientset, database and logger. -func NewContainerLogSync(clientset *kubernetes.Clientset, db *database.DB, logger *logging.Logger) ContainerLogSync { - return &containerLogSync{ - pods: make(map[[20]byte]podListItem), - mutex: &msync.RWMutex{}, - clientset: clientset, - db: db, - logger: logger, - } +type podListItem struct { + pod *kcorev1.Pod + lastTimestamps map[string]*kmetav1.Time } // upsertStmt returns a database statement to upsert a container log. @@ -65,129 +68,135 @@ func (ls *containerLogSync) upsertStmt() string { ) } -// splitTimestampsFromMessages takes a log and returns timestamps and messages as separate parts. -// Additionally, it updates the last checked timestamp for the container log. -func (ls *containerLogSync) splitTimestampsFromMessages(log types.Binary, curPodId [20]byte, curContainerId [20]byte) (times []string, messages []string, err error) { +// splitTimestampsFromMessages takes a log line and returns timestamps and messages as separate parts. +func (ls *containerLogSync) splitTimestampsFromMessages(log types.Binary, curPodId string, curContainerId string) (times []string, messages []string, newLastTimestamp time.Time, returnErr error) { stringReader := strings.NewReader(string(log)) reader := bufio.NewReader(stringReader) + var parsedTimestamp time.Time + for { line, err := reader.ReadString('\n') if err != nil { if err == io.EOF { break } - return nil, nil, errors.Wrap(err, "error reading log message") + + returnErr = errors.Wrap(err, "error reading log message") + return } - messageTime, err := time.Parse("2006-01-02T15:04:05.999999999Z", strings.Split(line, " ")[0]) + timestamp, message, _ := strings.Cut(line, " ") + + parsedTimestamp, err = time.Parse("2006-01-02T15:04:05.999999999Z", timestamp) if err != nil { - logging.Fatal(errors.Wrap(err, "error parsing log timestamp")) + ls.logger.Fatal(errors.Wrap(err, "error parsing log timestamp")) continue } - if ls.pods[curPodId].lastTimestamps[curContainerId] != nil && messageTime.UnixNano() <= ls.pods[curPodId].lastTimestamps[curContainerId].UnixNano() { + if lastTimestamp, ok := ls.pods[curPodId].lastTimestamps[curContainerId]; ok && + (parsedTimestamp.Before(lastTimestamp.Time) || parsedTimestamp.Equal(lastTimestamp.Time)) { continue } - times = append(times, strings.Split(line, " ")[0]) - messages = append(messages, strings.Join(strings.Split(line, " ")[1:], " ")) + times = append(times, strconv.FormatInt(parsedTimestamp.UnixMilli(), 10)) + messages = append(messages, message) } - return times, messages, nil + newLastTimestamp = parsedTimestamp + + return } // maintainList updates pods depending on the objects coming in via upsert and delete channel. -func (ls *containerLogSync) maintainList(ctx context.Context, upsertChannel <-chan contracts.KUpsert, deleteChannel <-chan contracts.KDelete) error { +func (ls *containerLogSync) maintainList(ctx context.Context, kupserts <-chan contracts.KUpsert, kdeletes <-chan contracts.KDelete) error { g, ctx := errgroup.WithContext(ctx) - deletes := make(chan any) + databaseDeletes := make(chan any) g.Go(func() error { - defer close(deletes) + defer close(databaseDeletes) for { select { - case <-ctx.Done(): - return errors.Wrap(ctx.Err(), "context canceled maintain log sync pods") - - case podFromChannel, more := <-upsertChannel: + case kupsert, more := <-kupserts: if !more { return nil } - pod := podFromChannel.KObject().(*kcorev1.Pod) - podId := sha1.Sum(types.Checksum(podFromChannel.ID().String())) + podId := kupsert.ID().String() - _, ok := ls.pods[podId] - - if ok { + if _, ok := ls.pods[podId]; ok { continue } ls.mutex.RLock() - ls.pods[podId] = podListItem{pod: pod} + ls.pods[podId] = podListItem{ + pod: kupsert.KObject().(*kcorev1.Pod), + lastTimestamps: make(map[string]*kmetav1.Time), + } ls.mutex.RUnlock() - case podIdFromChannel, more := <-deleteChannel: + case kdelete, more := <-kdeletes: if !more { return nil } - podId := sha1.Sum(types.Checksum(podIdFromChannel.ID().String())) + podId := kdelete.ID().String() ls.mutex.RLock() delete(ls.pods, podId) ls.mutex.RUnlock() select { - case deletes <- podId: + case databaseDeletes <- podId: case <-ctx.Done(): return ctx.Err() } + case <-ctx.Done(): + return ctx.Err() } } }) g.Go(func() error { - return database.NewDelete(ls.db).ByColumn("container_id").Stream(ctx, &schema.ContainerLog{}, deletes) + return database.NewDelete(ls.db, database.ByColumn("container_id")).Stream(ctx, &schema.ContainerLog{}, databaseDeletes) }) return g.Wait() } -func (ls *containerLogSync) Run(ctx context.Context, upsertChannel <-chan contracts.KUpsert, deleteChannel <-chan contracts.KDelete) error { +func (ls *containerLogSync) Run(ctx context.Context, kupserts <-chan contracts.KUpsert, kdeletes <-chan contracts.KDelete) error { ls.logger.Info("Starting sync") g, ctx := errgroup.WithContext(ctx) g.Go(func() error { - return ls.maintainList(ctx, upsertChannel, deleteChannel) + return ls.maintainList(ctx, kupserts, kdeletes) }) - upsertStmt := ls.upsertStmt() - upserts := make(chan database.Entity) - defer close(upserts) + databaseUpserts := make(chan database.Entity) + defer close(databaseUpserts) g.Go(func() error { for { for _, element := range ls.pods { - podId := sha1.Sum(types.Checksum(element.pod.Namespace + "/" + element.pod.Name)) + podId := types.Binary(types.Checksum(element.pod.Namespace + "/" + element.pod.Name)) for _, container := range element.pod.Spec.Containers { - containerId := sha1.Sum(types.Checksum(element.pod.Namespace + "/" + element.pod.Name + "/" + container.Name)) + containerId := types.Binary(types.Checksum(element.pod.Namespace + "/" + element.pod.Name + "/" + container.Name)) podLogOpts := kcorev1.PodLogOptions{Container: container.Name, Timestamps: true} - if ls.pods[podId].lastTimestamps != nil { - podLogOpts.SinceTime = ls.pods[podId].lastTimestamps[containerId] + if _, ok := ls.pods[podId.String()].lastTimestamps[containerId.String()]; ok { + podLogOpts.SinceTime = ls.pods[podId.String()].lastTimestamps[containerId.String()] } log, err := ls.clientset.CoreV1().Pods(element.pod.Namespace).GetLogs(element.pod.Name, &podLogOpts).Do(ctx).Raw() if err != nil { - fmt.Println(errors.Wrap(err, "error reading container log")) + ls.logger.Fatal(errors.Wrap(err, "error reading container log")) continue } - times, messages, err := ls.splitTimestampsFromMessages(log, podId, containerId) + times, messages, lastTimestamp, err := ls.splitTimestampsFromMessages(log, podId.String(), containerId.String()) if err != nil { return err } @@ -197,44 +206,37 @@ func (ls *containerLogSync) Run(ctx context.Context, upsertChannel <-chan contra } newLog := &schema.ContainerLog{ - ContainerId: containerId[:], - PodId: podId[:], + ContainerId: containerId, + PodId: podId, Time: strings.Join(times, "\n"), Log: strings.Join(messages, "\n"), } select { - case upserts <- newLog: + case databaseUpserts <- newLog: case <-ctx.Done(): return ctx.Err() } - lastTime, err := time.Parse("2006-01-02T15:04:05.999999999Z", times[len(times)-1]) - if err != nil { - return errors.Wrap(err, "error parsing log time") - } - - lastV1Time := kmetav1.Time{Time: lastTime} - - if _, ok := ls.pods[podId]; !ok { + if _, ok := ls.pods[podId.String()]; !ok { continue } - ls.pods[podId].lastTimestamps[containerId] = &lastV1Time + ls.pods[podId.String()].lastTimestamps[containerId.String()] = &kmetav1.Time{Time: lastTimestamp} } } select { case <-ctx.Done(): return ctx.Err() - case <-time.After(time.Second * 15): + case <-time.After(ls.period): } } }) g.Go(func() error { - return database.NewUpsert(ls.db).WithStatement(upsertStmt, 5).Stream(ctx, upserts) + return database.NewUpsert(ls.db, database.WithStatement(ls.upsertStmt(), 5)).Stream(ctx, databaseUpserts) }) return g.Wait() diff --git a/pkg/sync/options.go b/pkg/sync/options.go new file mode 100644 index 00000000..1a874761 --- /dev/null +++ b/pkg/sync/options.go @@ -0,0 +1,37 @@ +package sync + +import "github.com/icinga/icinga-kubernetes/pkg/contracts" + +// syncOption is a functional option for NewSync. +type syncOption func(options *syncOptions) + +// syncOptions stores options for sync. +type syncOptions struct { + forwardUpserts chan<- contracts.KUpsert + forwardDeletes chan<- contracts.KDelete +} + +// newSyncOptions returns a new syncOptions initialized with the given options. +func newSyncOptions(options ...syncOption) *syncOptions { + syncOpts := &syncOptions{} + + for _, option := range options { + option(syncOpts) + } + + return syncOpts +} + +// WithForwardUpserts forwards added and updated Kubernetes resources to the specific channel. +func WithForwardUpserts(channel chan<- contracts.KUpsert) syncOption { + return func(options *syncOptions) { + options.forwardUpserts = channel + } +} + +// WithForwardDeletes forwards deleted Kubernetes resources to the specific channel. +func WithForwardDeletes(channel chan<- contracts.KDelete) syncOption { + return func(options *syncOptions) { + options.forwardDeletes = channel + } +} diff --git a/pkg/sync/sync.go b/pkg/sync/sync.go index e04b5125..0b46ba84 100644 --- a/pkg/sync/sync.go +++ b/pkg/sync/sync.go @@ -15,7 +15,7 @@ import ( ) type Sync interface { - Run(context.Context, ...SyncOption) error + Run(context.Context, ...syncOption) error } type sync struct { @@ -31,46 +31,15 @@ func NewSync( informer kcache.SharedInformer, logger *logging.Logger, ) Sync { - s := &sync{ + return &sync{ db: db, informer: informer, logger: logger, factory: factory, } - - return s -} - -func WithForwardUpserts(channel chan<- contracts.KUpsert) SyncOption { - return func(options *SyncOptions) { - options.forwardUpserts = channel - } -} - -func WithForwardDeletes(channel chan<- contracts.KDelete) SyncOption { - return func(options *SyncOptions) { - options.forwardDeletes = channel - } } -type SyncOption func(options *SyncOptions) - -type SyncOptions struct { - forwardUpserts chan<- contracts.KUpsert - forwardDeletes chan<- contracts.KDelete -} - -func NewSyncOptions(options ...SyncOption) *SyncOptions { - syncOptions := &SyncOptions{} - - for _, option := range options { - option(syncOptions) - } - - return syncOptions -} - -func (s *sync) Run(ctx context.Context, execOptions ...SyncOption) error { +func (s *sync) Run(ctx context.Context, options ...syncOption) error { s.logger.Info("Starting sync") s.logger.Debug("Warming up") @@ -96,132 +65,77 @@ func (s *sync) Run(ctx context.Context, execOptions ...SyncOption) error { s.logger.Debug("Finished warming up") - s.factory().GetResourceVersion() - - syncOptions := NewSyncOptions(execOptions...) - - // init upsert channel spreader - multiplexUpsertChannel := make(chan contracts.KUpsert) - defer close(multiplexUpsertChannel) - - multiplexUpsert := NewChannelMux(multiplexUpsertChannel) + syncOpts := newSyncOptions(options...) - upsertChannel := multiplexUpsert.NewOutChannel() + kupsertsMux := NewChannelMux(changes.Adds(), changes.Updates()) + kupserts := kupsertsMux.Out() - if syncOptions.forwardUpserts != nil { - multiplexUpsert.AddOutChannel(syncOptions.forwardUpserts) + if syncOpts.forwardUpserts != nil { + kupsertsMux.AddOut(syncOpts.forwardUpserts) } - // run upsert channel spreader g.Go(func() error { - return multiplexUpsert.Run(ctx) + return kupsertsMux.Run(ctx) }) - upsertToStream := make(chan database.Entity) - defer close(upsertToStream) + databaseUpserts := make(chan database.Entity) + defer close(databaseUpserts) - for _, ch := range []<-chan contracts.KUpsert{changes.Adds(), changes.Updates()} { - ch := ch - - g.Go(func() error { - for { - select { - case kupsert, more := <-ch: - if !more { - return nil - } - - select { - case multiplexUpsertChannel <- kupsert: - case <-ctx.Done(): - return ctx.Err() - } - case <-ctx.Done(): - return ctx.Err() + g.Go(func() error { + for { + select { + case kupsert, more := <-kupserts: + if !more { + return nil } - } - }) - g.Go(func() error { - for { + entity := s.factory() + entity.SetID(kupsert.ID()) + entity.SetCanonicalName(kupsert.GetCanonicalName()) + entity.Obtain(kupsert.KObject()) + select { - case kupsert, more := <-upsertChannel: - if !more { - return nil - } - - entity := s.factory() - entity.SetID(kupsert.ID()) - entity.SetCanonicalName(kupsert.GetCanonicalName()) - entity.Obtain(kupsert.KObject()) - - select { - case upsertToStream <- entity: - s.logger.Debugw( - fmt.Sprintf("Sync: Upserted %s", kupsert.GetCanonicalName()), - zap.String("id", kupsert.ID().String())) - case <-ctx.Done(): - return ctx.Err() - } + case databaseUpserts <- entity: + s.logger.Debugw( + fmt.Sprintf("Sync: Upserted %s", kupsert.GetCanonicalName()), + zap.String("id", kupsert.ID().String())) case <-ctx.Done(): return ctx.Err() } + case <-ctx.Done(): + return ctx.Err() } - }) - } + } + }) g.Go(func() error { - return database.NewUpsert(s.db).Stream(ctx, upsertToStream) + return s.db.UpsertStreamed(ctx, databaseUpserts) }) - // init delete channel spreader - multiplexDeleteChannel := make(chan contracts.KDelete) - defer close(multiplexDeleteChannel) - - multiplexDelete := NewChannelMux(multiplexDeleteChannel) - - deleteChannel := multiplexDelete.NewOutChannel() + kdeletesMux := NewChannelMux(changes.Deletes()) + kdeletes := kdeletesMux.Out() - if syncOptions.forwardDeletes != nil { - multiplexDelete.AddOutChannel(syncOptions.forwardDeletes) + if syncOpts.forwardDeletes != nil { + kdeletesMux.AddOut(syncOpts.forwardDeletes) } - // run delete channel spreader - g.Go(func() error { - return multiplexDelete.Run(ctx) - }) - g.Go(func() error { - for { - select { - case kdelete, more := <-changes.Deletes(): - if !more { - return nil - } - select { - case multiplexDeleteChannel <- kdelete: - case <-ctx.Done(): - return ctx.Err() - } - case <-ctx.Done(): - return ctx.Err() - } - } + return kdeletesMux.Run(ctx) }) - deleteToStream := make(chan any) + databaseDeletes := make(chan any) g.Go(func() error { - defer close(deleteToStream) + defer close(databaseDeletes) for { select { - case kdelete, more := <-deleteChannel: + case kdelete, more := <-kdeletes: if !more { return nil } select { - case deleteToStream <- kdelete.ID(): + case databaseDeletes <- kdelete.ID(): s.logger.Debugw( fmt.Sprintf("Sync: Deleted %s", kdelete.GetCanonicalName()), zap.String("id", kdelete.ID().String())) @@ -235,7 +149,7 @@ func (s *sync) Run(ctx context.Context, execOptions ...SyncOption) error { }) g.Go(func() error { - return database.NewDelete(s.db).Stream(ctx, s.factory(), deleteToStream) + return s.db.DeleteStreamed(ctx, s.factory(), databaseDeletes) }) g.Go(func() error {