diff --git a/cmd/icinga-kubernetes/main.go b/cmd/icinga-kubernetes/main.go index 9ac8e6bd..c504d444 100644 --- a/cmd/icinga-kubernetes/main.go +++ b/cmd/icinga-kubernetes/main.go @@ -96,10 +96,12 @@ func main() { ) }) - logSync := sync.NewContainerLogSync(k, db, logs.GetChildLogger("ContainerLogs")) - g.Go(func() error { - return logSync.Run(ctx, podUpserts, podDeletes) + return sync.NewContainerLogSync( + k, db, logs.GetChildLogger("ContainerLogs"), + ).Run( + ctx, podUpserts, podDeletes, + ) }) if err := g.Wait(); err != nil { diff --git a/pkg/sync/channel-mux.go b/pkg/sync/channel-mux.go index 69635ba9..1a96ff16 100644 --- a/pkg/sync/channel-mux.go +++ b/pkg/sync/channel-mux.go @@ -7,79 +7,79 @@ import ( ) // ChannelMux is a multiplexer for channels of variable types. -// It fans all input channels to all output channels. +// It fans out all input channels to all output channels. type ChannelMux[T any] interface { + // In adds the given input channel reading. + In(<-chan T) - // AddInChannel adds given input channel to the list of input channels. - AddInChannel(<-chan T) + // Out returns a new output channel that receives from all input channels. + Out() <-chan T - // NewOutChannel returns and adds new output channel to the pods of created addedOutChannels. - NewOutChannel() <-chan T + // AddOut registers the given output channel to receive from all input channels. + AddOut(chan<- T) - // AddOutChannel adds given output channel to the list of added addedOutChannels. - AddOutChannel(chan<- T) - - // Run combines output channel lists and starts multiplexing. + // Run starts multiplexing of all input channels to all output channels. Run(context.Context) error } type channelMux[T any] struct { - inChannels []<-chan T - createdOutChannels []chan<- T - addedOutChannels []chan<- T - started atomic.Bool + in []<-chan T + out []chan<- T + outAdded []chan<- T + started atomic.Bool } -// NewChannelMux creates new ChannelMux initialized with at least one input channel -func NewChannelMux[T any](initInChannel <-chan T, inChannels ...<-chan T) ChannelMux[T] { +// NewChannelMux returns a new ChannelMux initialized with at least one input channel. +func NewChannelMux[T any](inChannel <-chan T, inChannels ...<-chan T) ChannelMux[T] { return &channelMux[T]{ - inChannels: append(make([]<-chan T, 0), append(inChannels, initInChannel)...), + in: append(inChannels, inChannel), } } -func (mux *channelMux[T]) AddInChannel(channel <-chan T) { +func (mux *channelMux[T]) In(channel <-chan T) { if mux.started.Load() { panic("channelMux already started") } - mux.inChannels = append(mux.inChannels, channel) + mux.in = append(mux.in, channel) } -func (mux *channelMux[T]) NewOutChannel() <-chan T { +func (mux *channelMux[T]) Out() <-chan T { if mux.started.Load() { panic("channelMux already started") } channel := make(chan T) - mux.createdOutChannels = append(mux.createdOutChannels, channel) + mux.out = append(mux.out, channel) return channel } -func (mux *channelMux[T]) AddOutChannel(channel chan<- T) { +func (mux *channelMux[T]) AddOut(channel chan<- T) { if mux.started.Load() { panic("channelMux already started") } - mux.addedOutChannels = append(mux.addedOutChannels, channel) + mux.outAdded = append(mux.outAdded, channel) } func (mux *channelMux[T]) Run(ctx context.Context) error { - mux.started.Store(true) + if !mux.started.CompareAndSwap(false, true) { + panic("channelMux already started") + } defer func() { - for _, channelToClose := range mux.createdOutChannels { + for _, channelToClose := range mux.out { close(channelToClose) } }() - outChannels := append(mux.addedOutChannels, mux.createdOutChannels...) + g, ctx := errgroup.WithContext(ctx) sink := make(chan T) + defer close(sink) - g, ctx := errgroup.WithContext(ctx) - - for _, ch := range mux.inChannels { + for _, ch := range mux.in { ch := ch g.Go(func() error { @@ -89,7 +89,12 @@ func (mux *channelMux[T]) Run(ctx context.Context) error { if !more { return nil } - sink <- spread + select { + case sink <- spread: + case <-ctx.Done(): + return ctx.Err() + } + case <-ctx.Done(): return ctx.Err() } @@ -97,6 +102,7 @@ func (mux *channelMux[T]) Run(ctx context.Context) error { }) } + outs := append(mux.outAdded, mux.out...) g.Go(func() error { for { select { @@ -105,9 +111,9 @@ func (mux *channelMux[T]) Run(ctx context.Context) error { return nil } - for _, outChannel := range outChannels { + for _, ch := range outs { select { - case outChannel <- spread: + case ch <- spread: case <-ctx.Done(): return ctx.Err() } diff --git a/pkg/sync/channel-mux_test.go b/pkg/sync/channel-mux_test.go index 37e90b6e..467c527d 100644 --- a/pkg/sync/channel-mux_test.go +++ b/pkg/sync/channel-mux_test.go @@ -27,9 +27,9 @@ func TestAddedOutputChannels(t *testing.T) { outputChannel1 := make(chan int) outputChannel2 := make(chan int) outputChannel3 := make(chan int) - multiplexer.AddOutChannel(outputChannel1) - multiplexer.AddOutChannel(outputChannel2) - multiplexer.AddOutChannel(outputChannel3) + multiplexer.AddOut(outputChannel1) + multiplexer.AddOut(outputChannel2) + multiplexer.AddOut(outputChannel3) g, ctx := errgroup.WithContext(context.Background()) @@ -56,9 +56,9 @@ func TestCreatedOutputChannels(t *testing.T) { multiplexChannel := make(chan int) multiplexer := NewChannelMux(multiplexChannel) - outputChannel1 := multiplexer.NewOutChannel() - outputChannel2 := multiplexer.NewOutChannel() - outputChannel3 := multiplexer.NewOutChannel() + outputChannel1 := multiplexer.Out() + outputChannel2 := multiplexer.Out() + outputChannel3 := multiplexer.Out() g, ctx := errgroup.WithContext(context.Background()) @@ -100,7 +100,7 @@ func TestAddedInputChannels(t *testing.T) { multiplexer := NewChannelMux(multiplexChannel1, multiplexChannel2, multiplexChannel3) - outputChannel := multiplexer.NewOutChannel() + outputChannel := multiplexer.Out() ctx, cancel := context.WithCancel(context.Background()) g, ctx := errgroup.WithContext(ctx) @@ -173,9 +173,9 @@ func TestClosedChannels(t *testing.T) { multiplexChannel := make(chan int) multiplexer := NewChannelMux(multiplexChannel) - outputChannel1 := multiplexer.NewOutChannel() - outputChannel2 := multiplexer.NewOutChannel() - outputChannel3 := multiplexer.NewOutChannel() + outputChannel1 := multiplexer.Out() + outputChannel2 := multiplexer.Out() + outputChannel3 := multiplexer.Out() ctx, cancel := context.WithCancel(context.Background()) g, ctx := errgroup.WithContext(ctx) diff --git a/pkg/sync/options.go b/pkg/sync/options.go new file mode 100644 index 00000000..1a874761 --- /dev/null +++ b/pkg/sync/options.go @@ -0,0 +1,37 @@ +package sync + +import "github.com/icinga/icinga-kubernetes/pkg/contracts" + +// syncOption is a functional option for NewSync. +type syncOption func(options *syncOptions) + +// syncOptions stores options for sync. +type syncOptions struct { + forwardUpserts chan<- contracts.KUpsert + forwardDeletes chan<- contracts.KDelete +} + +// newSyncOptions returns a new syncOptions initialized with the given options. +func newSyncOptions(options ...syncOption) *syncOptions { + syncOpts := &syncOptions{} + + for _, option := range options { + option(syncOpts) + } + + return syncOpts +} + +// WithForwardUpserts forwards added and updated Kubernetes resources to the specific channel. +func WithForwardUpserts(channel chan<- contracts.KUpsert) syncOption { + return func(options *syncOptions) { + options.forwardUpserts = channel + } +} + +// WithForwardDeletes forwards deleted Kubernetes resources to the specific channel. +func WithForwardDeletes(channel chan<- contracts.KDelete) syncOption { + return func(options *syncOptions) { + options.forwardDeletes = channel + } +} diff --git a/pkg/sync/sync.go b/pkg/sync/sync.go index e04b5125..eb341e87 100644 --- a/pkg/sync/sync.go +++ b/pkg/sync/sync.go @@ -15,7 +15,7 @@ import ( ) type Sync interface { - Run(context.Context, ...SyncOption) error + Run(context.Context, ...syncOption) error } type sync struct { @@ -31,46 +31,15 @@ func NewSync( informer kcache.SharedInformer, logger *logging.Logger, ) Sync { - s := &sync{ + return &sync{ db: db, informer: informer, logger: logger, factory: factory, } - - return s -} - -func WithForwardUpserts(channel chan<- contracts.KUpsert) SyncOption { - return func(options *SyncOptions) { - options.forwardUpserts = channel - } -} - -func WithForwardDeletes(channel chan<- contracts.KDelete) SyncOption { - return func(options *SyncOptions) { - options.forwardDeletes = channel - } } -type SyncOption func(options *SyncOptions) - -type SyncOptions struct { - forwardUpserts chan<- contracts.KUpsert - forwardDeletes chan<- contracts.KDelete -} - -func NewSyncOptions(options ...SyncOption) *SyncOptions { - syncOptions := &SyncOptions{} - - for _, option := range options { - option(syncOptions) - } - - return syncOptions -} - -func (s *sync) Run(ctx context.Context, execOptions ...SyncOption) error { +func (s *sync) Run(ctx context.Context, options ...syncOption) error { s.logger.Info("Starting sync") s.logger.Debug("Warming up") @@ -96,132 +65,77 @@ func (s *sync) Run(ctx context.Context, execOptions ...SyncOption) error { s.logger.Debug("Finished warming up") - s.factory().GetResourceVersion() - - syncOptions := NewSyncOptions(execOptions...) - - // init upsert channel spreader - multiplexUpsertChannel := make(chan contracts.KUpsert) - defer close(multiplexUpsertChannel) - - multiplexUpsert := NewChannelMux(multiplexUpsertChannel) + syncOpts := newSyncOptions(options...) - upsertChannel := multiplexUpsert.NewOutChannel() + kupsertMux := NewChannelMux(changes.Adds(), changes.Updates()) + kupserts := kupsertMux.Out() - if syncOptions.forwardUpserts != nil { - multiplexUpsert.AddOutChannel(syncOptions.forwardUpserts) + if syncOpts.forwardUpserts != nil { + kupsertMux.AddOut(syncOpts.forwardUpserts) } - // run upsert channel spreader g.Go(func() error { - return multiplexUpsert.Run(ctx) + return kupsertMux.Run(ctx) }) - upsertToStream := make(chan database.Entity) - defer close(upsertToStream) + databaseUpserts := make(chan database.Entity) + defer close(databaseUpserts) - for _, ch := range []<-chan contracts.KUpsert{changes.Adds(), changes.Updates()} { - ch := ch - - g.Go(func() error { - for { - select { - case kupsert, more := <-ch: - if !more { - return nil - } - - select { - case multiplexUpsertChannel <- kupsert: - case <-ctx.Done(): - return ctx.Err() - } - case <-ctx.Done(): - return ctx.Err() + g.Go(func() error { + for { + select { + case kupsert, more := <-kupserts: + if !more { + return nil } - } - }) - g.Go(func() error { - for { + entity := s.factory() + entity.SetID(kupsert.ID()) + entity.SetCanonicalName(kupsert.GetCanonicalName()) + entity.Obtain(kupsert.KObject()) + select { - case kupsert, more := <-upsertChannel: - if !more { - return nil - } - - entity := s.factory() - entity.SetID(kupsert.ID()) - entity.SetCanonicalName(kupsert.GetCanonicalName()) - entity.Obtain(kupsert.KObject()) - - select { - case upsertToStream <- entity: - s.logger.Debugw( - fmt.Sprintf("Sync: Upserted %s", kupsert.GetCanonicalName()), - zap.String("id", kupsert.ID().String())) - case <-ctx.Done(): - return ctx.Err() - } + case databaseUpserts <- entity: + s.logger.Debugw( + fmt.Sprintf("Sync: Upserted %s", kupsert.GetCanonicalName()), + zap.String("id", kupsert.ID().String())) case <-ctx.Done(): return ctx.Err() } + case <-ctx.Done(): + return ctx.Err() } - }) - } + } + }) g.Go(func() error { - return database.NewUpsert(s.db).Stream(ctx, upsertToStream) + return s.db.UpsertStreamed(ctx, databaseUpserts) }) - // init delete channel spreader - multiplexDeleteChannel := make(chan contracts.KDelete) - defer close(multiplexDeleteChannel) - - multiplexDelete := NewChannelMux(multiplexDeleteChannel) - - deleteChannel := multiplexDelete.NewOutChannel() + kdeleteMux := NewChannelMux(changes.Deletes()) + kdeletes := kdeleteMux.Out() - if syncOptions.forwardDeletes != nil { - multiplexDelete.AddOutChannel(syncOptions.forwardDeletes) + if syncOpts.forwardDeletes != nil { + kdeleteMux.AddOut(syncOpts.forwardDeletes) } - // run delete channel spreader - g.Go(func() error { - return multiplexDelete.Run(ctx) - }) - g.Go(func() error { - for { - select { - case kdelete, more := <-changes.Deletes(): - if !more { - return nil - } - select { - case multiplexDeleteChannel <- kdelete: - case <-ctx.Done(): - return ctx.Err() - } - case <-ctx.Done(): - return ctx.Err() - } - } + return kdeleteMux.Run(ctx) }) - deleteToStream := make(chan any) + databaseDeletes := make(chan any) g.Go(func() error { - defer close(deleteToStream) + defer close(databaseDeletes) for { select { - case kdelete, more := <-deleteChannel: + case kdelete, more := <-kdeletes: if !more { return nil } select { - case deleteToStream <- kdelete.ID(): + case databaseDeletes <- kdelete.ID(): s.logger.Debugw( fmt.Sprintf("Sync: Deleted %s", kdelete.GetCanonicalName()), zap.String("id", kdelete.ID().String())) @@ -235,7 +149,7 @@ func (s *sync) Run(ctx context.Context, execOptions ...SyncOption) error { }) g.Go(func() error { - return database.NewDelete(s.db).Stream(ctx, s.factory(), deleteToStream) + return s.db.DeleteStreamed(ctx, s.factory(), databaseDeletes) }) g.Go(func() error {