Skip to content

Commit

Permalink
Merge branch 'context-propagation-4' into context-propagation-wip
Browse files Browse the repository at this point in the history
  • Loading branch information
mmetc committed Sep 18, 2024
2 parents 4f28033 + 5273f66 commit edb90c0
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 24 deletions.
6 changes: 4 additions & 2 deletions cmd/crowdsec-cli/clialert/alerts.go
Original file line number Diff line number Diff line change
Expand Up @@ -575,15 +575,17 @@ func (cli *cliAlerts) newFlushCmd() *cobra.Command {
DisableAutoGenTag: true,
RunE: func(cmd *cobra.Command, _ []string) error {
cfg := cli.cfg()
ctx := cmd.Context()

if err := require.LAPI(cfg); err != nil {
return err
}
db, err := require.DBClient(cmd.Context(), cfg.DbConfig)
db, err := require.DBClient(ctx, cfg.DbConfig)
if err != nil {
return err
}
log.Info("Flushing alerts. !! This may take a long time !!")
err = db.FlushAlerts(maxAge, maxItems)
err = db.FlushAlerts(ctx, maxAge, maxItems)
if err != nil {
return fmt.Errorf("unable to flush alerts: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/apiserver/apiserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ func NewServer(config *csconfig.LocalApiServerCfg) (*APIServer, error) {
}

if config.DbConfig.Flush != nil {
flushScheduler, err = dbClient.StartFlushScheduler(config.DbConfig.Flush)
flushScheduler, err = dbClient.StartFlushScheduler(ctx, config.DbConfig.Flush)
if err != nil {
return nil, err
}
Expand Down
43 changes: 22 additions & 21 deletions pkg/database/flush.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package database

import (
"context"
"errors"
"fmt"
"time"
Expand All @@ -26,7 +27,7 @@ const (
flushInterval = 1 * time.Minute
)

func (c *Client) StartFlushScheduler(config *csconfig.FlushDBCfg) (*gocron.Scheduler, error) {
func (c *Client) StartFlushScheduler(ctx context.Context, config *csconfig.FlushDBCfg) (*gocron.Scheduler, error) {
maxItems := 0
maxAge := ""

Expand All @@ -45,7 +46,7 @@ func (c *Client) StartFlushScheduler(config *csconfig.FlushDBCfg) (*gocron.Sched
// Init & Start cronjob every minute for alerts
scheduler := gocron.NewScheduler(time.UTC)

job, err := scheduler.Every(1).Minute().Do(c.FlushAlerts, maxAge, maxItems)
job, err := scheduler.Every(1).Minute().Do(c.FlushAlerts, ctx, maxAge, maxItems)
if err != nil {
return nil, fmt.Errorf("while starting FlushAlerts scheduler: %w", err)
}
Expand Down Expand Up @@ -100,14 +101,14 @@ func (c *Client) StartFlushScheduler(config *csconfig.FlushDBCfg) (*gocron.Sched
}
}

baJob, err := scheduler.Every(flushInterval).Do(c.FlushAgentsAndBouncers, config.AgentsGC, config.BouncersGC)
baJob, err := scheduler.Every(flushInterval).Do(c.FlushAgentsAndBouncers, ctx, config.AgentsGC, config.BouncersGC)
if err != nil {
return nil, fmt.Errorf("while starting FlushAgentsAndBouncers scheduler: %w", err)
}

baJob.SingletonMode()

metricsJob, err := scheduler.Every(flushInterval).Do(c.flushMetrics, config.MetricsMaxAge)
metricsJob, err := scheduler.Every(flushInterval).Do(c.flushMetrics, ctx, config.MetricsMaxAge)
if err != nil {
return nil, fmt.Errorf("while starting flushMetrics scheduler: %w", err)
}
Expand All @@ -120,7 +121,7 @@ func (c *Client) StartFlushScheduler(config *csconfig.FlushDBCfg) (*gocron.Sched
}

// flushMetrics deletes metrics older than maxAge, regardless if they have been pushed to CAPI or not
func (c *Client) flushMetrics(maxAge *time.Duration) {
func (c *Client) flushMetrics(ctx context.Context, maxAge *time.Duration) {
if maxAge == nil {
maxAge = ptr.Of(defaultMetricsMaxAge)
}
Expand All @@ -129,7 +130,7 @@ func (c *Client) flushMetrics(maxAge *time.Duration) {

deleted, err := c.Ent.Metric.Delete().Where(
metric.ReceivedAtLTE(time.Now().UTC().Add(-*maxAge)),
).Exec(c.CTX)
).Exec(ctx)
if err != nil {
c.Log.Errorf("while flushing metrics: %s", err)
return
Expand All @@ -140,10 +141,10 @@ func (c *Client) flushMetrics(maxAge *time.Duration) {
}
}

func (c *Client) FlushOrphans() {
func (c *Client) FlushOrphans(ctx context.Context) {
/* While it has only been linked to some very corner-case bug : https://github.com/crowdsecurity/crowdsec/issues/778 */
/* We want to take care of orphaned events for which the parent alert/decision has been deleted */
eventsCount, err := c.Ent.Event.Delete().Where(event.Not(event.HasOwner())).Exec(c.CTX)
eventsCount, err := c.Ent.Event.Delete().Where(event.Not(event.HasOwner())).Exec(ctx)
if err != nil {
c.Log.Warningf("error while deleting orphan events: %s", err)
return
Expand All @@ -154,7 +155,7 @@ func (c *Client) FlushOrphans() {
}

eventsCount, err = c.Ent.Decision.Delete().Where(
decision.Not(decision.HasOwner())).Where(decision.UntilLTE(time.Now().UTC())).Exec(c.CTX)
decision.Not(decision.HasOwner())).Where(decision.UntilLTE(time.Now().UTC())).Exec(ctx)
if err != nil {
c.Log.Warningf("error while deleting orphan decisions: %s", err)
return
Expand All @@ -165,7 +166,7 @@ func (c *Client) FlushOrphans() {
}
}

func (c *Client) flushBouncers(authType string, duration *time.Duration) {
func (c *Client) flushBouncers(ctx context.Context, authType string, duration *time.Duration) {
if duration == nil {
return
}
Expand All @@ -174,7 +175,7 @@ func (c *Client) flushBouncers(authType string, duration *time.Duration) {
bouncer.LastPullLTE(time.Now().UTC().Add(-*duration)),
).Where(
bouncer.AuthTypeEQ(authType),
).Exec(c.CTX)
).Exec(ctx)
if err != nil {
c.Log.Errorf("while auto-deleting expired bouncers (%s): %s", authType, err)
return
Expand All @@ -185,7 +186,7 @@ func (c *Client) flushBouncers(authType string, duration *time.Duration) {
}
}

func (c *Client) flushAgents(authType string, duration *time.Duration) {
func (c *Client) flushAgents(ctx context.Context, authType string, duration *time.Duration) {
if duration == nil {
return
}
Expand All @@ -194,7 +195,7 @@ func (c *Client) flushAgents(authType string, duration *time.Duration) {
machine.LastHeartbeatLTE(time.Now().UTC().Add(-*duration)),
machine.Not(machine.HasAlerts()),
machine.AuthTypeEQ(authType),
).Exec(c.CTX)
).Exec(ctx)
if err != nil {
c.Log.Errorf("while auto-deleting expired machines (%s): %s", authType, err)
return
Expand All @@ -205,23 +206,23 @@ func (c *Client) flushAgents(authType string, duration *time.Duration) {
}
}

func (c *Client) FlushAgentsAndBouncers(agentsCfg *csconfig.AuthGCCfg, bouncersCfg *csconfig.AuthGCCfg) error {
func (c *Client) FlushAgentsAndBouncers(ctx context.Context, agentsCfg *csconfig.AuthGCCfg, bouncersCfg *csconfig.AuthGCCfg) error {
log.Debug("starting FlushAgentsAndBouncers")

if agentsCfg != nil {
c.flushAgents(types.TlsAuthType, agentsCfg.CertDuration)
c.flushAgents(types.PasswordAuthType, agentsCfg.LoginPasswordDuration)
c.flushAgents(ctx, types.TlsAuthType, agentsCfg.CertDuration)
c.flushAgents(ctx, types.PasswordAuthType, agentsCfg.LoginPasswordDuration)
}

if bouncersCfg != nil {
c.flushBouncers(types.TlsAuthType, bouncersCfg.CertDuration)
c.flushBouncers(types.ApiKeyAuthType, bouncersCfg.ApiDuration)
c.flushBouncers(ctx, types.TlsAuthType, bouncersCfg.CertDuration)
c.flushBouncers(ctx, types.ApiKeyAuthType, bouncersCfg.ApiDuration)
}

return nil
}

func (c *Client) FlushAlerts(MaxAge string, MaxItems int) error {
func (c *Client) FlushAlerts(ctx context.Context, MaxAge string, MaxItems int) error {
var (
deletedByAge int
deletedByNbItem int
Expand All @@ -235,7 +236,7 @@ func (c *Client) FlushAlerts(MaxAge string, MaxItems int) error {
}

c.Log.Debug("Flushing orphan alerts")
c.FlushOrphans()
c.FlushOrphans(ctx)
c.Log.Debug("Done flushing orphan alerts")

totalAlerts, err = c.TotalAlerts()
Expand Down Expand Up @@ -287,7 +288,7 @@ func (c *Client) FlushAlerts(MaxAge string, MaxItems int) error {

if maxid > 0 {
// This may lead to orphan alerts (at least on MySQL), but the next time the flush job will run, they will be deleted
deletedByNbItem, err = c.Ent.Alert.Delete().Where(alert.IDLT(maxid)).Exec(c.CTX)
deletedByNbItem, err = c.Ent.Alert.Delete().Where(alert.IDLT(maxid)).Exec(ctx)
if err != nil {
c.Log.Errorf("FlushAlerts: Could not delete alerts: %s", err)
return fmt.Errorf("could not delete alerts: %w", err)
Expand Down

0 comments on commit edb90c0

Please sign in to comment.