From a7b4167196d4e7b82d09a6d6f08d9a8cea98d1c6 Mon Sep 17 00:00:00 2001 From: Fionera Date: Tue, 5 Mar 2024 04:58:14 +0100 Subject: [PATCH] feat: implement config handling This does leak fds as the logging target never gets closed, this has to be fixed before release but this will work for now. --- cmd/coraza-spoa/config.go | 138 ++++++++++++++++++++++++++++++++++++++ cmd/coraza-spoa/main.go | 91 +++++++++++++++++++------ examples/coraza-spoa.yaml | 8 ++- go.mod | 1 + go.sum | 4 ++ internal/agent.go | 24 +++++-- internal/application.go | 49 +++++++------- internal/e2e_test.go | 11 ++- 8 files changed, 272 insertions(+), 54 deletions(-) create mode 100644 cmd/coraza-spoa/config.go diff --git a/cmd/coraza-spoa/config.go b/cmd/coraza-spoa/config.go new file mode 100644 index 0000000..4a42c3b --- /dev/null +++ b/cmd/coraza-spoa/config.go @@ -0,0 +1,138 @@ +package main + +import ( + "fmt" + "io" + "net/url" + "os" + "time" + + "github.com/rs/zerolog" + "gopkg.in/yaml.v3" + + "github.com/corazawaf/coraza-spoa/internal" +) + +func readConfig() (*config, error) { + open, err := os.Open(configPath) + if err != nil { + return nil, err + } + + d := yaml.NewDecoder(open) + d.KnownFields(true) + + var cfg config + if err := d.Decode(&cfg); err != nil { + return nil, err + } + + if len(cfg.Applications) == 0 { + globalLogger.Warn().Msg("no applications defined") + } + + return &cfg, nil +} + +type config struct { + Bind string `yaml:"bind"` + Log logConfig `yaml:",inline"` + Applications []struct { + Log logConfig `yaml:",inline"` + Name string `yaml:"name"` + Directives string `yaml:"directives"` + ResponseCheck bool `yaml:"response_check"` + TransactionTTLMS int `yaml:"transaction_ttl_ms"` + TransactionActiveLimit int `yaml:"transaction_active_limit"` + TransactionActiveLimitReject bool `yaml:"transaction_active_limit_reject"` + } `yaml:"applications"` +} + +func (c config) networkAddressFromBind() (network string, address string) { + bindUrl, err := url.Parse(c.Bind) + if err == nil { + return bindUrl.Scheme, bindUrl.Path + } + + return "tcp", c.Bind +} + +func (c config) newApplications() (map[string]*internal.Application, error) { + allApps := make(map[string]*internal.Application) + + for name, a := range c.Applications { + logger, err := a.Log.newLogger() + if err != nil { + return nil, fmt.Errorf("creating logger for application %q: %v", name, err) + } + + appConfig := internal.AppConfig{ + Logger: logger, + Directives: a.Directives, + ResponseCheck: a.ResponseCheck, + TransactionTTLMS: time.Duration(a.TransactionTTLMS) * time.Millisecond, + } + + application, err := appConfig.NewApplication() + if err != nil { + return nil, fmt.Errorf("initializing application %q: %v", name, err) + } + + allApps[a.Name] = application + } + + return allApps, nil +} + +type logConfig struct { + Level string `yaml:"log_level"` + File string `yaml:"log_file"` + Format string `yaml:"log_format"` +} + +func (lc logConfig) outputWriter() (io.Writer, error) { + var out io.Writer + if lc.File == "" || lc.File == "/dev/stdout" { + out = os.Stdout + } else if lc.File == "/dev/stderr" { + out = os.Stderr + } else if lc.File == "/dev/null" { + out = io.Discard + } else { + // TODO: Close the handle if not used anymore. + // Currently these are leaked as soon as we reload. + f, err := os.OpenFile(lc.File, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666) + if err != nil { + return nil, err + } + out = f + } + return out, nil +} + +func (lc logConfig) newLogger() (zerolog.Logger, error) { + out, err := lc.outputWriter() + if err != nil { + return globalLogger, err + } + + switch lc.Format { + case "console": + out = zerolog.ConsoleWriter{ + Out: out, + } + case "json": + default: + return globalLogger, fmt.Errorf("unknown log format: %v", lc.Format) + } + + if lc.Level == "" { + lc.Level = "info" + } + lvl, err := zerolog.ParseLevel(lc.Level) + if err != nil { + return globalLogger, err + } + + return zerolog.New(out).Level(lvl).With().Timestamp().Logger(), nil +} diff --git a/cmd/coraza-spoa/main.go b/cmd/coraza-spoa/main.go index 858b766..e6b76dd 100644 --- a/cmd/coraza-spoa/main.go +++ b/cmd/coraza-spoa/main.go @@ -11,33 +11,60 @@ import ( "os/signal" "syscall" - "github.com/rs/zerolog/log" + "github.com/rs/zerolog" "github.com/corazawaf/coraza-spoa/internal" ) +var configPath string +var globalLogger = zerolog.New(os.Stderr).With().Timestamp().Logger() + func main() { + flag.StringVar(&configPath, "config", "", "configuration file") flag.Parse() - log.Info().Msg("Starting coraza-spoa") - //TODO START HERE + if configPath == "" { + globalLogger.Fatal().Msg("Configuration file is not set") + } + + cfg, err := readConfig() + if err != nil { + globalLogger.Fatal().Err(err).Msg("Failed loading config") + } + + logger, err := cfg.Log.newLogger() + if err != nil { + globalLogger.Fatal().Err(err).Msg("Failed creating global logger") + } + globalLogger = logger + + apps, err := cfg.newApplications() + if err != nil { + globalLogger.Fatal().Err(err).Msg("Failed creating applications") + } + + ctx, cancelFunc := context.WithCancel(context.Background()) + defer cancelFunc() - l, err := net.Listen("tcp", "127.0.0.1:8000") + network, address := cfg.networkAddressFromBind() + l, err := (&net.ListenConfig{}).Listen(ctx, network, address) if err != nil { - return + globalLogger.Fatal().Err(err).Msg("Failed opening socket") } a := &internal.Agent{ - Context: context.Background(), - Applications: map[string]*internal.Application{ - "default": { - ResponseCheck: true, - TransactionTTLMs: 1000, - }, - }, + Context: ctx, + Applications: apps, + Logger: globalLogger, } + go func() { + defer cancelFunc() - log.Print(a.Serve(l)) + globalLogger.Info().Msg("Starting coraza-spoa") + if err := a.Serve(l); err != nil { + globalLogger.Fatal().Err(err).Msg("listener closed") + } + }() sigCh := make(chan os.Signal, 1) signal.Notify(sigCh, syscall.SIGTERM, syscall.SIGHUP, syscall.SIGUSR1, syscall.SIGINT) @@ -45,17 +72,43 @@ func main() { sig := <-sigCh switch sig { case syscall.SIGTERM: - log.Info().Msg("Received SIGTERM, shutting down...") + globalLogger.Info().Msg("Received SIGTERM, shutting down...") // this return will run cancel() and close the server return case syscall.SIGINT: - log.Info().Msg("Received SIGINT, shutting down...") + globalLogger.Info().Msg("Received SIGINT, shutting down...") return case syscall.SIGHUP: - log.Info().Msg("Received SIGHUP, reloading configuration...") - log.Error().Err(nil).Msg("Error loading configuration, using old configuration") - case syscall.SIGUSR1: - log.Info().Msg("SIGUSR1 received. Changing port is not supported yet") + globalLogger.Info().Msg("Received SIGHUP, reloading configuration...") + + newCfg, err := readConfig() + if err != nil { + globalLogger.Error().Err(err).Msg("Error loading configuration, using old configuration") + continue + } + + if cfg.Log != newCfg.Log { + newLogger, err := newCfg.Log.newLogger() + if err != nil { + globalLogger.Error().Err(err).Msg("Error creating new global logger, using old configuration") + continue + } + globalLogger = newLogger + } + + if cfg.Bind != newCfg.Bind { + globalLogger.Error().Msg("Changing bind is not supported yet, using old configuration") + continue + } + + apps, err := newCfg.newApplications() + if err != nil { + globalLogger.Error().Err(err).Msg("Error applying configuration, using old configuration") + continue + } + + a.ReplaceApplications(apps) + cfg = newCfg } } } diff --git a/examples/coraza-spoa.yaml b/examples/coraza-spoa.yaml index 266bedc..d4d31a7 100644 --- a/examples/coraza-spoa.yaml +++ b/examples/coraza-spoa.yaml @@ -1,9 +1,6 @@ # The SPOA server bind address bind: 0.0.0.0:9000 -# The maximum number of transactions which can be cached -transaction_active_limit: 1000 - # The log level configuration, one of: debug/info/warn/error/panic/fatal log_level: info # The log file path @@ -46,6 +43,11 @@ applications: log_level: info # The log file path log_file: /dev/stdout + # The log format, one of: console/json + log_format: console + + # The maximum number of transactions which can be cached + transaction_active_limit: 1000 # After reaching the maximum number of transactions: # If true then the new transactions will be rejected with deny and status code 503 diff --git a/go.mod b/go.mod index 65436ec..5e85641 100644 --- a/go.mod +++ b/go.mod @@ -8,6 +8,7 @@ require ( github.com/magefile/mage v1.15.0 github.com/mccutchen/go-httpbin/v2 v2.13.4 github.com/rs/zerolog v1.32.0 + gopkg.in/yaml.v3 v3.0.1 istio.io/istio v0.0.0-20240218163812-d80ef7b19049 ) diff --git a/go.sum b/go.sum index 9fcf9c2..ef57211 100644 --- a/go.sum +++ b/go.sum @@ -46,6 +46,10 @@ golang.org/x/sys v0.17.0 h1:25cE3gD+tdBA7lp7QfhuV+rJiE9YXTcS3VG1SqssI/Y= golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/tools v0.17.0 h1:FvmRgNOcs3kOa+T20R1uhfP9F6HgG2mfxDv1vrx1Htc= golang.org/x/tools v0.17.0/go.mod h1:xsh6VxdV005rRVaS6SSAf9oiAqljS7UZUacMZ8Bnsps= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= istio.io/istio v0.0.0-20240218163812-d80ef7b19049 h1:jR4INLKnkLNgQRNMBjkAt1ctPnuTq+vQ9wlZSOtR1+o= istio.io/istio v0.0.0-20240218163812-d80ef7b19049/go.mod h1:5ATT2TaGbT/L1SwCYvs2ArNeLxHkPKwhvT7r3TPMu6M= rsc.io/binaryregexp v0.2.0 h1:HfqmD5MEmC0zvwBuF187nq9mdnXjXsSivRiXN7SmRkE= diff --git a/internal/agent.go b/internal/agent.go index 8b308f5..b0783e3 100644 --- a/internal/agent.go +++ b/internal/agent.go @@ -4,6 +4,7 @@ import ( "context" "errors" "net" + "sync" "github.com/dropmorepackets/haproxy-go/pkg/encoding" "github.com/dropmorepackets/haproxy-go/spop" @@ -13,8 +14,9 @@ import ( type Agent struct { Context context.Context Applications map[string]*Application + Logger zerolog.Logger - logger *zerolog.Logger + mtx sync.RWMutex } func (a *Agent) Serve(l net.Listener) error { @@ -26,6 +28,12 @@ func (a *Agent) Serve(l net.Listener) error { return agent.Serve(l) } +func (a *Agent) ReplaceApplications(newApps map[string]*Application) { + a.mtx.Lock() + a.Applications = newApps + a.mtx.Unlock() +} + func (a *Agent) HandleSPOE(ctx context.Context, writer *encoding.ActionWriter, message *encoding.Message) { const ( messageCorazaRequest = "coraza-req" @@ -39,14 +47,14 @@ func (a *Agent) HandleSPOE(ctx context.Context, writer *encoding.ActionWriter, m case messageCorazaResponse: messageHandler = (*Application).HandleResponse default: - a.logger.Debug().Str("message", name).Msg("unknown spoe message") + a.Logger.Debug().Str("message", name).Msg("unknown spoe message") return } k := encoding.AcquireKVEntry() defer encoding.ReleaseKVEntry(k) if !message.KV.Next(k) { - a.logger.Panic().Msg("failed reading kv entry") + a.Logger.Panic().Msg("failed reading kv entry") return } @@ -54,14 +62,16 @@ func (a *Agent) HandleSPOE(ctx context.Context, writer *encoding.ActionWriter, m if !k.NameEquals("app") { // Without knowing the app, we cannot continue. We could fall back to a default application, // but all following code would have to support that as we now already read one of the kv entries. - a.logger.Panic().Str("expected", "app").Str("got", appName).Msg("unexpected kv entry") + a.Logger.Panic().Str("expected", "app").Str("got", appName).Msg("unexpected kv entry") return } + a.mtx.RLock() app := a.Applications[appName] + a.mtx.RUnlock() if app == nil { // If we cannot resolve the app, we fail as this is an invalid configuration. - a.logger.Panic().Str("app", appName).Msg("app not found") + a.Logger.Panic().Str("app", appName).Msg("app not found") return } @@ -77,10 +87,10 @@ func (a *Agent) HandleSPOE(ctx context.Context, writer *encoding.ActionWriter, m _ = writer.SetString(encoding.VarScopeTransaction, "data", interruption.Interruption.Data) _ = writer.SetInt64(encoding.VarScopeTransaction, "ruleid", int64(interruption.Interruption.RuleID)) - a.logger.Debug().Err(err).Msg("sending interruption") + a.Logger.Debug().Err(err).Msg("sending interruption") return } // If the error is not an ErrInterrupted, we panic to let the spop stream fail. - a.logger.Panic().Err(err).Msg("Error handling request") + a.Logger.Panic().Err(err).Msg("Error handling request") } diff --git a/internal/application.go b/internal/application.go index b0d76fd..5f0094e 100644 --- a/internal/application.go +++ b/internal/application.go @@ -17,13 +17,18 @@ import ( "istio.io/istio/pkg/cache" ) +type AppConfig struct { + Directives string + ResponseCheck bool + Logger zerolog.Logger + TransactionTTLMS time.Duration +} + type Application struct { - waf coraza.WAF - logger *zerolog.Logger - cache cache.ExpiringCache + waf coraza.WAF + cache cache.ExpiringCache - ResponseCheck bool - TransactionTTLMs time.Duration + AppConfig } type applicationRequest struct { @@ -92,7 +97,7 @@ func (a *Application) HandleRequest(ctx context.Context, writer *encoding.Action // acquire a new kv entry to continue reading other message values. k = encoding.AcquireKVEntry() default: - a.logger.Debug().Str("name", name).Msg("unknown kv entry") + a.Logger.Debug().Str("name", name).Msg("unknown kv entry") } } @@ -105,7 +110,7 @@ func (a *Application) HandleRequest(ctx context.Context, writer *encoding.Action tx := a.waf.NewTransactionWithID(sb.String()) // write transaction as early as possible to prevent cache misses - a.cache.SetWithExpiration(tx.ID(), tx, a.TransactionTTLMs*time.Millisecond) + a.cache.SetWithExpiration(tx.ID(), tx, a.TransactionTTLMS*time.Millisecond) if err := writer.SetString(encoding.VarScopeTransaction, "id", tx.ID()); err != nil { return err } @@ -216,7 +221,7 @@ func (a *Application) HandleResponse(ctx context.Context, writer *encoding.Actio // acquire a new kv entry to continue reading other message values. k = encoding.AcquireKVEntry() default: - a.logger.Debug().Str("name", name).Msg("unknown kv entry") + a.Logger.Debug().Str("name", name).Msg("unknown kv entry") } } @@ -259,27 +264,25 @@ func (a *Application) HandleResponse(ctx context.Context, writer *encoding.Actio return tx.Close() } -func NewApplication(logger *zerolog.Logger, directives string) (*Application, error) { - a := &Application{ - logger: logger, - ResponseCheck: true, - TransactionTTLMs: 1000, +func (a AppConfig) NewApplication() (*Application, error) { + app := Application{ + AppConfig: a, } config := coraza.NewWAFConfig(). - WithDirectives(directives). - WithErrorCallback(a.logCallback) + WithDirectives(a.Directives). + WithErrorCallback(app.logCallback) waf, err := coraza.NewWAF(config) if err != nil { return nil, err } - a.waf = waf + app.waf = waf const defaultExpire = time.Second * 10 const defaultEvictionInterval = time.Second * 1 - a.cache = cache.NewTTLWithCallback(defaultExpire, defaultEvictionInterval, func(key, value any) { + app.cache = cache.NewTTLWithCallback(defaultExpire, defaultEvictionInterval, func(key, value any) { // everytime a transaction runs into a timeout it gets closed. tx, ok := value.(types.Transaction) if !ok { @@ -289,11 +292,11 @@ func NewApplication(logger *zerolog.Logger, directives string) (*Application, er // Process Logging won't do anything if TX was already logged. tx.ProcessLogging() if err := tx.Close(); err != nil { - a.logger.Error().Err(err).Str("tx", tx.ID()).Msg("error closing transaction") + a.Logger.Error().Err(err).Str("tx", tx.ID()).Msg("error closing transaction") } }) - return a, nil + return &app, nil } func (a *Application) logCallback(mr types.MatchedRule) { @@ -301,14 +304,14 @@ func (a *Application) logCallback(mr types.MatchedRule) { switch mr.Rule().Severity() { case types.RuleSeverityWarning: - l = a.logger.Warn() + l = a.Logger.Warn() case types.RuleSeverityNotice, types.RuleSeverityInfo: - l = a.logger.Info() + l = a.Logger.Info() case types.RuleSeverityDebug: - l = a.logger.Debug() + l = a.Logger.Debug() default: - l = a.logger.Error() + l = a.Logger.Error() } l.Msg(mr.ErrorLog()) } diff --git a/internal/e2e_test.go b/internal/e2e_test.go index 1725b91..2249920 100644 --- a/internal/e2e_test.go +++ b/internal/e2e_test.go @@ -54,7 +54,14 @@ func withCoraza(t *testing.T, f func(*testing.T, testutil.HAProxyConfig, string) logger := zerolog.New(os.Stderr) - application, err := NewApplication(&logger, directives) + appCfg := AppConfig{ + Directives: directives, + ResponseCheck: true, + Logger: logger, + TransactionTTLMS: 10000, + } + + application, err := appCfg.NewApplication() if err != nil { t.Fatal(err) } @@ -64,7 +71,7 @@ func withCoraza(t *testing.T, f func(*testing.T, testutil.HAProxyConfig, string) Applications: map[string]*Application{ "default": application, }, - logger: &logger, + Logger: logger, } // create the listener synchronously to prevent a race