Skip to content

Commit

Permalink
Sync Prometheus config like for Notifications
Browse files Browse the repository at this point in the history
  • Loading branch information
jrauh01 committed Dec 18, 2024
1 parent 7da2740 commit 2a378e3
Show file tree
Hide file tree
Showing 3 changed files with 155 additions and 146 deletions.
9 changes: 5 additions & 4 deletions cmd/icinga-kubernetes/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,13 +271,13 @@ func main() {
})
}

err = metrics.SyncPrometheusConfig(ctx, db2, &cfg.Prometheus)
err = internal.SyncPrometheusConfig(ctx, db2, &cfg.Prometheus)
if err != nil {
klog.Error(errors.Wrap(err, "cannot sync prometheus config"))
}

if cfg.Prometheus.Url == "" {
err = metrics.AutoDetectPrometheus(ctx, clientset, &cfg.Prometheus)
err = internal.AutoDetectPrometheus(ctx, clientset, &cfg.Prometheus)
if err != nil {
klog.Error(errors.Wrap(err, "cannot auto-detect prometheus"))
}
Expand All @@ -288,8 +288,9 @@ func main() {

if cfg.Prometheus.Username != "" && cfg.Prometheus.Password != "" {
basicAuthTransport = &com.BasicAuthTransport{
Username: cfg.Prometheus.Username,
Password: cfg.Prometheus.Password,
RoundTripper: http.DefaultTransport,
Username: cfg.Prometheus.Username,
Password: cfg.Prometheus.Password,
}
}

Expand Down
150 changes: 150 additions & 0 deletions internal/prometheus.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
package internal

import (
"context"
"fmt"
"github.com/icinga/icinga-go-library/database"
"github.com/icinga/icinga-go-library/types"
"github.com/icinga/icinga-kubernetes/pkg/metrics"
schemav1 "github.com/icinga/icinga-kubernetes/pkg/schema/v1"
"github.com/jmoiron/sqlx"
"github.com/pkg/errors"
v1 "k8s.io/api/core/v1"
kmetav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"strings"
)

func SyncPrometheusConfig(ctx context.Context, db *database.DB, config *metrics.PrometheusConfig) error {
_true := types.Bool{Bool: true, Valid: true}

if config.Url != "" {
toDb := []schemav1.Config{
{Key: schemav1.ConfigKeyPrometheusUrl, Value: config.Url, Locked: _true},
}

if config.Username != "" {
toDb = append(
toDb,
schemav1.Config{Key: schemav1.ConfigKeyPrometheusUsername, Value: config.Username, Locked: _true},
schemav1.Config{Key: schemav1.ConfigKeyPrometheusPassword, Value: config.Password, Locked: _true},
)
}

err := db.ExecTx(ctx, func(ctx context.Context, tx *sqlx.Tx) error {
if _, err := tx.ExecContext(
ctx,
fmt.Sprintf(
`DELETE FROM "%s" WHERE "key" LIKE ? AND "locked" = ?`,
database.TableName(&schemav1.Config{}),
),
`prometheus.%`,
_true,
); err != nil {
return errors.Wrap(err, "cannot delete Prometheus config")
}

stmt, _ := db.BuildInsertStmt(schemav1.Config{})
if _, err := tx.NamedExecContext(ctx, stmt, toDb); err != nil {
return errors.Wrap(err, "cannot insert Prometheus config")
}

return nil
})
if err != nil {
return errors.Wrap(err, "cannot upsert Prometheus config")
}
} else {
err := db.ExecTx(ctx, func(ctx context.Context, tx *sqlx.Tx) error {
if _, err := tx.ExecContext(
ctx,
fmt.Sprintf(
`DELETE FROM "%s" WHERE "key" LIKE ? AND "locked" = ?`,
database.TableName(&schemav1.Config{}),
),
`prometheus.%`,
_true,
); err != nil {
return errors.Wrap(err, "cannot delete Prometheus config")
}

rows, err := tx.QueryxContext(ctx, db.BuildSelectStmt(&schemav1.Config{}, &schemav1.Config{}))
if err != nil {
return errors.Wrap(err, "cannot fetch Prometheus config from DB")
}

for rows.Next() {
var r schemav1.Config
if err := rows.StructScan(&r); err != nil {
return errors.Wrap(err, "cannot fetch Prometheus config from DB")
}

switch r.Key {
case schemav1.ConfigKeyPrometheusUrl:
config.Url = r.Value
case schemav1.ConfigKeyPrometheusUsername:
config.Username = r.Value
case schemav1.ConfigKeyPrometheusPassword:
config.Password = r.Value
}
}

return nil
})
if err != nil {
return errors.Wrap(err, "cannot retrieve Prometheus config")
}
}

return nil
}

// AutoDetectPrometheus tries to auto-detect the Prometheus service in the monitoring namespace and
// if found sets the URL in the supplied Prometheus configuration. The first service with the label
// "app.kubernetes.io/name=prometheus" is used. Until now the ServiceTypes ClusterIP and NodePort are supported.
func AutoDetectPrometheus(ctx context.Context, clientset *kubernetes.Clientset, config *metrics.PrometheusConfig) error {
services, err := clientset.CoreV1().Services("monitoring").List(ctx, kmetav1.ListOptions{
LabelSelector: "app.kubernetes.io/name=prometheus",
})
if err != nil {
return errors.Wrap(err, "cannot list Prometheus services")
}

if len(services.Items) == 0 {
return errors.New("no Prometheus service found")
}

var ip string
var port int32

// Check if we are running in a Kubernetes cluster. If so, use the
// service's ClusterIP. Otherwise, use the API Server's IP and NodePort.
if _, err = rest.InClusterConfig(); err == nil {
for _, service := range services.Items {
if service.Spec.Type == v1.ServiceTypeClusterIP {
ip = service.Spec.ClusterIP
port = service.Spec.Ports[0].Port

break
}
}
} else if errors.Is(err, rest.ErrNotInCluster) {
for _, service := range services.Items {
if service.Spec.Type == v1.ServiceTypeNodePort {
ip = strings.Split(clientset.RESTClient().Get().URL().Host, ":")[0]
port = service.Spec.Ports[0].NodePort

break
}
}
}

if ip == "" {

}

config.Url = fmt.Sprintf("http://%s:%d", ip, port)

return nil
}
142 changes: 0 additions & 142 deletions pkg/metrics/config.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,7 @@
package metrics

import (
"context"
"database/sql"
"fmt"
"github.com/icinga/icinga-go-library/database"
"github.com/icinga/icinga-go-library/types"
schemav1 "github.com/icinga/icinga-kubernetes/pkg/schema/v1"
"github.com/pkg/errors"
kmetav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"strings"
)

// PrometheusConfig defines Prometheus configuration.
Expand All @@ -31,135 +21,3 @@ func (c *PrometheusConfig) Validate() error {
}
return nil
}

func SyncPrometheusConfig(ctx context.Context, db *database.DB, config *PrometheusConfig) error {
_true := types.Bool{Bool: true, Valid: true}

var configPairs []*schemav1.Config
var deleteKeys []schemav1.ConfigKey

tx, err := db.BeginTxx(ctx, &sql.TxOptions{Isolation: sql.LevelSerializable})
if err != nil {
return errors.Wrap(err, "cannot start transaction")
}

if config.Url != "" {
configPairs = append(configPairs, &schemav1.Config{Key: schemav1.ConfigKeyPrometheusUrl, Value: config.Url, Locked: _true})

if config.Username != "" {
configPairs = append(configPairs, &schemav1.Config{Key: schemav1.ConfigKeyPrometheusUsername, Value: config.Username, Locked: _true})
configPairs = append(configPairs, &schemav1.Config{Key: schemav1.ConfigKeyPrometheusPassword, Value: config.Password, Locked: _true})
} else {
deleteKeys = append(deleteKeys, schemav1.ConfigKeyPrometheusUsername)
deleteKeys = append(deleteKeys, schemav1.ConfigKeyPrometheusPassword)
}
} else {
deleteKeys, err = cleanupKeys(ctx, db)
if err != nil {
if err := tx.Rollback(); err != nil {
return errors.Wrap(err, "cannot rollback transaction")
}
return errors.Wrap(err, "cannot cleanup Prometheus configuration")
}
}

if len(configPairs) > 0 {
upsertStmt, _ := db.BuildUpsertStmt(&schemav1.Config{})

if _, err := tx.NamedExecContext(ctx, upsertStmt, configPairs); err != nil {
if err := tx.Rollback(); err != nil {
return errors.Wrap(err, "cannot rollback transaction")
}
return errors.Wrap(err, "cannot upsert Prometheus configuration")
}
}

if len(deleteKeys) > 0 {
deleteStmt := fmt.Sprintf(
`DELETE FROM %s WHERE %s = (?)`,
database.TableName(&schemav1.Config{}),
"`key`",
)

for _, key := range deleteKeys {
if _, err := tx.ExecContext(ctx, deleteStmt, key); err != nil {
if err := tx.Rollback(); err != nil {
return errors.Wrap(err, "cannot rollback transaction")
}
return errors.Wrap(err, "cannot delete Prometheus credentials")
}
}
}

if err := tx.Commit(); err != nil {
return errors.Wrap(err, "cannot commit transaction")
}

return nil
}

func cleanupKeys(ctx context.Context, db *database.DB) ([]schemav1.ConfigKey, error) {
var dbConfig []*schemav1.Config
if err := db.SelectContext(ctx, &dbConfig, db.BuildSelectStmt(&schemav1.Config{}, &schemav1.Config{})); err != nil {
return nil, errors.Wrap(err, "cannot retrieve Prometheus configuration")
}

var deleteKeys []schemav1.ConfigKey

for _, c := range dbConfig {
if c.Locked.Bool {
switch c.Key {
case schemav1.ConfigKeyPrometheusUrl:
deleteKeys = append(deleteKeys, schemav1.ConfigKeyPrometheusUrl)
case schemav1.ConfigKeyPrometheusUsername:
deleteKeys = append(deleteKeys, schemav1.ConfigKeyPrometheusUsername)
case schemav1.ConfigKeyPrometheusPassword:
deleteKeys = append(deleteKeys, schemav1.ConfigKeyPrometheusPassword)
}
}
}

return deleteKeys, nil
}

func AutoDetectPrometheus(ctx context.Context, clientset *kubernetes.Clientset, config *PrometheusConfig) error {
services, err := clientset.CoreV1().Services("monitoring").List(ctx, kmetav1.ListOptions{
LabelSelector: "app.kubernetes.io/name=prometheus",
})
if err != nil {
return errors.Wrap(err, "cannot list Prometheus services")
}

if len(services.Items) == 0 {
return errors.New("no Prometheus service found")
}

var ip string
var port int32

// Check if we are running in a Kubernetes cluster. If so, use the
// service's ClusterIP. Otherwise, use the API Server's IP and NodePort.
if _, err = rest.InClusterConfig(); err == nil {
for _, service := range services.Items {
if service.Spec.Type == "ClusterIP" {
ip = services.Items[0].Spec.ClusterIP
port = services.Items[0].Spec.Ports[0].Port

break
}
}
} else if errors.Is(err, rest.ErrNotInCluster) {
for _, service := range services.Items {
if service.Spec.Type == "NodePort" {
ip = strings.Split(clientset.RESTClient().Get().URL().Host, ":")[0]
port = service.Spec.Ports[0].NodePort

break
}
}
}

config.Url = fmt.Sprintf("http://%s:%d", ip, port)

return nil
}

0 comments on commit 2a378e3

Please sign in to comment.