diff --git a/cmd/icinga-kubernetes/main.go b/cmd/icinga-kubernetes/main.go index 90924229..f56e5222 100644 --- a/cmd/icinga-kubernetes/main.go +++ b/cmd/icinga-kubernetes/main.go @@ -7,6 +7,7 @@ import ( "github.com/icinga/icinga-go-library/driver" "github.com/icinga/icinga-go-library/logging" "github.com/icinga/icinga-kubernetes/internal" + "github.com/icinga/icinga-kubernetes/pkg/contracts" "github.com/icinga/icinga-kubernetes/pkg/schema" "github.com/icinga/icinga-kubernetes/pkg/sync" "github.com/okzk/sdnotify" @@ -15,6 +16,7 @@ import ( kinformers "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" kclientcmd "k8s.io/client-go/tools/clientcmd" + "time" ) func main() { @@ -82,10 +84,25 @@ func main() { ).Run(ctx) }) + podUpserts := make(chan contracts.KUpsert) + podDeletes := make(chan contracts.KDelete) g.Go(func() error { + defer close(podUpserts) + defer close(podDeletes) + return sync.NewSync( db, schema.NewPod, informers.Core().V1().Pods().Informer(), logs.GetChildLogger("Pods"), - ).Run(ctx) + ).Run( + ctx, sync.WithForwardUpserts(podUpserts), sync.WithForwardDeletes(podDeletes), + ) + }) + + g.Go(func() error { + return sync.NewContainerLogSync( + k, db, logs.GetChildLogger("ContainerLogs"), time.Second*15, + ).Run( + ctx, podUpserts, podDeletes, + ) }) if err := g.Wait(); err != nil { diff --git a/go.mod b/go.mod index 4fba6598..08bc7f87 100644 --- a/go.mod +++ b/go.mod @@ -8,8 +8,10 @@ require ( github.com/pkg/errors v0.9.1 go.uber.org/zap v1.26.0 golang.org/x/sync v0.5.0 - k8s.io/apimachinery v0.28.2 - k8s.io/client-go v0.28.2 + k8s.io/api v0.28.4 + k8s.io/apimachinery v0.28.4 + k8s.io/client-go v0.28.4 + k8s.io/metrics v0.28.4 ) require ( @@ -45,7 +47,7 @@ require ( github.com/ssgreg/journald v1.0.0 // indirect go.uber.org/multierr v1.11.0 // indirect golang.org/x/exp v0.0.0-20231110203233-9a3e6036ecaa // indirect - golang.org/x/net v0.16.0 // indirect + golang.org/x/net v0.17.0 // indirect golang.org/x/oauth2 v0.8.0 // indirect golang.org/x/sys v0.14.0 // indirect golang.org/x/term v0.13.0 // indirect @@ -53,11 +55,10 @@ require ( golang.org/x/time v0.3.0 // indirect golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 // indirect google.golang.org/appengine v1.6.7 // indirect - google.golang.org/protobuf v1.30.0 // indirect + google.golang.org/protobuf v1.31.0 // indirect gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect - k8s.io/api v0.28.2 // indirect k8s.io/klog/v2 v2.100.1 // indirect k8s.io/kube-openapi v0.0.0-20230717233707-2695361300d9 // indirect k8s.io/utils v0.0.0-20230406110748-d93618cff8a2 // indirect diff --git a/go.sum b/go.sum index a789518d..e173eff9 100644 --- a/go.sum +++ b/go.sum @@ -43,7 +43,6 @@ github.com/google/gofuzz v1.2.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/ github.com/google/pprof v0.0.0-20210720184732-4bb14d4b1be1 h1:K6RDEckDVWvDI9JAJYCmNdQXq6neHJOYx3V6jnqNEec= github.com/google/uuid v1.4.0 h1:MtMxsa51/r9yyhkyLsVeVt0B+BGQZzpQiTQ4eHZ8bc4= github.com/google/uuid v1.4.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= -github.com/icinga/icinga-go-library v0.0.0-20231121080432-c03a40718ed9 h1:C+BYgMhhitPxEt9pQ/O/ssQ90mIrm7+YG4KDe9UFFBA= github.com/icinga/icinga-go-library v0.0.0-20231121080432-c03a40718ed9/go.mod h1:Apo85zqPgovShDWxx/TlUN/bfl+RaPviTafT666iJyw= github.com/imdario/mergo v0.3.6 h1:xTNEAn+kxVO7dTZGu0CegyqKZmoWFI0rF8UxjlB2d28= github.com/imdario/mergo v0.3.6/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA= @@ -124,8 +123,8 @@ golang.org/x/net v0.0.0-20190603091049-60506f45cf65/go.mod h1:HSz+uSET+XFnRR8LxR golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= -golang.org/x/net v0.16.0 h1:7eBu7KsSvFDtSXUIDbh3aqlK4DPsZ1rByC8PFfBThos= -golang.org/x/net v0.16.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE= +golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM= +golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE= golang.org/x/oauth2 v0.8.0 h1:6dkIjl3j3LtZ/O3sTgZTMsLKSftL/B8Zgq4huOIIUu8= golang.org/x/oauth2 v0.8.0/go.mod h1:yr7u4HXZRm1R1kBWqr/xKNqewf0plRYoB7sla+BCIXE= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -165,8 +164,8 @@ google.golang.org/appengine v1.6.7 h1:FZR1q0exgwxzPzp/aF+VccGrSfxfPpkBqjIIEq3ru6 google.golang.org/appengine v1.6.7/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc= google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= -google.golang.org/protobuf v1.30.0 h1:kPPoIgf3TsEvrm0PFe15JQ+570QVxYzEvvHqChK+cng= -google.golang.org/protobuf v1.30.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= +google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8= +google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= @@ -178,16 +177,18 @@ gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= -k8s.io/api v0.28.2 h1:9mpl5mOb6vXZvqbQmankOfPIGiudghwCoLl1EYfUZbw= -k8s.io/api v0.28.2/go.mod h1:RVnJBsjU8tcMq7C3iaRSGMeaKt2TWEUXcpIt/90fjEg= -k8s.io/apimachinery v0.28.2 h1:KCOJLrc6gu+wV1BYgwik4AF4vXOlVJPdiqn0yAWWwXQ= -k8s.io/apimachinery v0.28.2/go.mod h1:RdzF87y/ngqk9H4z3EL2Rppv5jj95vGS/HaFXrLDApU= -k8s.io/client-go v0.28.2 h1:DNoYI1vGq0slMBN/SWKMZMw0Rq+0EQW6/AK4v9+3VeY= -k8s.io/client-go v0.28.2/go.mod h1:sMkApowspLuc7omj1FOSUxSoqjr+d5Q0Yc0LOFnYFJY= +k8s.io/api v0.28.4 h1:8ZBrLjwosLl/NYgv1P7EQLqoO8MGQApnbgH8tu3BMzY= +k8s.io/api v0.28.4/go.mod h1:axWTGrY88s/5YE+JSt4uUi6NMM+gur1en2REMR7IRj0= +k8s.io/apimachinery v0.28.4 h1:zOSJe1mc+GxuMnFzD4Z/U1wst50X28ZNsn5bhgIIao8= +k8s.io/apimachinery v0.28.4/go.mod h1:wI37ncBvfAoswfq626yPTe6Bz1c22L7uaJ8dho83mgg= +k8s.io/client-go v0.28.4 h1:Np5ocjlZcTrkyRJ3+T3PkXDpe4UpatQxj85+xjaD2wY= +k8s.io/client-go v0.28.4/go.mod h1:0VDZFpgoZfelyP5Wqu0/r/TRYcLYuJ2U1KEeoaPa1N4= k8s.io/klog/v2 v2.100.1 h1:7WCHKK6K8fNhTqfBhISHQ97KrnJNFZMcQvKp7gP/tmg= k8s.io/klog/v2 v2.100.1/go.mod h1:y1WjHnz7Dj687irZUWR/WLkLc5N1YHtjLdmgWjndZn0= k8s.io/kube-openapi v0.0.0-20230717233707-2695361300d9 h1:LyMgNKD2P8Wn1iAwQU5OhxCKlKJy0sHc+PcDwFB24dQ= k8s.io/kube-openapi v0.0.0-20230717233707-2695361300d9/go.mod h1:wZK2AVp1uHCp4VamDVgBP2COHZjqD1T68Rf0CM3YjSM= +k8s.io/metrics v0.28.4 h1:u36fom9+6c8jX2sk8z58H0hFaIUfrPWbXIxN7GT2blk= +k8s.io/metrics v0.28.4/go.mod h1:bBqAJxH20c7wAsTQxDXOlVqxGMdce49d7WNr1WeaLac= k8s.io/utils v0.0.0-20230406110748-d93618cff8a2 h1:qY1Ad8PODbnymg2pRbkyMT/ylpTrCM8P2RJ0yroCyIk= k8s.io/utils v0.0.0-20230406110748-d93618cff8a2/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0= sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd h1:EDPBXCAspyGV4jQlpZSudPeMmr1bNJefnuqLsRAsHZo= diff --git a/pkg/schema/log.go b/pkg/schema/log.go new file mode 100644 index 00000000..1f3b7abd --- /dev/null +++ b/pkg/schema/log.go @@ -0,0 +1,11 @@ +package schema + +import "github.com/icinga/icinga-go-library/types" + +type ContainerLog struct { + kmetaWithoutNamespace + ContainerId types.Binary + PodId types.Binary + Time string + Log string +} diff --git a/pkg/sync/channel-mux.go b/pkg/sync/channel-mux.go new file mode 100644 index 00000000..2e05f73f --- /dev/null +++ b/pkg/sync/channel-mux.go @@ -0,0 +1,128 @@ +package sync + +import ( + "context" + "golang.org/x/sync/errgroup" + "sync/atomic" +) + +// ChannelMux is a multiplexer for channels of variable types. +// It fans out all input channels to all output channels. +type ChannelMux[T any] interface { + // In adds the given input channel reading. + In(<-chan T) + + // Out returns a new output channel that receives from all input channels. + Out() <-chan T + + // AddOut registers the given output channel to receive from all input channels. + AddOut(chan<- T) + + // Run starts multiplexing of all input channels to all output channels. + Run(context.Context) error +} + +type channelMux[T any] struct { + in []<-chan T + out []chan<- T + outAdded []chan<- T + started atomic.Bool +} + +// NewChannelMux returns a new ChannelMux initialized with at least one input channel. +func NewChannelMux[T any](inChannel <-chan T, inChannels ...<-chan T) ChannelMux[T] { + return &channelMux[T]{ + in: append(inChannels, inChannel), + } +} + +func (mux *channelMux[T]) In(channel <-chan T) { + if mux.started.Load() { + panic("channelMux already started") + } + + mux.in = append(mux.in, channel) +} + +func (mux *channelMux[T]) Out() <-chan T { + if mux.started.Load() { + panic("channelMux already started") + } + + channel := make(chan T) + mux.out = append(mux.out, channel) + + return channel +} + +func (mux *channelMux[T]) AddOut(channel chan<- T) { + if mux.started.Load() { + panic("channelMux already started") + } + + mux.outAdded = append(mux.outAdded, channel) +} + +func (mux *channelMux[T]) Run(ctx context.Context) error { + if mux.started.Swap(true) { + panic("channelMux already started") + } + + defer func() { + for _, channelToClose := range mux.out { + close(channelToClose) + } + }() + + g, ctx := errgroup.WithContext(ctx) + + sink := make(chan T) + defer close(sink) + + for _, ch := range mux.in { + ch := ch + + g.Go(func() error { + for { + select { + case spread, more := <-ch: + if !more { + return nil + } + select { + case sink <- spread: + case <-ctx.Done(): + return ctx.Err() + } + + case <-ctx.Done(): + return ctx.Err() + } + } + }) + } + + outs := append(mux.outAdded, mux.out...) + g.Go(func() error { + for { + select { + case spread, more := <-sink: + if !more { + return nil + } + + for _, ch := range outs { + select { + case ch <- spread: + case <-ctx.Done(): + return ctx.Err() + } + } + case <-ctx.Done(): + return ctx.Err() + } + } + }) + + return g.Wait() +} diff --git a/pkg/sync/channel-mux_test.go b/pkg/sync/channel-mux_test.go new file mode 100644 index 00000000..467c527d --- /dev/null +++ b/pkg/sync/channel-mux_test.go @@ -0,0 +1,207 @@ +package sync + +import ( + "context" + "golang.org/x/sync/errgroup" + "testing" + "time" +) + +type outputTest struct { + arg1, want int +} + +var outputTests = []outputTest{ + {0, 0}, + {5, 5}, + {35253, 35253}, + {999999, 999999}, + {-7, -7}, +} + +func TestAddedOutputChannels(t *testing.T) { + for _, test := range outputTests { + multiplexChannel := make(chan int) + multiplexer := NewChannelMux(multiplexChannel) + + outputChannel1 := make(chan int) + outputChannel2 := make(chan int) + outputChannel3 := make(chan int) + multiplexer.AddOut(outputChannel1) + multiplexer.AddOut(outputChannel2) + multiplexer.AddOut(outputChannel3) + + g, ctx := errgroup.WithContext(context.Background()) + + g.Go(func() error { + return multiplexer.Run(ctx) + }) + + multiplexChannel <- test.arg1 + + if got := <-outputChannel1; got != test.want { + t.Errorf("got '%d' for 1st test channel, wanted '%d'", got, test.want) + } + if got := <-outputChannel2; got != test.want { + t.Errorf("got '%d' for 2nd test channel, wanted '%d'", got, test.want) + } + if got := <-outputChannel3; got != test.want { + t.Errorf("got '%d' for 3rd test channel, wanted '%d'", got, test.want) + } + } +} + +func TestCreatedOutputChannels(t *testing.T) { + for _, test := range outputTests { + multiplexChannel := make(chan int) + multiplexer := NewChannelMux(multiplexChannel) + + outputChannel1 := multiplexer.Out() + outputChannel2 := multiplexer.Out() + outputChannel3 := multiplexer.Out() + + g, ctx := errgroup.WithContext(context.Background()) + + g.Go(func() error { + return multiplexer.Run(ctx) + }) + + multiplexChannel <- test.arg1 + + if got := <-outputChannel1; got != test.want { + t.Errorf("got '%d' for 1st test channel, wanted '%d'", got, test.want) + } + if got := <-outputChannel2; got != test.want { + t.Errorf("got '%d' for 2nd test channel, wanted '%d'", got, test.want) + } + if got := <-outputChannel3; got != test.want { + t.Errorf("got '%d' for 3rd test channel, wanted '%d'", got, test.want) + } + } +} + +type inputTest struct { + arg1, arg2, arg3, want int +} + +var inputTests = []inputTest{ + {0, 0, 0, 0}, + {1, 2, 3, 6}, + {535, 64, 6432, 7031}, + {353632, 636232, 64674, 1054538}, + {-1, -2, -3, -6}, +} + +func TestAddedInputChannels(t *testing.T) { + for _, test := range inputTests { + multiplexChannel1 := make(chan int) + multiplexChannel2 := make(chan int) + multiplexChannel3 := make(chan int) + + multiplexer := NewChannelMux(multiplexChannel1, multiplexChannel2, multiplexChannel3) + + outputChannel := multiplexer.Out() + + ctx, cancel := context.WithCancel(context.Background()) + g, ctx := errgroup.WithContext(ctx) + + g.Go(func() error { + return multiplexer.Run(ctx) + }) + + g.Go(func() error { + select { + case multiplexChannel1 <- test.arg1: + case <-ctx.Done(): + return ctx.Err() + } + + select { + case multiplexChannel2 <- test.arg2: + case <-ctx.Done(): + return ctx.Err() + } + + select { + case multiplexChannel3 <- test.arg3: + case <-ctx.Done(): + return ctx.Err() + } + + close(multiplexChannel1) + close(multiplexChannel2) + close(multiplexChannel3) + + return nil + }) + + stop := false + got := 0 + + g.Go(func() error { + for !stop { + select { + case output, more := <-outputChannel: + if !more { + stop = true + break + } + + got += output + case <-time.After(time.Second * 1): + stop = true + break + case <-ctx.Done(): + return ctx.Err() + } + } + + if got != test.want { + t.Errorf("Got %d, wanted %d", got, test.want) + } + + cancel() + + return nil + }) + + g.Wait() + } +} + +func TestClosedChannels(t *testing.T) { + multiplexChannel := make(chan int) + multiplexer := NewChannelMux(multiplexChannel) + + outputChannel1 := multiplexer.Out() + outputChannel2 := multiplexer.Out() + outputChannel3 := multiplexer.Out() + + ctx, cancel := context.WithCancel(context.Background()) + g, ctx := errgroup.WithContext(ctx) + + g.Go(func() error { + return multiplexer.Run(ctx) + }) + + cancel() + + select { + case <-outputChannel1: + case <-time.After(time.Second): + t.Error("1st channel is still open, should be closed") + } + + select { + case <-outputChannel2: + case <-time.After(time.Second): + t.Error("2nd channel is still open, should be closed") + } + + select { + case <-outputChannel3: + case <-time.After(time.Second): + t.Error("3rd channel is still open, should be closed") + } + +} diff --git a/pkg/sync/logs.go b/pkg/sync/logs.go new file mode 100644 index 00000000..07d9d96f --- /dev/null +++ b/pkg/sync/logs.go @@ -0,0 +1,243 @@ +package sync + +import ( + "bufio" + "context" + "fmt" + "github.com/icinga/icinga-go-library/database" + "github.com/icinga/icinga-go-library/logging" + "github.com/icinga/icinga-go-library/types" + "github.com/icinga/icinga-kubernetes/pkg/contracts" + "github.com/icinga/icinga-kubernetes/pkg/schema" + "github.com/pkg/errors" + "golang.org/x/sync/errgroup" + "io" + kcorev1 "k8s.io/api/core/v1" + kmetav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + "strconv" + "strings" + gosync "sync" + "time" +) + +// ContainerLogSync reacts to pod changes and synchronizes container logs +// with the database. When a pod is added/updated, ContainerLogSync starts +// synchronizing its containers. When a pod is deleted, synchronization stops. +// Container logs are periodically fetched from the Kubernetes API. +type ContainerLogSync interface { + // Run starts the ContainerLogSync. + Run(context.Context, <-chan contracts.KUpsert, <-chan contracts.KDelete) error +} + +// NewContainerLogSync creates new ContainerLogSync initialized with clientset, database and logger. +func NewContainerLogSync(clientset *kubernetes.Clientset, db *database.DB, logger *logging.Logger, period time.Duration) ContainerLogSync { + return &containerLogSync{ + pods: make(map[string]podListItem), + mutex: &gosync.RWMutex{}, + clientset: clientset, + db: db, + logger: logger, + period: period, + } +} + +// containerLogSync syncs container logs to database. +type containerLogSync struct { + pods map[string]podListItem + mutex *gosync.RWMutex + clientset *kubernetes.Clientset + db *database.DB + logger *logging.Logger + period time.Duration +} + +type podListItem struct { + pod *kcorev1.Pod + lastTimestamps map[string]*kmetav1.Time +} + +// upsertStmt returns a database statement to upsert a container log. +func (ls *containerLogSync) upsertStmt() string { + return fmt.Sprintf( + "INSERT INTO %s (%s) VALUES (%s) ON DUPLICATE KEY UPDATE %s", + "container_log", + "container_id, pod_id, time, log", + ":container_id, :pod_id, :time, :log", + "time=CONCAT(time, '\n', :time), log=CONCAT(log, '\n', :log)", + ) +} + +// splitTimestampsFromMessages takes a log line and returns timestamps and messages as separate parts. +func (ls *containerLogSync) splitTimestampsFromMessages(log types.Binary, curPodId string, curContainerId string) (times []string, messages []string, newLastTimestamp time.Time, returnErr error) { + stringReader := strings.NewReader(string(log)) + reader := bufio.NewReader(stringReader) + + var parsedTimestamp time.Time + + for { + line, err := reader.ReadString('\n') + if err != nil { + if err == io.EOF { + break + } + + returnErr = errors.Wrap(err, "error reading log message") + return + } + + timestamp, message, _ := strings.Cut(line, " ") + + parsedTimestamp, err = time.Parse("2006-01-02T15:04:05.999999999Z", timestamp) + if err != nil { + ls.logger.Fatal(errors.Wrap(err, "error parsing log timestamp")) + continue + } + + if lastTimestamp, ok := ls.pods[curPodId].lastTimestamps[curContainerId]; ok && + (parsedTimestamp.Before(lastTimestamp.Time) || parsedTimestamp.Equal(lastTimestamp.Time)) { + continue + } + + times = append(times, strconv.FormatInt(parsedTimestamp.UnixMilli(), 10)) + messages = append(messages, message) + } + + newLastTimestamp = parsedTimestamp + + return +} + +// maintainList updates pods depending on the objects coming in via upsert and delete channel. +func (ls *containerLogSync) maintainList(ctx context.Context, kupserts <-chan contracts.KUpsert, kdeletes <-chan contracts.KDelete) error { + g, ctx := errgroup.WithContext(ctx) + + databaseDeletes := make(chan any) + g.Go(func() error { + defer close(databaseDeletes) + + for { + select { + case kupsert, more := <-kupserts: + if !more { + return nil + } + + podId := kupsert.ID().String() + + if _, ok := ls.pods[podId]; ok { + continue + } + + ls.mutex.RLock() + ls.pods[podId] = podListItem{ + pod: kupsert.KObject().(*kcorev1.Pod), + lastTimestamps: make(map[string]*kmetav1.Time), + } + ls.mutex.RUnlock() + + case kdelete, more := <-kdeletes: + if !more { + return nil + } + + podId := kdelete.ID().String() + + ls.mutex.RLock() + delete(ls.pods, podId) + ls.mutex.RUnlock() + + select { + case databaseDeletes <- podId: + case <-ctx.Done(): + return ctx.Err() + } + case <-ctx.Done(): + return ctx.Err() + } + + } + }) + + g.Go(func() error { + return database.NewDelete(ls.db, database.ByColumn("container_id")).Stream(ctx, &schema.ContainerLog{}, databaseDeletes) + }) + + return g.Wait() +} + +func (ls *containerLogSync) Run(ctx context.Context, kupserts <-chan contracts.KUpsert, kdeletes <-chan contracts.KDelete) error { + ls.logger.Info("Starting sync") + + g, ctx := errgroup.WithContext(ctx) + + g.Go(func() error { + return ls.maintainList(ctx, kupserts, kdeletes) + }) + + databaseUpserts := make(chan database.Entity) + defer close(databaseUpserts) + + g.Go(func() error { + for { + for _, element := range ls.pods { + podId := types.Binary(types.Checksum(element.pod.Namespace + "/" + element.pod.Name)) + for _, container := range element.pod.Spec.Containers { + containerId := types.Binary(types.Checksum(element.pod.Namespace + "/" + element.pod.Name + "/" + container.Name)) + podLogOpts := kcorev1.PodLogOptions{Container: container.Name, Timestamps: true} + + if _, ok := ls.pods[podId.String()].lastTimestamps[containerId.String()]; ok { + podLogOpts.SinceTime = ls.pods[podId.String()].lastTimestamps[containerId.String()] + } + + log, err := ls.clientset.CoreV1().Pods(element.pod.Namespace).GetLogs(element.pod.Name, &podLogOpts).Do(ctx).Raw() + if err != nil { + ls.logger.Fatal(errors.Wrap(err, "error reading container log")) + continue + } + + times, messages, lastTimestamp, err := ls.splitTimestampsFromMessages(log, podId.String(), containerId.String()) + if err != nil { + return err + } + + if len(messages) == 0 { + continue + } + + newLog := &schema.ContainerLog{ + ContainerId: containerId, + PodId: podId, + Time: strings.Join(times, "\n"), + Log: strings.Join(messages, "\n"), + } + + select { + case databaseUpserts <- newLog: + case <-ctx.Done(): + return ctx.Err() + + } + + if _, ok := ls.pods[podId.String()]; !ok { + continue + } + + ls.pods[podId.String()].lastTimestamps[containerId.String()] = &kmetav1.Time{Time: lastTimestamp} + } + } + + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(ls.period): + } + } + }) + + g.Go(func() error { + return database.NewUpsert(ls.db, database.WithStatement(ls.upsertStmt(), 5)).Stream(ctx, databaseUpserts) + }) + + return g.Wait() +} diff --git a/pkg/sync/options.go b/pkg/sync/options.go new file mode 100644 index 00000000..1a874761 --- /dev/null +++ b/pkg/sync/options.go @@ -0,0 +1,37 @@ +package sync + +import "github.com/icinga/icinga-kubernetes/pkg/contracts" + +// syncOption is a functional option for NewSync. +type syncOption func(options *syncOptions) + +// syncOptions stores options for sync. +type syncOptions struct { + forwardUpserts chan<- contracts.KUpsert + forwardDeletes chan<- contracts.KDelete +} + +// newSyncOptions returns a new syncOptions initialized with the given options. +func newSyncOptions(options ...syncOption) *syncOptions { + syncOpts := &syncOptions{} + + for _, option := range options { + option(syncOpts) + } + + return syncOpts +} + +// WithForwardUpserts forwards added and updated Kubernetes resources to the specific channel. +func WithForwardUpserts(channel chan<- contracts.KUpsert) syncOption { + return func(options *syncOptions) { + options.forwardUpserts = channel + } +} + +// WithForwardDeletes forwards deleted Kubernetes resources to the specific channel. +func WithForwardDeletes(channel chan<- contracts.KDelete) syncOption { + return func(options *syncOptions) { + options.forwardDeletes = channel + } +} diff --git a/pkg/sync/sync.go b/pkg/sync/sync.go index d0d788db..0b46ba84 100644 --- a/pkg/sync/sync.go +++ b/pkg/sync/sync.go @@ -15,7 +15,7 @@ import ( ) type Sync interface { - Run(context.Context) error + Run(context.Context, ...syncOption) error } type sync struct { @@ -39,7 +39,7 @@ func NewSync( } } -func (s *sync) Run(ctx context.Context) error { +func (s *sync) Run(ctx context.Context, options ...syncOption) error { s.logger.Info("Starting sync") s.logger.Debug("Warming up") @@ -65,57 +65,77 @@ func (s *sync) Run(ctx context.Context) error { s.logger.Debug("Finished warming up") - upserts := make(chan database.Entity) - defer close(upserts) + syncOpts := newSyncOptions(options...) - for _, ch := range []<-chan contracts.KUpsert{changes.Adds(), changes.Updates()} { - ch := ch + kupsertsMux := NewChannelMux(changes.Adds(), changes.Updates()) + kupserts := kupsertsMux.Out() + + if syncOpts.forwardUpserts != nil { + kupsertsMux.AddOut(syncOpts.forwardUpserts) + } + + g.Go(func() error { + return kupsertsMux.Run(ctx) + }) + + databaseUpserts := make(chan database.Entity) + defer close(databaseUpserts) + + g.Go(func() error { + for { + select { + case kupsert, more := <-kupserts: + if !more { + return nil + } + + entity := s.factory() + entity.SetID(kupsert.ID()) + entity.SetCanonicalName(kupsert.GetCanonicalName()) + entity.Obtain(kupsert.KObject()) - g.Go(func() error { - for { select { - case kupsert, more := <-ch: - if !more { - return nil - } - - entity := s.factory() - entity.SetID(kupsert.ID()) - entity.SetCanonicalName(kupsert.GetCanonicalName()) - entity.Obtain(kupsert.KObject()) - - select { - case upserts <- entity: - s.logger.Debugw( - fmt.Sprintf("Sync: Upserted %s", kupsert.GetCanonicalName()), - zap.String("id", kupsert.ID().String())) - case <-ctx.Done(): - return ctx.Err() - } + case databaseUpserts <- entity: + s.logger.Debugw( + fmt.Sprintf("Sync: Upserted %s", kupsert.GetCanonicalName()), + zap.String("id", kupsert.ID().String())) case <-ctx.Done(): return ctx.Err() } + case <-ctx.Done(): + return ctx.Err() } - }) - } + } + }) g.Go(func() error { - return s.db.UpsertStreamed(ctx, upserts) + return s.db.UpsertStreamed(ctx, databaseUpserts) }) - deletes := make(chan any) - defer close(deletes) + kdeletesMux := NewChannelMux(changes.Deletes()) + kdeletes := kdeletesMux.Out() + if syncOpts.forwardDeletes != nil { + kdeletesMux.AddOut(syncOpts.forwardDeletes) + } + + g.Go(func() error { + return kdeletesMux.Run(ctx) + }) + + databaseDeletes := make(chan any) g.Go(func() error { + defer close(databaseDeletes) + for { select { - case kdelete, more := <-changes.Deletes(): + case kdelete, more := <-kdeletes: if !more { return nil } select { - case deletes <- kdelete.ID(): + case databaseDeletes <- kdelete.ID(): s.logger.Debugw( fmt.Sprintf("Sync: Deleted %s", kdelete.GetCanonicalName()), zap.String("id", kdelete.ID().String())) @@ -129,7 +149,7 @@ func (s *sync) Run(ctx context.Context) error { }) g.Go(func() error { - return s.db.DeleteStreamed(ctx, s.factory(), deletes) + return s.db.DeleteStreamed(ctx, s.factory(), databaseDeletes) }) g.Go(func() error { diff --git a/schema/mysql/schema.sql b/schema/mysql/schema.sql index f6eab658..e6a90f42 100644 --- a/schema/mysql/schema.sql +++ b/schema/mysql/schema.sql @@ -28,3 +28,11 @@ CREATE TABLE pod ( created bigint unsigned NOT NULL, PRIMARY KEY (id) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin; + +CREATE TABLE container_log ( + container_id binary(20) NOT NULL, + pod_id binary(20) NOT NULL, + time longtext NOT NULL, + log longtext NOT NULL, + PRIMARY KEY (container_id) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;