From 2a378e3f22909f60b63bfcad2cd12bd90f9fbab5 Mon Sep 17 00:00:00 2001 From: Johannes Rauh Date: Wed, 18 Dec 2024 14:06:20 +0100 Subject: [PATCH] Sync Prometheus config like for Notifications --- cmd/icinga-kubernetes/main.go | 9 +- internal/prometheus.go | 150 ++++++++++++++++++++++++++++++++++ pkg/metrics/config.go | 142 -------------------------------- 3 files changed, 155 insertions(+), 146 deletions(-) create mode 100644 internal/prometheus.go diff --git a/cmd/icinga-kubernetes/main.go b/cmd/icinga-kubernetes/main.go index 86d3ee0..06e50d8 100644 --- a/cmd/icinga-kubernetes/main.go +++ b/cmd/icinga-kubernetes/main.go @@ -271,13 +271,13 @@ func main() { }) } - err = metrics.SyncPrometheusConfig(ctx, db2, &cfg.Prometheus) + err = internal.SyncPrometheusConfig(ctx, db2, &cfg.Prometheus) if err != nil { klog.Error(errors.Wrap(err, "cannot sync prometheus config")) } if cfg.Prometheus.Url == "" { - err = metrics.AutoDetectPrometheus(ctx, clientset, &cfg.Prometheus) + err = internal.AutoDetectPrometheus(ctx, clientset, &cfg.Prometheus) if err != nil { klog.Error(errors.Wrap(err, "cannot auto-detect prometheus")) } @@ -288,8 +288,9 @@ func main() { if cfg.Prometheus.Username != "" && cfg.Prometheus.Password != "" { basicAuthTransport = &com.BasicAuthTransport{ - Username: cfg.Prometheus.Username, - Password: cfg.Prometheus.Password, + RoundTripper: http.DefaultTransport, + Username: cfg.Prometheus.Username, + Password: cfg.Prometheus.Password, } } diff --git a/internal/prometheus.go b/internal/prometheus.go new file mode 100644 index 0000000..ff2c9f4 --- /dev/null +++ b/internal/prometheus.go @@ -0,0 +1,150 @@ +package internal + +import ( + "context" + "fmt" + "github.com/icinga/icinga-go-library/database" + "github.com/icinga/icinga-go-library/types" + "github.com/icinga/icinga-kubernetes/pkg/metrics" + schemav1 "github.com/icinga/icinga-kubernetes/pkg/schema/v1" + "github.com/jmoiron/sqlx" + "github.com/pkg/errors" + v1 "k8s.io/api/core/v1" + kmetav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + "strings" +) + +func SyncPrometheusConfig(ctx context.Context, db *database.DB, config *metrics.PrometheusConfig) error { + _true := types.Bool{Bool: true, Valid: true} + + if config.Url != "" { + toDb := []schemav1.Config{ + {Key: schemav1.ConfigKeyPrometheusUrl, Value: config.Url, Locked: _true}, + } + + if config.Username != "" { + toDb = append( + toDb, + schemav1.Config{Key: schemav1.ConfigKeyPrometheusUsername, Value: config.Username, Locked: _true}, + schemav1.Config{Key: schemav1.ConfigKeyPrometheusPassword, Value: config.Password, Locked: _true}, + ) + } + + err := db.ExecTx(ctx, func(ctx context.Context, tx *sqlx.Tx) error { + if _, err := tx.ExecContext( + ctx, + fmt.Sprintf( + `DELETE FROM "%s" WHERE "key" LIKE ? AND "locked" = ?`, + database.TableName(&schemav1.Config{}), + ), + `prometheus.%`, + _true, + ); err != nil { + return errors.Wrap(err, "cannot delete Prometheus config") + } + + stmt, _ := db.BuildInsertStmt(schemav1.Config{}) + if _, err := tx.NamedExecContext(ctx, stmt, toDb); err != nil { + return errors.Wrap(err, "cannot insert Prometheus config") + } + + return nil + }) + if err != nil { + return errors.Wrap(err, "cannot upsert Prometheus config") + } + } else { + err := db.ExecTx(ctx, func(ctx context.Context, tx *sqlx.Tx) error { + if _, err := tx.ExecContext( + ctx, + fmt.Sprintf( + `DELETE FROM "%s" WHERE "key" LIKE ? AND "locked" = ?`, + database.TableName(&schemav1.Config{}), + ), + `prometheus.%`, + _true, + ); err != nil { + return errors.Wrap(err, "cannot delete Prometheus config") + } + + rows, err := tx.QueryxContext(ctx, db.BuildSelectStmt(&schemav1.Config{}, &schemav1.Config{})) + if err != nil { + return errors.Wrap(err, "cannot fetch Prometheus config from DB") + } + + for rows.Next() { + var r schemav1.Config + if err := rows.StructScan(&r); err != nil { + return errors.Wrap(err, "cannot fetch Prometheus config from DB") + } + + switch r.Key { + case schemav1.ConfigKeyPrometheusUrl: + config.Url = r.Value + case schemav1.ConfigKeyPrometheusUsername: + config.Username = r.Value + case schemav1.ConfigKeyPrometheusPassword: + config.Password = r.Value + } + } + + return nil + }) + if err != nil { + return errors.Wrap(err, "cannot retrieve Prometheus config") + } + } + + return nil +} + +// AutoDetectPrometheus tries to auto-detect the Prometheus service in the monitoring namespace and +// if found sets the URL in the supplied Prometheus configuration. The first service with the label +// "app.kubernetes.io/name=prometheus" is used. Until now the ServiceTypes ClusterIP and NodePort are supported. +func AutoDetectPrometheus(ctx context.Context, clientset *kubernetes.Clientset, config *metrics.PrometheusConfig) error { + services, err := clientset.CoreV1().Services("monitoring").List(ctx, kmetav1.ListOptions{ + LabelSelector: "app.kubernetes.io/name=prometheus", + }) + if err != nil { + return errors.Wrap(err, "cannot list Prometheus services") + } + + if len(services.Items) == 0 { + return errors.New("no Prometheus service found") + } + + var ip string + var port int32 + + // Check if we are running in a Kubernetes cluster. If so, use the + // service's ClusterIP. Otherwise, use the API Server's IP and NodePort. + if _, err = rest.InClusterConfig(); err == nil { + for _, service := range services.Items { + if service.Spec.Type == v1.ServiceTypeClusterIP { + ip = service.Spec.ClusterIP + port = service.Spec.Ports[0].Port + + break + } + } + } else if errors.Is(err, rest.ErrNotInCluster) { + for _, service := range services.Items { + if service.Spec.Type == v1.ServiceTypeNodePort { + ip = strings.Split(clientset.RESTClient().Get().URL().Host, ":")[0] + port = service.Spec.Ports[0].NodePort + + break + } + } + } + + if ip == "" { + + } + + config.Url = fmt.Sprintf("http://%s:%d", ip, port) + + return nil +} diff --git a/pkg/metrics/config.go b/pkg/metrics/config.go index b6aafb7..41fae6b 100644 --- a/pkg/metrics/config.go +++ b/pkg/metrics/config.go @@ -1,17 +1,7 @@ package metrics import ( - "context" - "database/sql" - "fmt" - "github.com/icinga/icinga-go-library/database" - "github.com/icinga/icinga-go-library/types" - schemav1 "github.com/icinga/icinga-kubernetes/pkg/schema/v1" "github.com/pkg/errors" - kmetav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/client-go/kubernetes" - "k8s.io/client-go/rest" - "strings" ) // PrometheusConfig defines Prometheus configuration. @@ -31,135 +21,3 @@ func (c *PrometheusConfig) Validate() error { } return nil } - -func SyncPrometheusConfig(ctx context.Context, db *database.DB, config *PrometheusConfig) error { - _true := types.Bool{Bool: true, Valid: true} - - var configPairs []*schemav1.Config - var deleteKeys []schemav1.ConfigKey - - tx, err := db.BeginTxx(ctx, &sql.TxOptions{Isolation: sql.LevelSerializable}) - if err != nil { - return errors.Wrap(err, "cannot start transaction") - } - - if config.Url != "" { - configPairs = append(configPairs, &schemav1.Config{Key: schemav1.ConfigKeyPrometheusUrl, Value: config.Url, Locked: _true}) - - if config.Username != "" { - configPairs = append(configPairs, &schemav1.Config{Key: schemav1.ConfigKeyPrometheusUsername, Value: config.Username, Locked: _true}) - configPairs = append(configPairs, &schemav1.Config{Key: schemav1.ConfigKeyPrometheusPassword, Value: config.Password, Locked: _true}) - } else { - deleteKeys = append(deleteKeys, schemav1.ConfigKeyPrometheusUsername) - deleteKeys = append(deleteKeys, schemav1.ConfigKeyPrometheusPassword) - } - } else { - deleteKeys, err = cleanupKeys(ctx, db) - if err != nil { - if err := tx.Rollback(); err != nil { - return errors.Wrap(err, "cannot rollback transaction") - } - return errors.Wrap(err, "cannot cleanup Prometheus configuration") - } - } - - if len(configPairs) > 0 { - upsertStmt, _ := db.BuildUpsertStmt(&schemav1.Config{}) - - if _, err := tx.NamedExecContext(ctx, upsertStmt, configPairs); err != nil { - if err := tx.Rollback(); err != nil { - return errors.Wrap(err, "cannot rollback transaction") - } - return errors.Wrap(err, "cannot upsert Prometheus configuration") - } - } - - if len(deleteKeys) > 0 { - deleteStmt := fmt.Sprintf( - `DELETE FROM %s WHERE %s = (?)`, - database.TableName(&schemav1.Config{}), - "`key`", - ) - - for _, key := range deleteKeys { - if _, err := tx.ExecContext(ctx, deleteStmt, key); err != nil { - if err := tx.Rollback(); err != nil { - return errors.Wrap(err, "cannot rollback transaction") - } - return errors.Wrap(err, "cannot delete Prometheus credentials") - } - } - } - - if err := tx.Commit(); err != nil { - return errors.Wrap(err, "cannot commit transaction") - } - - return nil -} - -func cleanupKeys(ctx context.Context, db *database.DB) ([]schemav1.ConfigKey, error) { - var dbConfig []*schemav1.Config - if err := db.SelectContext(ctx, &dbConfig, db.BuildSelectStmt(&schemav1.Config{}, &schemav1.Config{})); err != nil { - return nil, errors.Wrap(err, "cannot retrieve Prometheus configuration") - } - - var deleteKeys []schemav1.ConfigKey - - for _, c := range dbConfig { - if c.Locked.Bool { - switch c.Key { - case schemav1.ConfigKeyPrometheusUrl: - deleteKeys = append(deleteKeys, schemav1.ConfigKeyPrometheusUrl) - case schemav1.ConfigKeyPrometheusUsername: - deleteKeys = append(deleteKeys, schemav1.ConfigKeyPrometheusUsername) - case schemav1.ConfigKeyPrometheusPassword: - deleteKeys = append(deleteKeys, schemav1.ConfigKeyPrometheusPassword) - } - } - } - - return deleteKeys, nil -} - -func AutoDetectPrometheus(ctx context.Context, clientset *kubernetes.Clientset, config *PrometheusConfig) error { - services, err := clientset.CoreV1().Services("monitoring").List(ctx, kmetav1.ListOptions{ - LabelSelector: "app.kubernetes.io/name=prometheus", - }) - if err != nil { - return errors.Wrap(err, "cannot list Prometheus services") - } - - if len(services.Items) == 0 { - return errors.New("no Prometheus service found") - } - - var ip string - var port int32 - - // Check if we are running in a Kubernetes cluster. If so, use the - // service's ClusterIP. Otherwise, use the API Server's IP and NodePort. - if _, err = rest.InClusterConfig(); err == nil { - for _, service := range services.Items { - if service.Spec.Type == "ClusterIP" { - ip = services.Items[0].Spec.ClusterIP - port = services.Items[0].Spec.Ports[0].Port - - break - } - } - } else if errors.Is(err, rest.ErrNotInCluster) { - for _, service := range services.Items { - if service.Spec.Type == "NodePort" { - ip = strings.Split(clientset.RESTClient().Get().URL().Host, ":")[0] - port = service.Spec.Ports[0].NodePort - - break - } - } - } - - config.Url = fmt.Sprintf("http://%s:%d", ip, port) - - return nil -}