Skip to content

Commit

Permalink
Fix review
Browse files Browse the repository at this point in the history
  • Loading branch information
jrauh01 committed Dec 11, 2023
1 parent 90a7bb3 commit a0511d4
Show file tree
Hide file tree
Showing 5 changed files with 130 additions and 171 deletions.
8 changes: 5 additions & 3 deletions cmd/icinga-kubernetes/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,12 @@ func main() {
)
})

logSync := sync.NewContainerLogSync(k, db, logs.GetChildLogger("ContainerLogs"))

g.Go(func() error {
return logSync.Run(ctx, podUpserts, podDeletes)
return sync.NewContainerLogSync(
k, db, logs.GetChildLogger("ContainerLogs"),
).Run(
ctx, podUpserts, podDeletes,
)
})

if err := g.Wait(); err != nil {
Expand Down
68 changes: 37 additions & 31 deletions pkg/sync/channel-mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,79 +7,79 @@ import (
)

// ChannelMux is a multiplexer for channels of variable types.
// It fans all input channels to all output channels.
// It fans out all input channels to all output channels.
type ChannelMux[T any] interface {
// In adds the given input channel reading.
In(<-chan T)

// AddInChannel adds given input channel to the list of input channels.
AddInChannel(<-chan T)
// Out returns a new output channel that receives from all input channels.
Out() <-chan T

// NewOutChannel returns and adds new output channel to the pods of created addedOutChannels.
NewOutChannel() <-chan T
// AddOut registers the given output channel to receive from all input channels.
AddOut(chan<- T)

// AddOutChannel adds given output channel to the list of added addedOutChannels.
AddOutChannel(chan<- T)

// Run combines output channel lists and starts multiplexing.
// Run starts multiplexing of all input channels to all output channels.
Run(context.Context) error
}

type channelMux[T any] struct {
inChannels []<-chan T
createdOutChannels []chan<- T
addedOutChannels []chan<- T
started atomic.Bool
in []<-chan T
out []chan<- T
outAdded []chan<- T
started atomic.Bool
}

// NewChannelMux creates new ChannelMux initialized with at least one input channel
func NewChannelMux[T any](initInChannel <-chan T, inChannels ...<-chan T) ChannelMux[T] {
// NewChannelMux returns a new ChannelMux initialized with at least one input channel.
func NewChannelMux[T any](inChannel <-chan T, inChannels ...<-chan T) ChannelMux[T] {
return &channelMux[T]{
inChannels: append(make([]<-chan T, 0), append(inChannels, initInChannel)...),
in: append(inChannels, inChannel),
}
}

func (mux *channelMux[T]) AddInChannel(channel <-chan T) {
func (mux *channelMux[T]) In(channel <-chan T) {
if mux.started.Load() {
panic("channelMux already started")
}

mux.inChannels = append(mux.inChannels, channel)
mux.in = append(mux.in, channel)
}

func (mux *channelMux[T]) NewOutChannel() <-chan T {
func (mux *channelMux[T]) Out() <-chan T {
if mux.started.Load() {
panic("channelMux already started")
}

channel := make(chan T)
mux.createdOutChannels = append(mux.createdOutChannels, channel)
mux.out = append(mux.out, channel)

return channel
}

func (mux *channelMux[T]) AddOutChannel(channel chan<- T) {
func (mux *channelMux[T]) AddOut(channel chan<- T) {
if mux.started.Load() {
panic("channelMux already started")
}

mux.addedOutChannels = append(mux.addedOutChannels, channel)
mux.outAdded = append(mux.outAdded, channel)
}

func (mux *channelMux[T]) Run(ctx context.Context) error {
mux.started.Store(true)
if mux.started.Swap(true) {
panic("channelMux already started")
}

defer func() {
for _, channelToClose := range mux.createdOutChannels {
for _, channelToClose := range mux.out {
close(channelToClose)
}
}()

outChannels := append(mux.addedOutChannels, mux.createdOutChannels...)
g, ctx := errgroup.WithContext(ctx)

sink := make(chan T)
defer close(sink)

g, ctx := errgroup.WithContext(ctx)

for _, ch := range mux.inChannels {
for _, ch := range mux.in {
ch := ch

g.Go(func() error {
Expand All @@ -89,14 +89,20 @@ func (mux *channelMux[T]) Run(ctx context.Context) error {
if !more {
return nil
}
sink <- spread
select {
case sink <- spread:
case <-ctx.Done():
return ctx.Err()
}

case <-ctx.Done():
return ctx.Err()
}
}
})
}

outs := append(mux.outAdded, mux.out...)
g.Go(func() error {
for {
select {
Expand All @@ -105,9 +111,9 @@ func (mux *channelMux[T]) Run(ctx context.Context) error {
return nil
}

for _, outChannel := range outChannels {
for _, ch := range outs {
select {
case outChannel <- spread:
case ch <- spread:
case <-ctx.Done():
return ctx.Err()
}
Expand Down
20 changes: 10 additions & 10 deletions pkg/sync/channel-mux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ func TestAddedOutputChannels(t *testing.T) {
outputChannel1 := make(chan int)
outputChannel2 := make(chan int)
outputChannel3 := make(chan int)
multiplexer.AddOutChannel(outputChannel1)
multiplexer.AddOutChannel(outputChannel2)
multiplexer.AddOutChannel(outputChannel3)
multiplexer.AddOut(outputChannel1)
multiplexer.AddOut(outputChannel2)
multiplexer.AddOut(outputChannel3)

g, ctx := errgroup.WithContext(context.Background())

Expand All @@ -56,9 +56,9 @@ func TestCreatedOutputChannels(t *testing.T) {
multiplexChannel := make(chan int)
multiplexer := NewChannelMux(multiplexChannel)

outputChannel1 := multiplexer.NewOutChannel()
outputChannel2 := multiplexer.NewOutChannel()
outputChannel3 := multiplexer.NewOutChannel()
outputChannel1 := multiplexer.Out()
outputChannel2 := multiplexer.Out()
outputChannel3 := multiplexer.Out()

g, ctx := errgroup.WithContext(context.Background())

Expand Down Expand Up @@ -100,7 +100,7 @@ func TestAddedInputChannels(t *testing.T) {

multiplexer := NewChannelMux(multiplexChannel1, multiplexChannel2, multiplexChannel3)

outputChannel := multiplexer.NewOutChannel()
outputChannel := multiplexer.Out()

ctx, cancel := context.WithCancel(context.Background())
g, ctx := errgroup.WithContext(ctx)
Expand Down Expand Up @@ -173,9 +173,9 @@ func TestClosedChannels(t *testing.T) {
multiplexChannel := make(chan int)
multiplexer := NewChannelMux(multiplexChannel)

outputChannel1 := multiplexer.NewOutChannel()
outputChannel2 := multiplexer.NewOutChannel()
outputChannel3 := multiplexer.NewOutChannel()
outputChannel1 := multiplexer.Out()
outputChannel2 := multiplexer.Out()
outputChannel3 := multiplexer.Out()

ctx, cancel := context.WithCancel(context.Background())
g, ctx := errgroup.WithContext(ctx)
Expand Down
37 changes: 37 additions & 0 deletions pkg/sync/options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package sync

import "github.com/icinga/icinga-kubernetes/pkg/contracts"

// syncOption is a functional option for NewSync.
type syncOption func(options *syncOptions)

// syncOptions stores options for sync.
type syncOptions struct {
forwardUpserts chan<- contracts.KUpsert
forwardDeletes chan<- contracts.KDelete
}

// newSyncOptions returns a new syncOptions initialized with the given options.
func newSyncOptions(options ...syncOption) *syncOptions {
syncOpts := &syncOptions{}

for _, option := range options {
option(syncOpts)
}

return syncOpts
}

// WithForwardUpserts forwards added and updated Kubernetes resources to the specific channel.
func WithForwardUpserts(channel chan<- contracts.KUpsert) syncOption {
return func(options *syncOptions) {
options.forwardUpserts = channel
}
}

// WithForwardDeletes forwards deleted Kubernetes resources to the specific channel.
func WithForwardDeletes(channel chan<- contracts.KDelete) syncOption {
return func(options *syncOptions) {
options.forwardDeletes = channel
}
}
Loading

0 comments on commit a0511d4

Please sign in to comment.