From e9917b78ac4ac7f800bba37d1a007ddc0823284d Mon Sep 17 00:00:00 2001 From: Eric Lippmann Date: Mon, 18 Nov 2024 21:16:50 +0100 Subject: [PATCH] Refactor Icinga Notifications integration --- cmd/icinga-kubernetes/main.go | 194 +++++++++++++++++++---------- config.example.yml | 17 ++- go.mod | 20 ++- go.sum | 58 ++++----- internal/cache/v1/multiplexers.go | 148 ++++++++++++++++++++++ internal/channel_multiplexer.go | 156 +++++++++++++++++++++++ internal/notifications.go | 108 ++++++++++++++++ pkg/notifications/client.go | 79 +++++++----- pkg/notifications/config.go | 35 +++--- pkg/notifications/contracts.go | 7 ++ pkg/notifications/event.go | 33 +++++ pkg/notifications/notifications.go | 86 ------------- pkg/schema/v1/config.go | 9 +- pkg/schema/v1/daemon_set.go | 23 ++-- pkg/schema/v1/deployment.go | 23 ++-- pkg/schema/v1/node.go | 31 +++-- pkg/schema/v1/pod.go | 23 ++-- pkg/schema/v1/replica_set.go | 23 ++-- pkg/schema/v1/stateful_set.go | 23 ++-- pkg/sync/{ => v1}/controller.go | 2 +- pkg/sync/{ => v1}/event_handler.go | 2 +- pkg/sync/{ => v1}/features.go | 2 +- pkg/sync/{ => v1}/sink.go | 2 +- pkg/sync/v1/sync.go | 15 ++- schema/mysql/schema.sql | 21 ++-- 25 files changed, 789 insertions(+), 351 deletions(-) create mode 100644 internal/cache/v1/multiplexers.go create mode 100644 internal/channel_multiplexer.go create mode 100644 internal/notifications.go create mode 100644 pkg/notifications/contracts.go create mode 100644 pkg/notifications/event.go delete mode 100644 pkg/notifications/notifications.go rename pkg/sync/{ => v1}/controller.go (99%) rename pkg/sync/{ => v1}/event_handler.go (99%) rename pkg/sync/{ => v1}/features.go (98%) rename pkg/sync/{ => v1}/sink.go (99%) diff --git a/cmd/icinga-kubernetes/main.go b/cmd/icinga-kubernetes/main.go index edddf6e4..5a0e4100 100644 --- a/cmd/icinga-kubernetes/main.go +++ b/cmd/icinga-kubernetes/main.go @@ -11,6 +11,7 @@ import ( "github.com/icinga/icinga-go-library/periodic" "github.com/icinga/icinga-go-library/types" "github.com/icinga/icinga-kubernetes/internal" + cachev1 "github.com/icinga/icinga-kubernetes/internal/cache/v1" "github.com/icinga/icinga-kubernetes/pkg/backoff" "github.com/icinga/icinga-kubernetes/pkg/com" "github.com/icinga/icinga-kubernetes/pkg/daemon" @@ -19,7 +20,6 @@ import ( "github.com/icinga/icinga-kubernetes/pkg/notifications" "github.com/icinga/icinga-kubernetes/pkg/retry" schemav1 "github.com/icinga/icinga-kubernetes/pkg/schema/v1" - "github.com/icinga/icinga-kubernetes/pkg/sync" syncv1 "github.com/icinga/icinga-kubernetes/pkg/sync/v1" k8sMysql "github.com/icinga/icinga-kubernetes/schema/mysql" "github.com/okzk/sdnotify" @@ -35,6 +35,7 @@ import ( "k8s.io/klog/v2" "os" "strings" + "sync" "time" ) @@ -232,20 +233,41 @@ func main() { } }, periodic.Immediate()).Stop() - var nclient *notifications.Client - if err := notifications.SyncSourceConfig(ctx, db2, &cfg.Notifications); err != nil { + if err := internal.SyncNotificationsConfig(ctx, db2, &cfg.Notifications); err != nil { klog.Fatal(err) } - if cfg.Notifications.Url == "" { - err = notifications.RetrieveConfig(ctx, db2, &cfg.Notifications) + if cfg.Notifications.Url != "" { + klog.Infof("Sending notifications to %s", cfg.Notifications.Url) + + nclient, err := notifications.NewClient("icinga-kubernetes/"+internal.Version.Version, cfg.Notifications) if err != nil { - klog.Error(errors.Wrap(err, "cannot retrieve Icinga Notifications config")) + klog.Fatal(err) } - } - if cfg.Notifications.Url != "" { - nclient = notifications.NewClient(db2, cfg.Notifications) + g.Go(func() error { + return nclient.Stream(ctx, cachev1.Multiplexers().Nodes().UpsertEvents().Out()) + }) + + g.Go(func() error { + return nclient.Stream(ctx, cachev1.Multiplexers().DaemonSets().UpsertEvents().Out()) + }) + + g.Go(func() error { + return nclient.Stream(ctx, cachev1.Multiplexers().StatefulSets().UpsertEvents().Out()) + }) + + g.Go(func() error { + return nclient.Stream(ctx, cachev1.Multiplexers().Deployments().UpsertEvents().Out()) + }) + + g.Go(func() error { + return nclient.Stream(ctx, cachev1.Multiplexers().ReplicaSets().UpsertEvents().Out()) + }) + + g.Go(func() error { + return nclient.Stream(ctx, cachev1.Multiplexers().Pods().UpsertEvents().Out()) + }) } if cfg.Prometheus.Url != "" { @@ -271,146 +293,192 @@ func main() { return s.Run(ctx) }) - g.Go(func() error { - nodes := internal.NewMultiplex() - if cfg.Notifications.Url != "" { - nodesOut := nodes.Out() - g.Go(func() error { return nclient.Stream(ctx, nodesOut) }) - } - nodesIn := nodes.In() - g.Go(func() error { return nodes.Do(ctx) }) + wg := sync.WaitGroup{} - s := syncv1.NewSync(db, factory.Core().V1().Nodes().Informer(), log.WithName("nodes"), schemav1.NewNode) - return s.Run(ctx, sync.WithOnUpsert(com.ForwardBulk(nodesIn))) - }) + wg.Add(1) g.Go(func() error { - pods := internal.NewMultiplex() - deletedPodUuids := internal.NewMultiplex() + s := syncv1.NewSync(db, factory.Core().V1().Nodes().Informer(), log.WithName("nodes"), schemav1.NewNode) + var forwardForNotifications []syncv1.Feature if cfg.Notifications.Url != "" { - podsOut := pods.Out() - g.Go(func() error { return nclient.Stream(ctx, podsOut) }) + forwardForNotifications = append( + forwardForNotifications, + syncv1.WithOnUpsert(com.ForwardBulk(cachev1.Multiplexers().Nodes().UpsertEvents().In())), + syncv1.WithOnDelete(com.ForwardBulk(cachev1.Multiplexers().Nodes().DeleteEvents().In())), + ) } - schemav1.SyncContainers(ctx, db, g, pods.Out(), deletedPodUuids.Out()) + wg.Done() + + return s.Run(ctx, forwardForNotifications...) + }) + + wg.Add(1) + g.Go(func() error { + schemav1.SyncContainers( + ctx, + db, + g, + cachev1.Multiplexers().Pods().UpsertEvents().Out(), + cachev1.Multiplexers().Pods().DeleteEvents().Out(), + ) f := schemav1.NewPodFactory(clientset) s := syncv1.NewSync(db, factory.Core().V1().Pods().Informer(), log.WithName("pods"), f.New) - podsIn := pods.In() - deletedIn := deletedPodUuids.In() + wg.Done() - g.Go(func() error { return pods.Do(ctx) }) - g.Go(func() error { return deletedPodUuids.Do(ctx) }) - - return s.Run(ctx, sync.WithOnUpsert(com.ForwardBulk(podsIn)), sync.WithOnDelete(com.ForwardBulk(deletedIn))) + return s.Run( + ctx, + syncv1.WithOnUpsert(com.ForwardBulk(cachev1.Multiplexers().Pods().UpsertEvents().In())), + syncv1.WithOnDelete(com.ForwardBulk(cachev1.Multiplexers().Pods().DeleteEvents().In())), + ) }) + + wg.Add(1) g.Go(func() error { - deployments := internal.NewMultiplex() + s := syncv1.NewSync( + db, factory.Apps().V1().Deployments().Informer(), log.WithName("deployments"), schemav1.NewDeployment) + + var forwardForNotifications []syncv1.Feature if cfg.Notifications.Url != "" { - deploymentsOut := deployments.Out() - g.Go(func() error { return nclient.Stream(ctx, deploymentsOut) }) + forwardForNotifications = append( + forwardForNotifications, + syncv1.WithOnUpsert(com.ForwardBulk(cachev1.Multiplexers().Deployments().UpsertEvents().In())), + syncv1.WithOnDelete(com.ForwardBulk(cachev1.Multiplexers().Deployments().DeleteEvents().In())), + ) } - s := syncv1.NewSync(db, factory.Apps().V1().Deployments().Informer(), log.WithName("deployments"), schemav1.NewDeployment) - deploymentsIn := deployments.In() - g.Go(func() error { return deployments.Do(ctx) }) + wg.Done() - return s.Run(ctx, sync.WithOnUpsert(com.ForwardBulk(deploymentsIn))) + return s.Run(ctx, forwardForNotifications...) }) + + wg.Add(1) g.Go(func() error { - daemonSet := internal.NewMultiplex() + s := syncv1.NewSync( + db, factory.Apps().V1().DaemonSets().Informer(), log.WithName("daemon-sets"), schemav1.NewDaemonSet) + + var forwardForNotifications []syncv1.Feature if cfg.Notifications.Url != "" { - daemonSetOut := daemonSet.Out() - g.Go(func() error { return nclient.Stream(ctx, daemonSetOut) }) + forwardForNotifications = append( + forwardForNotifications, + syncv1.WithOnUpsert(com.ForwardBulk(cachev1.Multiplexers().DaemonSets().UpsertEvents().In())), + syncv1.WithOnDelete(com.ForwardBulk(cachev1.Multiplexers().DaemonSets().DeleteEvents().In())), + ) } - daemonSetIn := daemonSet.In() - g.Go(func() error { return daemonSet.Do(ctx) }) + wg.Done() - s := syncv1.NewSync(db, factory.Apps().V1().DaemonSets().Informer(), log.WithName("daemon-sets"), schemav1.NewDaemonSet) - - return s.Run(ctx, sync.WithOnUpsert(com.ForwardBulk(daemonSetIn))) + return s.Run(ctx, forwardForNotifications...) }) + + wg.Add(1) g.Go(func() error { - replicaSet := internal.NewMultiplex() + s := syncv1.NewSync( + db, factory.Apps().V1().ReplicaSets().Informer(), log.WithName("replica-sets"), schemav1.NewReplicaSet) + + var forwardForNotifications []syncv1.Feature if cfg.Notifications.Url != "" { - replicaSetOut := replicaSet.Out() - g.Go(func() error { return nclient.Stream(ctx, replicaSetOut) }) + forwardForNotifications = append( + forwardForNotifications, + syncv1.WithOnUpsert(com.ForwardBulk(cachev1.Multiplexers().ReplicaSets().UpsertEvents().In())), + syncv1.WithOnDelete(com.ForwardBulk(cachev1.Multiplexers().ReplicaSets().DeleteEvents().In())), + ) } - replicaSetIn := replicaSet.In() - g.Go(func() error { return replicaSet.Do(ctx) }) - - s := syncv1.NewSync(db, factory.Apps().V1().ReplicaSets().Informer(), log.WithName("replica-sets"), schemav1.NewReplicaSet) + wg.Done() - return s.Run(ctx, sync.WithOnUpsert(com.ForwardBulk(replicaSetIn))) + return s.Run(ctx, forwardForNotifications...) }) + + wg.Add(1) g.Go(func() error { - statefulSet := internal.NewMultiplex() + s := syncv1.NewSync( + db, factory.Apps().V1().StatefulSets().Informer(), log.WithName("stateful-sets"), schemav1.NewStatefulSet) + + var forwardForNotifications []syncv1.Feature if cfg.Notifications.Url != "" { - statefulSetOut := statefulSet.Out() - g.Go(func() error { return nclient.Stream(ctx, statefulSetOut) }) + forwardForNotifications = append( + forwardForNotifications, + syncv1.WithOnUpsert(com.ForwardBulk(cachev1.Multiplexers().StatefulSets().UpsertEvents().In())), + syncv1.WithOnDelete(com.ForwardBulk(cachev1.Multiplexers().StatefulSets().DeleteEvents().In())), + ) } - statefulSetIn := statefulSet.In() - g.Go(func() error { return statefulSet.Do(ctx) }) - - s := syncv1.NewSync(db, factory.Apps().V1().StatefulSets().Informer(), log.WithName("stateful-sets"), schemav1.NewStatefulSet) + wg.Done() - return s.Run(ctx, sync.WithOnUpsert(com.ForwardBulk(statefulSetIn))) + return s.Run(ctx, forwardForNotifications...) }) + g.Go(func() error { s := syncv1.NewSync(db, factory.Core().V1().Services().Informer(), log.WithName("services"), schemav1.NewService) return s.Run(ctx) }) + g.Go(func() error { s := syncv1.NewSync(db, factory.Discovery().V1().EndpointSlices().Informer(), log.WithName("endpoints"), schemav1.NewEndpointSlice) return s.Run(ctx) }) + g.Go(func() error { s := syncv1.NewSync(db, factory.Core().V1().Secrets().Informer(), log.WithName("secrets"), schemav1.NewSecret) return s.Run(ctx) }) + g.Go(func() error { s := syncv1.NewSync(db, factory.Core().V1().ConfigMaps().Informer(), log.WithName("config-maps"), schemav1.NewConfigMap) return s.Run(ctx) }) + g.Go(func() error { s := syncv1.NewSync(db, factory.Events().V1().Events().Informer(), log.WithName("events"), schemav1.NewEvent) - return s.Run(ctx, sync.WithNoDelete(), sync.WithNoWarumup()) + return s.Run(ctx, syncv1.WithNoDelete(), syncv1.WithNoWarumup()) }) + g.Go(func() error { s := syncv1.NewSync(db, factory.Core().V1().PersistentVolumeClaims().Informer(), log.WithName("pvcs"), schemav1.NewPvc) return s.Run(ctx) }) + g.Go(func() error { s := syncv1.NewSync(db, factory.Core().V1().PersistentVolumes().Informer(), log.WithName("persistent-volumes"), schemav1.NewPersistentVolume) return s.Run(ctx) }) + g.Go(func() error { s := syncv1.NewSync(db, factory.Batch().V1().Jobs().Informer(), log.WithName("jobs"), schemav1.NewJob) return s.Run(ctx) }) + g.Go(func() error { s := syncv1.NewSync(db, factory.Batch().V1().CronJobs().Informer(), log.WithName("cron-jobs"), schemav1.NewCronJob) return s.Run(ctx) }) + g.Go(func() error { s := syncv1.NewSync(db, factory.Networking().V1().Ingresses().Informer(), log.WithName("ingresses"), schemav1.NewIngress) return s.Run(ctx) }) + g.Go(func() error { + wg.Wait() + + klog.V(2).Info("Starting multiplexers") + + return cachev1.Multiplexers().Run(ctx) + }) + g.Go(func() error { return db.PeriodicCleanup(ctx, database.CleanupStmt{ Table: "event", diff --git a/config.example.yml b/config.example.yml index 937e05ed..8a1eb633 100644 --- a/config.example.yml +++ b/config.example.yml @@ -28,17 +28,14 @@ prometheus: # Configuration for Icinga Notifications daemon. notifications: - # Icinga Notifications daemon base URL. - # If not set, Icinga for Kubernetes won't send any event to Icinga Notifications (disabled). + # Icinga Notifications daemon URL. # url: http://localhost:5680 - # The base URL of your Icinga for Kubernetes web module to be used for the generated Icinga Notifications events. -# kubernetes_web_url: http://localhost/icingaweb2/kubernetes + # Username for authenticating the Icinga for Kubernetes source in Icinga Notifications. +# username: source-\d+ - # The Icinga for Kubernetes source username used to authenticate in Icinga Notifications. - # Leave it empty to let Icinga for Kubernetes generate the credentials automatically. -# username: kubernetes + # Password for authenticating the Icinga for Kubernetes source in Icinga Notifications. +# password: password - # The Icinga for Kubernetes source password used to authenticate in Icinga Notifications. - # Leave it empty to let Icinga for Kubernetes generate the credentials automatically. -# password: 0bcf3398-12c8-4cb5-849d-81dcd0644cc7 + # The base URL of Icinga for Kubernetes Web used in generated Icinga Notification events. +# kubernetes_web_url: http://localhost/icingaweb2/kubernetes diff --git a/go.mod b/go.mod index 012081ff..ac7f5cbb 100644 --- a/go.mod +++ b/go.mod @@ -2,14 +2,12 @@ module github.com/icinga/icinga-kubernetes go 1.22.0 -toolchain go1.23.0 - require ( github.com/go-co-op/gocron v1.37.0 github.com/go-logr/logr v1.4.2 github.com/go-sql-driver/mysql v1.8.1 github.com/google/uuid v1.6.0 - github.com/icinga/icinga-go-library v0.0.0-20240524093614-7048f8f10123 + github.com/icinga/icinga-go-library v0.3.2-0.20241118194934-1a19cd696d37 github.com/jmoiron/sqlx v1.4.0 github.com/lib/pq v1.10.9 github.com/okzk/sdnotify v0.0.0-20240725214427-1c1fdd37c5ac @@ -19,7 +17,7 @@ require ( github.com/spf13/pflag v1.0.5 go.uber.org/zap v1.27.0 golang.org/x/exp v0.0.0-20240909161429-701f63a606c0 - golang.org/x/sync v0.8.0 + golang.org/x/sync v0.9.0 k8s.io/api v0.31.1 k8s.io/apimachinery v0.31.1 k8s.io/client-go v0.31.1 @@ -29,15 +27,16 @@ require ( require ( filippo.io/edwards25519 v1.1.0 // indirect + github.com/caarlos0/env/v11 v11.2.2 // indirect github.com/creasty/defaults v1.8.0 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/emicklei/go-restful/v3 v3.12.1 // indirect - github.com/fatih/color v1.17.0 // indirect + github.com/fatih/color v1.18.0 // indirect github.com/fxamacker/cbor/v2 v2.7.0 // indirect github.com/go-openapi/jsonpointer v0.21.0 // indirect github.com/go-openapi/jsonreference v0.21.0 // indirect github.com/go-openapi/swag v0.23.0 // indirect - github.com/goccy/go-yaml v1.12.0 // indirect + github.com/goccy/go-yaml v1.13.0 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/protobuf v1.5.4 // indirect github.com/google/gnostic-models v0.6.8 // indirect @@ -59,13 +58,12 @@ require ( github.com/x448/float16 v0.8.4 // indirect go.uber.org/atomic v1.11.0 // indirect go.uber.org/multierr v1.11.0 // indirect - golang.org/x/net v0.29.0 // indirect + golang.org/x/net v0.30.0 // indirect golang.org/x/oauth2 v0.23.0 // indirect - golang.org/x/sys v0.25.0 // indirect - golang.org/x/term v0.24.0 // indirect - golang.org/x/text v0.18.0 // indirect + golang.org/x/sys v0.26.0 // indirect + golang.org/x/term v0.25.0 // indirect + golang.org/x/text v0.19.0 // indirect golang.org/x/time v0.6.0 // indirect - golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da // indirect google.golang.org/protobuf v1.34.2 // indirect gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect diff --git a/go.sum b/go.sum index dfb4a743..fedc8b37 100644 --- a/go.sum +++ b/go.sum @@ -2,6 +2,8 @@ filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA= filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= +github.com/caarlos0/env/v11 v11.2.2 h1:95fApNrUyueipoZN/EhA8mMxiNxrBwDa+oAZrMWl3Kg= +github.com/caarlos0/env/v11 v11.2.2/go.mod h1:JBfcdeQiBoI3Zh1QRAWfe+tpiNTmDtcCj/hHHHMx0vc= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= @@ -13,10 +15,12 @@ github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1 github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/emicklei/go-restful/v3 v3.12.1 h1:PJMDIM/ak7btuL8Ex0iYET9hxM3CI2sjZtzpL63nKAU= github.com/emicklei/go-restful/v3 v3.12.1/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc= -github.com/fatih/color v1.17.0 h1:GlRw1BRJxkpqUCBKzKOw098ed57fEsKeNjpTe3cSjK4= -github.com/fatih/color v1.17.0/go.mod h1:YZ7TlrGPkiz6ku9fK3TLD/pl3CpsiFyu8N92HLgmosI= +github.com/fatih/color v1.18.0 h1:S8gINlzdQ840/4pfAwic/ZE0djQEH3wM94VfqLTZcOM= +github.com/fatih/color v1.18.0/go.mod h1:4FelSpRwEGDpQ12mAdzqdOukCy4u8WUtOY6lkT/6HfU= github.com/fxamacker/cbor/v2 v2.7.0 h1:iM5WgngdRBanHcxugY4JySA0nk1wZorNOpTgCMedv5E= github.com/fxamacker/cbor/v2 v2.7.0/go.mod h1:pxXPTn3joSm21Gbwsv0w9OSA2y1HFR9qXEeXQVeNoDQ= +github.com/gabriel-vasile/mimetype v1.4.3 h1:in2uUcidCuFcDKtdcBxlR0rJ1+fsokWf+uqxgUFjbI0= +github.com/gabriel-vasile/mimetype v1.4.3/go.mod h1:d8uq/6HKRL6CGdk+aubisF/M5GcPfT7nKyLpA0lbSSk= github.com/go-co-op/gocron v1.37.0 h1:ZYDJGtQ4OMhTLKOKMIch+/CY70Brbb1dGdooLEhh7b0= github.com/go-co-op/gocron v1.37.0/go.mod h1:3L/n6BkO7ABj+TrfSVXLRzsP26zmikL4ISkLQ0O8iNY= github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= @@ -27,18 +31,18 @@ github.com/go-openapi/jsonreference v0.21.0 h1:Rs+Y7hSXT83Jacb7kFyjn4ijOuVGSvOdF github.com/go-openapi/jsonreference v0.21.0/go.mod h1:LmZmgsrTkVg9LG4EaHeY8cBDslNPMo06cago5JNLkm4= github.com/go-openapi/swag v0.23.0 h1:vsEVJDUo2hPJ2tu0/Xc+4noaxyEffXNIs3cOULZ+GrE= github.com/go-openapi/swag v0.23.0/go.mod h1:esZ8ITTYEsH1V2trKHjAN8Ai7xHb8RV+YSZ577vPjgQ= -github.com/go-playground/locales v0.13.0 h1:HyWk6mgj5qFqCT5fjGBuRArbVDfE4hi8+e8ceBS/t7Q= -github.com/go-playground/locales v0.13.0/go.mod h1:taPMhCMXrRLJO55olJkUXHZBHCxTMfnGwq/HNwmWNS8= -github.com/go-playground/universal-translator v0.17.0 h1:icxd5fm+REJzpZx7ZfpaD876Lmtgy7VtROAbHHXk8no= -github.com/go-playground/universal-translator v0.17.0/go.mod h1:UkSxE5sNxxRwHyU+Scu5vgOQjsIJAF8j9muTVoKLVtA= -github.com/go-playground/validator/v10 v10.4.1 h1:pH2c5ADXtd66mxoE0Zm9SUhxE20r7aM3F26W0hOn+GE= -github.com/go-playground/validator/v10 v10.4.1/go.mod h1:nlOn6nFhuKACm19sB/8EGNn9GlaMV7XkbRSipzJ0Ii4= +github.com/go-playground/locales v0.14.1 h1:EWaQ/wswjilfKLTECiXz7Rh+3BjFhfDFKv/oXslEjJA= +github.com/go-playground/locales v0.14.1/go.mod h1:hxrqLVvrK65+Rwrd5Fc6F2O76J/NuW9t0sjnWqG1slY= +github.com/go-playground/universal-translator v0.18.1 h1:Bcnm0ZwsGyWbCzImXv+pAJnYK9S473LQFuzCbDbfSFY= +github.com/go-playground/universal-translator v0.18.1/go.mod h1:xekY+UJKNuX9WP91TpwSH2VMlDf28Uj24BCp08ZFTUY= +github.com/go-playground/validator/v10 v10.22.1 h1:40JcKH+bBNGFczGuoBYgX4I6m/i27HYW8P9FDk5PbgA= +github.com/go-playground/validator/v10 v10.22.1/go.mod h1:dbuPbCMFw/DrkbEynArYaCwl3amGuJotoKCe95atGMM= github.com/go-sql-driver/mysql v1.8.1 h1:LedoTUt/eveggdHS9qUFC1EFSa8bU2+1pZjSRpvNJ1Y= github.com/go-sql-driver/mysql v1.8.1/go.mod h1:wEBSXgmK//2ZFJyE+qWnIsVGmvmEKlqwuVSjsCm7DZg= github.com/go-task/slim-sprig/v3 v3.0.0 h1:sUs3vkvUymDpBKi3qH1YSqBQk9+9D/8M2mN1vB6EwHI= github.com/go-task/slim-sprig/v3 v3.0.0/go.mod h1:W848ghGpv3Qj3dhTPRyJypKRiqCdHZiAzKg9hl15HA8= -github.com/goccy/go-yaml v1.12.0 h1:/1WHjnMsI1dlIBQutrvSMGZRQufVO3asrHfTwfACoPM= -github.com/goccy/go-yaml v1.12.0/go.mod h1:wKnAMd44+9JAAnGQpWVEgBzGt3YuTaQ4uXoHvE4m7WU= +github.com/goccy/go-yaml v1.13.0 h1:0Wtp0FZLd7Sm8gERmR9S6Iczzb3vItJj7NaHmFg8pTs= +github.com/goccy/go-yaml v1.13.0/go.mod h1:IjYwxUiJDoqpx2RmbdjMUceGHZwYLon3sfOGl5Hi9lc= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= @@ -56,8 +60,8 @@ github.com/google/pprof v0.0.0-20240727154555-813a5fbdbec8/go.mod h1:K1liHPHnj73 github.com/google/uuid v1.4.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= -github.com/icinga/icinga-go-library v0.0.0-20240524093614-7048f8f10123 h1:41AWPlHZGj6SaNEELAI9fgzNDNEZWxTsIH7mLd2sd/0= -github.com/icinga/icinga-go-library v0.0.0-20240524093614-7048f8f10123/go.mod h1:YN7XJN3W0FodD+j4kirO89zk2tgvanXWt1RMV8UgOLo= +github.com/icinga/icinga-go-library v0.3.2-0.20241118194934-1a19cd696d37 h1:b1xWtyFSPlCBTgP7sajx4fb+zecPh0LOXnXsJ7n3r9o= +github.com/icinga/icinga-go-library v0.3.2-0.20241118194934-1a19cd696d37/go.mod h1:zBVLixVxt+FIxOct/DDTBzbu02BlvPZ0Mhcq8t6ErxE= github.com/imdario/mergo v0.3.16 h1:wwQJbIsHYGMUyLSPrEq1CT16AhnhNJQ51+4fdHUnCl4= github.com/imdario/mergo v0.3.16/go.mod h1:WBLT9ZmE3lPoWsEzCh9LPo3TiwVN+ZKEjmz+hD27ysY= github.com/jessevdk/go-flags v1.6.1 h1:Cvu5U8UGrLay1rZfv/zP7iLpSHGUZ/Ou68T0iX1bBK4= @@ -81,8 +85,8 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= -github.com/leodido/go-urn v1.2.0 h1:hpXL4XnriNwQ/ABnpepYM/1vCLWNDfUNts8dX3xTG6Y= -github.com/leodido/go-urn v1.2.0/go.mod h1:+8+nEpDfqqsY+g338gtMEUOtuK+4dEMhiQEgxpxOKII= +github.com/leodido/go-urn v1.4.0 h1:WT9HwE9SGECu3lg4d/dIA+jxlljEa1/ffXKmRjqdmIQ= +github.com/leodido/go-urn v1.4.0/go.mod h1:bvxc+MVxLKB4z00jd1z+Dvzr47oO32F/QSNjSBOlFxI= github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw= github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0= @@ -158,8 +162,8 @@ go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= -golang.org/x/crypto v0.27.0 h1:GXm2NjJrPaiv/h1tb2UH8QfgC/hOf/+z0p6PT8o1w7A= -golang.org/x/crypto v0.27.0/go.mod h1:1Xngt8kV6Dvbssa53Ziq6Eqn0HqbZi5Z6R0ZpwQzt70= +golang.org/x/crypto v0.28.0 h1:GBDwsMXVQi34v5CCYUm2jkJvu4cbtru2U4TN2PSyQnw= +golang.org/x/crypto v0.28.0/go.mod h1:rmgy+3RHxRZMyY0jjAJShp2zgEdOqj2AO7U0pYmeQ7U= golang.org/x/exp v0.0.0-20240909161429-701f63a606c0 h1:e66Fs6Z+fZTbFBAxKfP3PALWBtpfqks2bwGcexMxgtk= golang.org/x/exp v0.0.0-20240909161429-701f63a606c0/go.mod h1:2TbTHSBQa924w8M6Xs1QcRcFwyucIwBGpK1p2f1YFFY= golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= @@ -168,28 +172,28 @@ golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= -golang.org/x/net v0.29.0 h1:5ORfpBpCs4HzDYoodCDBbwHzdR5UrLBZ3sOnUJmFoHo= -golang.org/x/net v0.29.0/go.mod h1:gLkgy8jTGERgjzMic6DS9+SP0ajcu6Xu3Orq/SpETg0= +golang.org/x/net v0.30.0 h1:AcW1SDZMkb8IpzCdQUaIq2sP4sZ4zw+55h6ynffypl4= +golang.org/x/net v0.30.0/go.mod h1:2wGyMJ5iFasEhkwi13ChkO/t1ECNC4X4eBKkVFyYFlU= golang.org/x/oauth2 v0.23.0 h1:PbgcYx2W7i4LvjJWEbf0ngHV6qJYr86PkAV3bXdLEbs= golang.org/x/oauth2 v0.23.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ= -golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sync v0.9.0 h1:fEo0HyrW1GIgZdpbhCRO0PkJajUS5H9IFUztCgEo2jQ= +golang.org/x/sync v0.9.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34= -golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/term v0.24.0 h1:Mh5cbb+Zk2hqqXNO7S1iTjEphVL+jb8ZWaqh/g+JWkM= -golang.org/x/term v0.24.0/go.mod h1:lOBK/LVxemqiMij05LGJ0tzNr8xlmwBRJ81PX6wVLH8= +golang.org/x/sys v0.26.0 h1:KHjCJyddX0LoSTb3J+vWpupP9p0oznkqVk/IfjymZbo= +golang.org/x/sys v0.26.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/term v0.25.0 h1:WtHI/ltw4NvSUig5KARz9h521QvRC8RmF/cuYqifU24= +golang.org/x/term v0.25.0/go.mod h1:RPyXicDX+6vLxogjjRxjgD2TKtmAO6NZBsBRfrOLu7M= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/text v0.18.0 h1:XvMDiNzPAl0jr17s6W9lcaIhGUfUORdGCNsuLmPG224= -golang.org/x/text v0.18.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= +golang.org/x/text v0.19.0 h1:kTxAhCbGbxhK0IwgSKiMO5awPoDQ0RpfiVYBfK860YM= +golang.org/x/text v0.19.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= golang.org/x/time v0.6.0 h1:eTDhh4ZXt5Qf0augr54TN6suAUudPcawVZeIAPU7D4U= golang.org/x/time v0.6.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= @@ -202,8 +206,6 @@ golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8T golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da h1:noIWHXmPHxILtqtCOPIhSt0ABwskkZKjD3bXGnZGpNY= -golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da/go.mod h1:NDW/Ps6MPRej6fsCIbMTohpP40sJ/P/vI1MoTEGwX90= google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg= google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/internal/cache/v1/multiplexers.go b/internal/cache/v1/multiplexers.go new file mode 100644 index 00000000..21ad7835 --- /dev/null +++ b/internal/cache/v1/multiplexers.go @@ -0,0 +1,148 @@ +package v1 + +import ( + "context" + "github.com/icinga/icinga-kubernetes/internal" + "golang.org/x/sync/errgroup" +) + +type EventsMultiplexer interface { + UpsertEvents() internal.ChannelMultiplexer[any] + DeleteEvents() internal.ChannelMultiplexer[any] + Run(context.Context) error +} + +type EventsMultiplexers interface { + DaemonSets() EventsMultiplexer + Deployments() EventsMultiplexer + Nodes() EventsMultiplexer + Pods() EventsMultiplexer + ReplicaSets() EventsMultiplexer + StatefulSets() EventsMultiplexer + Run(context.Context) error +} + +func Multiplexers() EventsMultiplexers { + return m +} + +type events struct { + upsertEvents internal.ChannelMultiplexer[any] + deleteEvents internal.ChannelMultiplexer[any] +} + +func (e events) UpsertEvents() internal.ChannelMultiplexer[any] { + return e.upsertEvents +} + +func (e events) DeleteEvents() internal.ChannelMultiplexer[any] { + return e.deleteEvents +} + +func (e events) Run(ctx context.Context) error { + g, ctx := errgroup.WithContext(ctx) + + g.Go(func() error { + return e.upsertEvents.Run(ctx) + }) + + g.Go(func() error { + return e.deleteEvents.Run(ctx) + }) + + return g.Wait() +} + +type multiplexers struct { + daemonSets events + deployments events + nodes events + pods events + replicaSets events + statefulSets events +} + +func (m multiplexers) DaemonSets() EventsMultiplexer { + return m.daemonSets +} + +func (m multiplexers) Deployments() EventsMultiplexer { + return m.deployments +} + +func (m multiplexers) Nodes() EventsMultiplexer { + return m.nodes +} + +func (m multiplexers) Pods() EventsMultiplexer { + return m.pods +} + +func (m multiplexers) ReplicaSets() EventsMultiplexer { + return m.replicaSets +} + +func (m multiplexers) StatefulSets() EventsMultiplexer { + return m.statefulSets +} + +func (m multiplexers) Run(ctx context.Context) error { + g, ctx := errgroup.WithContext(ctx) + + g.Go(func() error { + return m.daemonSets.Run(ctx) + }) + + g.Go(func() error { + return m.deployments.Run(ctx) + }) + + g.Go(func() error { + return m.nodes.Run(ctx) + }) + + g.Go(func() error { + return m.pods.Run(ctx) + }) + + g.Go(func() error { + return m.replicaSets.Run(ctx) + }) + + g.Go(func() error { + return m.statefulSets.Run(ctx) + }) + + return g.Wait() +} + +var m multiplexers + +func init() { + m = multiplexers{ + daemonSets: events{ + upsertEvents: internal.NewChannelMux[any](), + deleteEvents: internal.NewChannelMux[any](), + }, + deployments: events{ + upsertEvents: internal.NewChannelMux[any](), + deleteEvents: internal.NewChannelMux[any](), + }, + nodes: events{ + upsertEvents: internal.NewChannelMux[any](), + deleteEvents: internal.NewChannelMux[any](), + }, + pods: events{ + upsertEvents: internal.NewChannelMux[any](), + deleteEvents: internal.NewChannelMux[any](), + }, + replicaSets: events{ + upsertEvents: internal.NewChannelMux[any](), + deleteEvents: internal.NewChannelMux[any](), + }, + statefulSets: events{ + upsertEvents: internal.NewChannelMux[any](), + deleteEvents: internal.NewChannelMux[any](), + }, + } +} diff --git a/internal/channel_multiplexer.go b/internal/channel_multiplexer.go new file mode 100644 index 00000000..e72c5377 --- /dev/null +++ b/internal/channel_multiplexer.go @@ -0,0 +1,156 @@ +package internal + +import ( + "context" + "golang.org/x/sync/errgroup" + "sync/atomic" +) + +// ChannelMultiplexer is a multiplexer for channels of variable types. +// It fans out all input channels to all output channels. +type ChannelMultiplexer[T any] interface { + // In adds the given input channel reading. + In() chan<- T + + AddIn(chan T) + + // Out returns a new output channel that receives from all input channels. + Out() <-chan T + + // AddOut registers the given output channel to receive from all input channels. + AddOut(chan T) + + // Run starts multiplexing of all input channels to all output channels. + // Once run is called, can't be modified and will panic. + Run(context.Context) error +} + +// NewChannelMux returns a new ChannelMultiplexer initialized with at least one input channel. +func NewChannelMux[T any](inChannels ...chan T) ChannelMultiplexer[T] { + return &channelMultiplexer[T]{ + inAdded: inChannels, + } +} + +type channelMultiplexer[T any] struct { + in []chan T + inAdded []chan T + out []chan T + outAdded []chan T + started atomic.Bool +} + +func (mux *channelMultiplexer[T]) In() chan<- T { + if mux.started.Load() { + panic("channelMultiplexer already started") + } + + channel := make(chan T) + + mux.in = append(mux.in, channel) + + return channel +} + +func (mux *channelMultiplexer[T]) AddIn(channel chan T) { + if mux.started.Load() { + panic("channelMultiplexer already started") + } + + mux.inAdded = append(mux.inAdded, channel) +} + +func (mux *channelMultiplexer[T]) Out() <-chan T { + if mux.started.Load() { + panic("channelMultiplexer already started") + } + + channel := make(chan T) + mux.out = append(mux.out, channel) + + return channel +} + +func (mux *channelMultiplexer[T]) AddOut(channel chan T) { + if mux.started.Load() { + panic("channelMultiplexer already started") + } + + mux.outAdded = append(mux.outAdded, channel) +} + +func (mux *channelMultiplexer[T]) Run(ctx context.Context) error { + if mux.started.Swap(true) { + panic("channelMultiplexer already started") + } + + defer func() { + for _, channelToClose := range mux.in { + close(channelToClose) + } + + for _, channelToClose := range mux.out { + close(channelToClose) + } + }() + + if len(mux.in)+len(mux.inAdded) == 0 { + if len(mux.out)+len(mux.outAdded) > 0 { + panic("foobar") + } + + return nil + } + + g, ctx := errgroup.WithContext(ctx) + + sink := make(chan T) + defer close(sink) + + for _, ch := range mux.in { + ch := ch + + g.Go(func() error { + for { + select { + case spread, more := <-ch: + if !more { + return nil + } + select { + case sink <- spread: + case <-ctx.Done(): + return ctx.Err() + } + + case <-ctx.Done(): + return ctx.Err() + } + } + }) + } + + outs := append(mux.outAdded, mux.out...) + g.Go(func() error { + for { + select { + case spread, more := <-sink: + if !more { + return nil + } + + for _, ch := range outs { + select { + case ch <- spread: + case <-ctx.Done(): + return ctx.Err() + } + } + case <-ctx.Done(): + return ctx.Err() + } + } + }) + + return g.Wait() +} diff --git a/internal/notifications.go b/internal/notifications.go new file mode 100644 index 00000000..1ad9570e --- /dev/null +++ b/internal/notifications.go @@ -0,0 +1,108 @@ +package internal + +import ( + "context" + "fmt" + "github.com/icinga/icinga-go-library/database" + "github.com/icinga/icinga-go-library/types" + "github.com/icinga/icinga-kubernetes/pkg/notifications" + schemav1 "github.com/icinga/icinga-kubernetes/pkg/schema/v1" + "github.com/jmoiron/sqlx" + "github.com/pkg/errors" +) + +func SyncNotificationsConfig(ctx context.Context, db *database.DB, config *notifications.Config) error { + _true := types.Bool{Bool: true, Valid: true} + + if config.Url != "" { + toDb := []schemav1.Config{ + {Key: schemav1.ConfigKeyNotificationsUrl, Value: config.Url, Locked: _true}, + {Key: schemav1.ConfigKeyNotificationsUsername, Value: config.Username, Locked: _true}, + {Key: schemav1.ConfigKeyNotificationsPassword, Value: config.Password, Locked: _true}, + } + + err := db.ExecTx(ctx, func(ctx context.Context, tx *sqlx.Tx) error { + if kwebUrl := config.KubernetesWebUrl; kwebUrl != "" { + toDb = append(toDb, schemav1.Config{ + Key: schemav1.ConfigKeyNotificationsKubernetesWebUrl, + Value: kwebUrl, + Locked: _true, + }) + } else { + if err := tx.SelectContext(ctx, &config.KubernetesWebUrl, fmt.Sprintf( + `SELECT "%s" FROM "%s"`, + schemav1.ConfigKeyNotificationsKubernetesWebUrl, + database.TableName(schemav1.Config{})), + ); err != nil { + return errors.Wrap(err, "cannot select Icinga Notifications config") + } + } + + if _, err := tx.ExecContext( + ctx, + fmt.Sprintf( + `DELETE FROM "%s" WHERE "key" LIKE ? AND "locked" = ?`, + database.TableName(&schemav1.Config{}), + ), + `notifications.%`, + _true, + ); err != nil { + return errors.Wrap(err, "cannot delete Icinga Notifications config") + } + + stmt, _ := db.BuildInsertStmt(schemav1.Config{}) + if _, err := tx.NamedExecContext(ctx, stmt, toDb); err != nil { + return errors.Wrap(err, "cannot insert Icinga Notifications config") + } + + return nil + }) + if err != nil { + return errors.Wrap(err, "cannot upsert Icinga Notifications config") + } + } else { + err := db.ExecTx(ctx, func(ctx context.Context, tx *sqlx.Tx) error { + if _, err := tx.ExecContext( + ctx, + fmt.Sprintf( + `DELETE FROM "%s" WHERE "key" LIKE ? AND "locked" = ?`, + database.TableName(&schemav1.Config{}), + ), + `notifications.%`, + _true, + ); err != nil { + return errors.Wrap(err, "cannot delete Icinga Notifications config") + } + + rows, err := db.QueryxContext(ctx, db.BuildSelectStmt(&schemav1.Config{}, &schemav1.Config{})) + if err != nil { + return errors.Wrap(err, "cannot fetch Icinga Notifications config from DB") + } + + for rows.Next() { + var r schemav1.Config + if err := rows.StructScan(&r); err != nil { + return errors.Wrap(err, "cannot fetch Icinga Notifications config from DB") + } + + switch r.Key { + case schemav1.ConfigKeyNotificationsUrl: + config.Url = r.Value + case schemav1.ConfigKeyNotificationsUsername: + config.Username = r.Value + case schemav1.ConfigKeyNotificationsPassword: + config.Password = r.Value + case schemav1.ConfigKeyNotificationsKubernetesWebUrl: + config.KubernetesWebUrl = r.Value + } + } + + return nil + }) + if err != nil { + return errors.Wrap(err, "cannot upsert Icinga Notifications config") + } + } + + return nil +} diff --git a/pkg/notifications/client.go b/pkg/notifications/client.go index 2ff448e5..8c6b31eb 100644 --- a/pkg/notifications/client.go +++ b/pkg/notifications/client.go @@ -4,8 +4,6 @@ import ( "bytes" "context" "encoding/json" - "github.com/icinga/icinga-go-library/database" - "github.com/icinga/icinga-kubernetes/internal" "github.com/pkg/errors" "io" "k8s.io/klog/v2" @@ -13,53 +11,66 @@ import ( "net/url" ) -// Notifiable can be implemented by all k8s types that want to submit notification events to Icinga Notifications. -type Notifiable interface { - // GetNotificationsEvent returns the event data of this type that will be transmitted to Icinga Notifications. - GetNotificationsEvent(baseUrl *url.URL) map[string]any -} - type Client struct { - db *database.DB - client http.Client - Config + client http.Client + userAgent string + processEventUrl string + webUrl *url.URL } -func NewClient(db *database.DB, c Config) *Client { - return &Client{db: db, client: http.Client{}, Config: c} -} +func NewClient(name string, config Config) (*Client, error) { + baseUrl, err := url.Parse(config.Url) + if err != nil { + return nil, errors.Wrap(err, "unable to parse url") + } -func (c *Client) ProcessEvent(notifiable Notifiable) error { - baseUrl, err := url.Parse(c.Config.KubernetesWebUrl) + webUrl, err := url.Parse(config.KubernetesWebUrl) if err != nil { - return errors.Wrapf(err, "cannot parse Icinga for Kubernetes Web URL: %q", c.Config.KubernetesWebUrl) + return nil, errors.Wrap(err, "unable to parse web url") } - body, err := json.Marshal(notifiable.GetNotificationsEvent(baseUrl)) + return &Client{ + client: http.Client{ + Transport: &basicAuthTransport{ + RoundTripper: http.DefaultTransport, + username: config.Username, + password: config.Password, + }, + }, + userAgent: name, + processEventUrl: baseUrl.ResolveReference(&url.URL{Path: "/process-event"}).String(), + webUrl: webUrl, + }, nil +} + +func (c *Client) ProcessEvent(ctx context.Context, event Marshaler) error { + e, _ := event.MarshalEvent() + e.URL = c.webUrl.ResolveReference(e.URL) + + body, err := json.Marshal(e) if err != nil { - return errors.Wrapf(err, "cannot marshal notifications event data of type: %T", notifiable) + return errors.Wrapf(err, "cannot marshal notifications event data of type: %T", e) } - r, err := http.NewRequest(http.MethodPost, c.Config.Url+"/process-event", bytes.NewBuffer(body)) + req, err := http.NewRequestWithContext(ctx, http.MethodPost, c.processEventUrl, bytes.NewReader(body)) if err != nil { return errors.Wrap(err, "cannot create new notifications http request") } - r.SetBasicAuth(c.Config.Username, c.Config.Password) - r.Header.Set("User-Agent", "icinga-kubernetes/"+internal.Version.Version) - r.Header.Add("Content-Type", "application/json") + req.Header.Add("Content-Type", "application/json") - res, err := c.client.Do(r) + res, err := c.client.Do(req) if err != nil { return errors.Wrap(err, "cannot send notifications event") } + defer func() { - _, _ = io.Copy(io.Discard, res.Body) _ = res.Body.Close() }() - if res.StatusCode != http.StatusOK && res.StatusCode != http.StatusAlreadyReported { - return errors.Errorf("received unexpected http status code from Icinga Notifications: %d", res.StatusCode) + if res.StatusCode != http.StatusOK && res.StatusCode != http.StatusNotAcceptable { + _, msg := io.ReadAll(res.Body) + return errors.Errorf("received unexpected http status code from Icinga Notifications: %d: %s", res.StatusCode, msg) } return nil @@ -74,7 +85,7 @@ func (c *Client) Stream(ctx context.Context, entities <-chan any) error { return nil } - if err := c.ProcessEvent(entity.(Notifiable)); err != nil { + if err := c.ProcessEvent(ctx, entity.(Marshaler)); err != nil { klog.Error(err) } case <-ctx.Done(): @@ -82,3 +93,15 @@ func (c *Client) Stream(ctx context.Context, entities <-chan any) error { } } } + +type basicAuthTransport struct { + http.RoundTripper + username string + password string +} + +func (t *basicAuthTransport) RoundTrip(req *http.Request) (*http.Response, error) { + req.SetBasicAuth(t.username, t.password) + + return t.RoundTripper.RoundTrip(req) +} diff --git a/pkg/notifications/config.go b/pkg/notifications/config.go index 8ea28f90..301c0993 100644 --- a/pkg/notifications/config.go +++ b/pkg/notifications/config.go @@ -3,36 +3,39 @@ package notifications import ( "github.com/pkg/errors" "net/url" - "strings" + "regexp" ) type Config struct { + // If URL is the empty string, notifications are disabled. Url string `yaml:"url"` Username string `yaml:"username"` Password string `yaml:"password"` KubernetesWebUrl string `yaml:"kubernetes_web_url" default:"http://localhost/icingaweb2/kubernetes"` } -// Validate implements the config.Validator interface. +// Validate checks constraints in the supplied configuration and returns an error if they are violated. func (c *Config) Validate() error { - if (c.Username == "") != (c.Password == "") { - return errors.New("'username' must be set, if password is provided and vice versa") - } - if c.Username != "" { - // Since Icinga Notifications does not yet support basic HTTP authentication with a simple user and password, - // we have to use a static “username” consisting of `source-` and the actual source ID for the time being. - // See https://github.com/Icinga/icinga-notifications/issues/227 - parts := strings.Split(c.Username, "-") - if len(parts) != 2 || parts[0] != "source" { - return errors.New("'username' must be of the form '-'") + if c.Url != "" || c.Username != "" || c.Password != "" { + if c.Url == "" || c.Username == "" || c.Password == "" { + return errors.New("if one of 'url', 'username', or 'password' is set, all must be set") + } + + usernameValid, err := regexp.MatchString(`^source-\d+$`, c.Username) + if err != nil { + return errors.WithStack(err) + } + if !usernameValid { + return errors.New("'username' must be of the form 'source-'") + } + + if _, err := url.Parse(c.Url); err != nil { + return errors.Wrap(err, "'url' invalid") } - } - if c.Url == "" && c.Username != "" { - return errors.New("Icinga Notifications base 'url' must be provided, if username and password are set") } if _, err := url.Parse(c.KubernetesWebUrl); err != nil { - return errors.Wrapf(err, "cannot parse Icinga for Kubernetes Web base URL: %q", c.KubernetesWebUrl) + return errors.Wrap(err, "'kubernetes_web_url' invalid") } return nil diff --git a/pkg/notifications/contracts.go b/pkg/notifications/contracts.go new file mode 100644 index 00000000..62a24f76 --- /dev/null +++ b/pkg/notifications/contracts.go @@ -0,0 +1,7 @@ +package notifications + +// Marshaler is the interface implemented by types that +// can marshal themselves into valid notification events. +type Marshaler interface { + MarshalEvent() (Event, error) +} diff --git a/pkg/notifications/event.go b/pkg/notifications/event.go new file mode 100644 index 00000000..c2762a97 --- /dev/null +++ b/pkg/notifications/event.go @@ -0,0 +1,33 @@ +package notifications + +import ( + "encoding/json" + "net/url" +) + +type Event struct { + Name string + Severity string + Message string + URL *url.URL + Tags map[string]string + ExtraTags map[string]string +} + +func (e Event) MarshalJSON() ([]byte, error) { + return json.Marshal(struct { + Name string `json:"name"` + Severity string `json:"severity"` + Message string `json:"message"` + URL string `json:"json"` + Tags map[string]string `json:"tags"` + ExtraTags map[string]string `json:"extra_tags"` + }{ + Name: e.Name, + Severity: e.Severity, + Message: e.Message, + URL: e.URL.String(), + Tags: e.Tags, + ExtraTags: e.ExtraTags, + }) +} diff --git a/pkg/notifications/notifications.go b/pkg/notifications/notifications.go deleted file mode 100644 index 63082045..00000000 --- a/pkg/notifications/notifications.go +++ /dev/null @@ -1,86 +0,0 @@ -package notifications - -import ( - "context" - "fmt" - "github.com/icinga/icinga-go-library/database" - schemav1 "github.com/icinga/icinga-kubernetes/pkg/schema/v1" - "github.com/pkg/errors" -) - -// SyncSourceConfig synchronises the Icinga Notifications credentials from the YAML config to the database. -func SyncSourceConfig(ctx context.Context, db *database.DB, config *Config) error { - var configPairs []*schemav1.Config - - if config.Url != "" { - configPairs = []*schemav1.Config{ - {Key: schemav1.ConfigKeyNotificationsUrl, Value: config.Url}, - {Key: schemav1.ConfigKeyNotificationsKubernetesWebUrl, Value: config.KubernetesWebUrl}, - {Key: schemav1.ConfigKeyNotificationsUsername, Value: config.Username}, - {Key: schemav1.ConfigKeyNotificationsPassword, Value: config.Password}, - {Key: schemav1.ConfigKeyNotificationsLocked, Value: "true"}, - } - - stmt := fmt.Sprintf( - `DELETE FROM %s WHERE %s IN (?)`, - database.TableName(&schemav1.Config{}), - "`key`", - ) - - if _, err := db.ExecContext(ctx, stmt, schemav1.ConfigKeyNotificationsSourceID); err != nil { - return errors.Wrap(err, "cannot delete Icinga Notifications credentials") - } - } else { - configPairs = []*schemav1.Config{ - {Key: schemav1.ConfigKeyNotificationsLocked, Value: "false"}, - } - } - - stmt, _ := db.BuildUpsertStmt(&schemav1.Config{}) - if _, err := db.NamedExecContext(ctx, stmt, configPairs); err != nil { - return errors.Wrap(err, "cannot upsert Icinga Notifications credentials") - } - - return nil -} - -// RetrieveConfig retrieves the Icinga Notifications config from the database. The username is "source-". -func RetrieveConfig(ctx context.Context, db *database.DB, config *Config) error { - var dbConfig []*schemav1.Config - if err := db.SelectContext(ctx, &dbConfig, db.BuildSelectStmt(&schemav1.Config{}, &schemav1.Config{})); err != nil { - return errors.Wrap(err, "cannot fetch Icinga Notifications config from DB") - } - - var locked bool - - for _, pair := range dbConfig { - if pair.Key == schemav1.ConfigKeyNotificationsLocked { - if pair.Value == "true" { - locked = true - } else { - locked = false - } - } - } - - for _, pair := range dbConfig { - switch pair.Key { - case schemav1.ConfigKeyNotificationsUrl: - config.Url = pair.Value - case schemav1.ConfigKeyNotificationsKubernetesWebUrl: - config.KubernetesWebUrl = pair.Value - case schemav1.ConfigKeyNotificationsPassword: - config.Password = pair.Value - case schemav1.ConfigKeyNotificationsSourceID: - if !locked { - config.Username = "source-" + pair.Value - } - case schemav1.ConfigKeyNotificationsUsername: - if locked { - config.Username = pair.Value - } - } - } - - return nil -} diff --git a/pkg/schema/v1/config.go b/pkg/schema/v1/config.go index a2277c36..5a6f853b 100644 --- a/pkg/schema/v1/config.go +++ b/pkg/schema/v1/config.go @@ -1,19 +1,20 @@ package v1 +import "github.com/icinga/icinga-go-library/types" + // Config represents a single key => value pair database config entry. type Config struct { - Key ConfigKey - Value string + Key ConfigKey + Value string + Locked types.Bool } // ConfigKey represents the database config.Key enums. type ConfigKey string const ( - ConfigKeyNotificationsSourceID ConfigKey = "notifications.source_id" ConfigKeyNotificationsUsername ConfigKey = "notifications.username" ConfigKeyNotificationsPassword ConfigKey = "notifications.password" ConfigKeyNotificationsUrl ConfigKey = "notifications.url" ConfigKeyNotificationsKubernetesWebUrl ConfigKey = "notifications.kubernetes_web_url" - ConfigKeyNotificationsLocked ConfigKey = "notifications.locked" ) diff --git a/pkg/schema/v1/daemon_set.go b/pkg/schema/v1/daemon_set.go index a16a13ed..72c2cdd8 100644 --- a/pkg/schema/v1/daemon_set.go +++ b/pkg/schema/v1/daemon_set.go @@ -5,6 +5,7 @@ import ( "github.com/icinga/icinga-go-library/strcase" "github.com/icinga/icinga-go-library/types" "github.com/icinga/icinga-kubernetes/pkg/database" + "github.com/icinga/icinga-kubernetes/pkg/notifications" kappsv1 "k8s.io/api/apps/v1" kmetav1 "k8s.io/apimachinery/pkg/apis/meta/v1" kruntime "k8s.io/apimachinery/pkg/runtime" @@ -155,23 +156,19 @@ func (d *DaemonSet) Obtain(k8s kmetav1.Object) { d.Yaml = string(output) } -// GetNotificationsEvent implements the notifications.Notifiable interface. -func (d *DaemonSet) GetNotificationsEvent(baseUrl *url.URL) map[string]any { - daemonSetUrl := baseUrl.JoinPath("/daemonset") - daemonSetUrl.RawQuery = fmt.Sprintf("id=%s", d.Uuid) - - return map[string]any{ - "name": d.Namespace + "/" + d.Name, - "severity": d.IcingaState.ToSeverity(), - "message": d.IcingaStateReason, - "url": daemonSetUrl.String(), - "tags": map[string]any{ +func (d *DaemonSet) MarshalEvent() (notifications.Event, error) { + return notifications.Event{ + Name: d.Namespace + "/" + d.Name, + Severity: d.IcingaState.ToSeverity(), + Message: d.IcingaStateReason, + URL: &url.URL{Path: "/daemonset", RawQuery: fmt.Sprintf("id=%s", d.Uuid)}, + Tags: map[string]string{ + "uuid": d.Uuid.String(), "name": d.Name, "namespace": d.Namespace, - "uuid": d.Uuid.String(), "resource": "daemon_set", }, - } + }, nil } func (d *DaemonSet) getIcingaState() (IcingaState, string) { diff --git a/pkg/schema/v1/deployment.go b/pkg/schema/v1/deployment.go index b5e664d3..f79fc5b1 100644 --- a/pkg/schema/v1/deployment.go +++ b/pkg/schema/v1/deployment.go @@ -5,6 +5,7 @@ import ( "github.com/icinga/icinga-go-library/strcase" "github.com/icinga/icinga-go-library/types" "github.com/icinga/icinga-kubernetes/pkg/database" + "github.com/icinga/icinga-kubernetes/pkg/notifications" kappsv1 "k8s.io/api/apps/v1" kcorev1 "k8s.io/api/core/v1" kmetav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -167,23 +168,19 @@ func (d *Deployment) Obtain(k8s kmetav1.Object) { d.Yaml = string(output) } -// GetNotificationsEvent implements the notifications.Notifiable interface. -func (d *Deployment) GetNotificationsEvent(baseUrl *url.URL) map[string]any { - deploymentUrl := baseUrl.JoinPath("/deployment") - deploymentUrl.RawQuery = fmt.Sprintf("id=%s", d.Uuid) - - return map[string]any{ - "name": d.Namespace + "/" + d.Name, - "severity": d.IcingaState.ToSeverity(), - "message": d.IcingaStateReason, - "url": deploymentUrl.String(), - "tags": map[string]any{ +func (d *Deployment) MarshalEvent() (notifications.Event, error) { + return notifications.Event{ + Name: d.Namespace + "/" + d.Name, + Severity: d.IcingaState.ToSeverity(), + Message: d.IcingaStateReason, + URL: &url.URL{Path: "/deployment", RawQuery: fmt.Sprintf("id=%s", d.Uuid)}, + Tags: map[string]string{ + "uuid": d.Uuid.String(), "name": d.Name, "namespace": d.Namespace, - "uuid": d.Uuid.String(), "resource": "deployment", }, - } + }, nil } func (d *Deployment) getIcingaState() (IcingaState, string) { diff --git a/pkg/schema/v1/node.go b/pkg/schema/v1/node.go index 15eaa167..3b99256a 100644 --- a/pkg/schema/v1/node.go +++ b/pkg/schema/v1/node.go @@ -4,6 +4,7 @@ import ( "fmt" "github.com/icinga/icinga-go-library/types" "github.com/icinga/icinga-kubernetes/pkg/database" + "github.com/icinga/icinga-kubernetes/pkg/notifications" "github.com/pkg/errors" kcorev1 "k8s.io/api/core/v1" kmetav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -191,33 +192,29 @@ func (n *Node) Obtain(k8s kmetav1.Object) { } } -// GetNotificationsEvent implements the notifications.Notifiable interface. -func (n *Node) GetNotificationsEvent(baseUrl *url.URL) map[string]any { - nodeUrl := baseUrl.JoinPath("/node") - nodeUrl.RawQuery = fmt.Sprintf("id=%s", n.Uuid) - - return map[string]any{ - "name": n.Name, - "severity": n.IcingaState.ToSeverity(), - "message": n.IcingaStateReason, - "url": nodeUrl.String(), - "tags": map[string]any{ +func (n *Node) MarshalEvent() (notifications.Event, error) { + return notifications.Event{ + Name: n.Namespace + "/" + n.Name, + Severity: n.IcingaState.ToSeverity(), + Message: n.IcingaStateReason, + URL: &url.URL{Path: "/node", RawQuery: fmt.Sprintf("id=%s", n.Uuid)}, + Tags: map[string]string{ + "uuid": n.Uuid.String(), "name": n.Name, "namespace": n.Namespace, - "uuid": n.Uuid.String(), "resource": "node", }, - } + }, nil } func (n *Node) getIcingaState(node *kcorev1.Node) (IcingaState, string) { - //if node.Status.Phase == kcorev1.NodePending { + // if node.Status.Phase == kcorev1.NodePending { // return Pending, fmt.Sprintf("Node %s is pending.", node.Name) - //} + // } // - //if node.Status.Phase == kcorev1.NodeTerminated { + // if node.Status.Phase == kcorev1.NodeTerminated { // return Ok, fmt.Sprintf("Node %s is terminated.", node.Name) - //} + // } var state IcingaState var reason []string diff --git a/pkg/schema/v1/pod.go b/pkg/schema/v1/pod.go index 6020fd45..93128630 100644 --- a/pkg/schema/v1/pod.go +++ b/pkg/schema/v1/pod.go @@ -6,6 +6,7 @@ import ( "github.com/icinga/icinga-go-library/strcase" "github.com/icinga/icinga-go-library/types" "github.com/icinga/icinga-kubernetes/pkg/database" + "github.com/icinga/icinga-kubernetes/pkg/notifications" kcorev1 "k8s.io/api/core/v1" kmetav1 "k8s.io/apimachinery/pkg/apis/meta/v1" kruntime "k8s.io/apimachinery/pkg/runtime" @@ -277,23 +278,19 @@ func (p *Pod) Obtain(k8s kmetav1.Object) { p.Yaml = string(output) } -// GetNotificationsEvent implements the notifications.Notifiable interface. -func (p *Pod) GetNotificationsEvent(baseUrl *url.URL) map[string]any { - podUrl := baseUrl.JoinPath("/pod") - podUrl.RawQuery = fmt.Sprintf("id=%s", p.Uuid) - - return map[string]any{ - "name": p.Namespace + "/" + p.Name, - "severity": p.IcingaState.ToSeverity(), - "message": p.IcingaStateReason, - "url": podUrl.String(), - "tags": map[string]any{ +func (p *Pod) MarshalEvent() (notifications.Event, error) { + return notifications.Event{ + Name: p.Namespace + "/" + p.Name, + Severity: p.IcingaState.ToSeverity(), + Message: p.IcingaStateReason, + URL: &url.URL{Path: "/pod", RawQuery: fmt.Sprintf("id=%s", p.Uuid)}, + Tags: map[string]string{ + "uuid": p.Uuid.String(), "name": p.Name, "namespace": p.Namespace, - "uuid": p.Uuid.String(), "resource": "pod", }, - } + }, nil } func (p *Pod) getIcingaState(pod *kcorev1.Pod) (IcingaState, string) { diff --git a/pkg/schema/v1/replica_set.go b/pkg/schema/v1/replica_set.go index 53340109..48765367 100644 --- a/pkg/schema/v1/replica_set.go +++ b/pkg/schema/v1/replica_set.go @@ -5,6 +5,7 @@ import ( "github.com/icinga/icinga-go-library/strcase" "github.com/icinga/icinga-go-library/types" "github.com/icinga/icinga-kubernetes/pkg/database" + "github.com/icinga/icinga-kubernetes/pkg/notifications" kappsv1 "k8s.io/api/apps/v1" kcorev1 "k8s.io/api/core/v1" kmetav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -154,23 +155,19 @@ func (r *ReplicaSet) Obtain(k8s kmetav1.Object) { r.Yaml = string(output) } -// GetNotificationsEvent implements the notifications.Notifiable interface. -func (r *ReplicaSet) GetNotificationsEvent(baseUrl *url.URL) map[string]any { - replicaSetUrl := baseUrl.JoinPath("/replicaset") - replicaSetUrl.RawQuery = fmt.Sprintf("id=%s", r.Uuid) - - return map[string]any{ - "name": r.Namespace + "/" + r.Name, - "severity": r.IcingaState.ToSeverity(), - "message": r.IcingaStateReason, - "url": replicaSetUrl.String(), - "tags": map[string]any{ +func (r *ReplicaSet) MarshalEvent() (notifications.Event, error) { + return notifications.Event{ + Name: r.Namespace + "/" + r.Name, + Severity: r.IcingaState.ToSeverity(), + Message: r.IcingaStateReason, + URL: &url.URL{Path: "/replicaset", RawQuery: fmt.Sprintf("id=%s", r.Uuid)}, + Tags: map[string]string{ + "uuid": r.Uuid.String(), "name": r.Name, "namespace": r.Namespace, - "uuid": r.Uuid.String(), "resource": "replica_set", }, - } + }, nil } func (r *ReplicaSet) getIcingaState() (IcingaState, string) { diff --git a/pkg/schema/v1/stateful_set.go b/pkg/schema/v1/stateful_set.go index 9981f480..10ea0d3e 100644 --- a/pkg/schema/v1/stateful_set.go +++ b/pkg/schema/v1/stateful_set.go @@ -5,6 +5,7 @@ import ( "github.com/icinga/icinga-go-library/strcase" "github.com/icinga/icinga-go-library/types" "github.com/icinga/icinga-kubernetes/pkg/database" + "github.com/icinga/icinga-kubernetes/pkg/notifications" kappsv1 "k8s.io/api/apps/v1" kmetav1 "k8s.io/apimachinery/pkg/apis/meta/v1" kruntime "k8s.io/apimachinery/pkg/runtime" @@ -179,23 +180,19 @@ func (s *StatefulSet) Obtain(k8s kmetav1.Object) { s.Yaml = string(output) } -// GetNotificationsEvent implements the notifications.Notifiable interface. -func (s *StatefulSet) GetNotificationsEvent(baseUrl *url.URL) map[string]any { - statefulSetUrl := baseUrl.JoinPath("/statefulset") - statefulSetUrl.RawQuery = fmt.Sprintf("id=%s", s.Uuid) - - return map[string]any{ - "name": s.Namespace + "/" + s.Name, - "severity": s.IcingaState.ToSeverity(), - "message": s.IcingaStateReason, - "url": statefulSetUrl.String(), - "tags": map[string]any{ +func (s *StatefulSet) MarshalEvent() (notifications.Event, error) { + return notifications.Event{ + Name: s.Namespace + "/" + s.Name, + Severity: s.IcingaState.ToSeverity(), + Message: s.IcingaStateReason, + URL: &url.URL{Path: "/statefulset", RawQuery: fmt.Sprintf("id=%s", s.Uuid)}, + Tags: map[string]string{ + "uuid": s.Uuid.String(), "name": s.Name, "namespace": s.Namespace, - "uuid": s.Uuid.String(), "resource": "stateful_set", }, - } + }, nil } func (s *StatefulSet) getIcingaState() (IcingaState, string) { diff --git a/pkg/sync/controller.go b/pkg/sync/v1/controller.go similarity index 99% rename from pkg/sync/controller.go rename to pkg/sync/v1/controller.go index 2d617888..898d087c 100644 --- a/pkg/sync/controller.go +++ b/pkg/sync/v1/controller.go @@ -1,4 +1,4 @@ -package sync +package v1 import ( "context" diff --git a/pkg/sync/event_handler.go b/pkg/sync/v1/event_handler.go similarity index 99% rename from pkg/sync/event_handler.go rename to pkg/sync/v1/event_handler.go index 4e20e3a9..a4f51c77 100644 --- a/pkg/sync/event_handler.go +++ b/pkg/sync/v1/event_handler.go @@ -1,4 +1,4 @@ -package sync +package v1 import ( "fmt" diff --git a/pkg/sync/features.go b/pkg/sync/v1/features.go similarity index 98% rename from pkg/sync/features.go rename to pkg/sync/v1/features.go index 0112d465..73749d91 100644 --- a/pkg/sync/features.go +++ b/pkg/sync/v1/features.go @@ -1,4 +1,4 @@ -package sync +package v1 import "github.com/icinga/icinga-kubernetes/pkg/com" diff --git a/pkg/sync/sink.go b/pkg/sync/v1/sink.go similarity index 99% rename from pkg/sync/sink.go rename to pkg/sync/v1/sink.go index f63e7a60..1c3ee9fb 100644 --- a/pkg/sync/sink.go +++ b/pkg/sync/v1/sink.go @@ -1,4 +1,4 @@ -package sync +package v1 import ( "context" diff --git a/pkg/sync/v1/sync.go b/pkg/sync/v1/sync.go index fa5c81f0..f1818cf6 100644 --- a/pkg/sync/v1/sync.go +++ b/pkg/sync/v1/sync.go @@ -6,7 +6,6 @@ import ( "github.com/icinga/icinga-kubernetes/pkg/com" "github.com/icinga/icinga-kubernetes/pkg/database" schemav1 "github.com/icinga/icinga-kubernetes/pkg/schema/v1" - "github.com/icinga/icinga-kubernetes/pkg/sync" "golang.org/x/sync/errgroup" "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/client-go/tools/cache" @@ -33,10 +32,10 @@ func NewSync( } } -func (s *Sync) Run(ctx context.Context, features ...sync.Feature) error { - controller := sync.NewController(s.informer, s.log.WithName("controller")) +func (s *Sync) Run(ctx context.Context, features ...Feature) error { + controller := NewController(s.informer, s.log.WithName("controller")) - with := sync.NewFeatures(features...) + with := NewFeatures(features...) if !with.NoWarmup() { if err := s.warmup(ctx, controller); err != nil { @@ -47,7 +46,7 @@ func (s *Sync) Run(ctx context.Context, features ...sync.Feature) error { return s.sync(ctx, controller, features...) } -func (s *Sync) warmup(ctx context.Context, c *sync.Controller) error { +func (s *Sync) warmup(ctx context.Context, c *Controller) error { g, ctx := errgroup.WithContext(ctx) entities, errs := s.db.YieldAll(ctx, func() (interface{}, error) { @@ -78,8 +77,8 @@ func (s *Sync) warmup(ctx context.Context, c *sync.Controller) error { return g.Wait() } -func (s *Sync) sync(ctx context.Context, c *sync.Controller, features ...sync.Feature) error { - sink := sync.NewSink(func(i *sync.Item) interface{} { +func (s *Sync) sync(ctx context.Context, c *Controller, features ...Feature) error { + sink := NewSink(func(i *Item) interface{} { entity := s.factory() entity.Obtain(*i.Item) @@ -88,7 +87,7 @@ func (s *Sync) sync(ctx context.Context, c *sync.Controller, features ...sync.Fe return k }) - with := sync.NewFeatures(features...) + with := NewFeatures(features...) g, ctx := errgroup.WithContext(ctx) g.Go(func() error { diff --git a/schema/mysql/schema.sql b/schema/mysql/schema.sql index 48fdf52c..2672fd43 100644 --- a/schema/mysql/schema.sql +++ b/schema/mysql/schema.sql @@ -949,17 +949,16 @@ CREATE TABLE kubernetes_instance ( ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin ROW_FORMAT=DYNAMIC; CREATE TABLE config ( - `key` enum( - 'notifications.username', - 'notifications.password', - 'notifications.source_id', - 'notifications.url', - 'notifications.kubernetes_web_url', - 'notifications.locked' - ) COLLATE utf8mb4_unicode_ci NOT NULL, - value varchar(255) NOT NULL, - - PRIMARY KEY (`key`) + `key` enum( + 'notifications.url', + 'notifications.username', + 'notifications.password', + 'notifications.kubernetes_web_url' + ) COLLATE utf8mb4_unicode_ci NOT NULL, + value varchar(255) NOT NULL, + locked enum('n', 'y') COLLATE utf8mb4_unicode_ci NOT NULL, + + PRIMARY KEY (`key`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin; CREATE TABLE kubernetes_schema (