diff --git a/cmd/network-observer/config.go b/cmd/network-observer/config.go index 5b30d3d81..3d2baa924 100644 --- a/cmd/network-observer/config.go +++ b/cmd/network-observer/config.go @@ -12,9 +12,9 @@ import ( ) type Config struct { - APIListenAddress string - APIDisableAccessLogs bool - APITLS TLSSpec + APIListenAddress string + APIEnableAccessLogs bool + APITLS TLSSpec EnableConsole bool ConsoleLocation string @@ -24,6 +24,8 @@ type Config struct { RouterTLS TLSSpec FlowRecordTTL time.Duration + VanflowLoggingProfile string + EnableProfile bool CORSAllowAll bool } diff --git a/cmd/network-observer/internal/collector/collector.go b/cmd/network-observer/internal/collector/collector.go index bdf49d7c6..eb7411f80 100644 --- a/cmd/network-observer/internal/collector/collector.go +++ b/cmd/network-observer/internal/collector/collector.go @@ -23,7 +23,7 @@ import ( "golang.org/x/sync/errgroup" ) -func New(logger *slog.Logger, factory session.ContainerFactory, reg *prometheus.Registry, flowRecordTTL time.Duration) *Collector { +func New(logger *slog.Logger, factory session.ContainerFactory, reg *prometheus.Registry, flowRecordTTL time.Duration, flowLogger func(vanflow.RecordMessage)) *Collector { sessionCtr := factory.Create() collector := &Collector{ @@ -37,6 +37,7 @@ func New(logger *slog.Logger, factory session.ContainerFactory, reg *prometheus. recordRouting: make(eventsource.RecordStoreMap), metrics: register(reg), metricsAdaptor: opmetrics.New(reg), + flowLogging: flowLogger, } collector.Records = store.NewSyncMapStore(store.SyncMapStoreConfig{ @@ -61,6 +62,7 @@ func New(logger *slog.Logger, factory session.ContainerFactory, reg *prometheus. type Collector struct { logger *slog.Logger flowRecordTTL time.Duration + flowLogging func(vanflow.RecordMessage) session session.Container discovery *eventsource.Discovery @@ -375,6 +377,9 @@ func (c *Collector) discoveryHandler(ctx context.Context) func(eventsource.Info) } } + if c.flowLogging != nil { + client.OnRecord(c.flowLogging) + } client.OnRecord(router.Route) for _, address := range addresses { diff --git a/cmd/network-observer/internal/flowlog/logging.go b/cmd/network-observer/internal/flowlog/logging.go new file mode 100644 index 000000000..f386b9025 --- /dev/null +++ b/cmd/network-observer/internal/flowlog/logging.go @@ -0,0 +1,241 @@ +package flowlog + +import ( + "context" + "encoding/json" + "hash/fnv" + "log/slog" + "slices" + "sync" + "sync/atomic" + "time" + + "github.com/skupperproject/skupper/pkg/vanflow" + "golang.org/x/time/rate" +) + +const ( + recordTypeMatchesAll string = "internal.flowlog.matchAll" + doNotSample = staticSampler(false) +) + +// Rule specifies how particular vanflow record types should be logged. +type Rule struct { + // Priority of the rule. Lowest matching a record type wins. + Priority int + // Match is the set of record types the rule applies to + Match RecordTypeSet + // Strategy for sampling records + Strategy SampleStrategy +} + +type MessageHandler func(vanflow.RecordMessage) + +// New creates a MessageHandler given a set of rules and a log output function +func New(ctx context.Context, logFn func(msg string, args ...any), rules []Rule) MessageHandler { + handler := &handler{ + logFn: logFn, + } + + for _, rule := range rules { + if rule.Strategy == nil || rule.Match == nil { + continue + } + handler.rules = append(handler.rules, rule) + } + slices.SortFunc(handler.rules, func(l, r Rule) int { + return l.Priority - r.Priority + }) + go handler.report(ctx) + return handler.handle +} + +type SampleStrategy interface { + // Sample returns true when the record should be logged + Sample(r vanflow.Record) bool +} + +type staticSampler bool + +func (s staticSampler) Sample(vanflow.Record) bool { + return bool(s) +} + +// Unlimited SampleStrategy always samples +func Unlimited() SampleStrategy { + return staticSampler(true) +} + +type rateLimited struct { + limiter *rate.Limiter +} + +// RateLimited SampleStrategy samples events up to a limit (in events per +// second). Events exceeding the limit are not logged. +func RateLimited(limit float64, burst int) SampleStrategy { + return rateLimited{ + limiter: rate.NewLimiter(rate.Limit(limit), burst), + } +} + +func (r rateLimited) Sample(vanflow.Record) bool { + return r.limiter.Allow() +} + +// TransportFlowHash uses a deterministic hash based on a TransportBiflow ID. +// Uses the AppBiflow Parent field (Transport ID) so that ideally related flows +// are sampled together. +func TransportFlowHash(percent float64, parent SampleStrategy) SampleStrategy { + if percent < 0 || percent >= 1.0 { + panic("percent must be value in range [0, 1)") + } + if parent == nil { + parent = Unlimited() + } + return hashBasedSampler{ + parent: parent, + mod: 10_000, + q: uint32(percent * 10_000), + } +} + +type hashBasedSampler struct { + parent SampleStrategy + mod uint32 + q uint32 +} + +func (h hashBasedSampler) Sample(r vanflow.Record) bool { + var transportID string + switch flow := r.(type) { + case vanflow.TransportBiflowRecord: + transportID = flow.ID + case vanflow.AppBiflowRecord: + if flow.Parent == nil { + return false + } + transportID = *flow.Parent + default: + return false + } + hash := fnv.New32a() + hash.Write([]byte(transportID)) + m := hash.Sum32() % h.mod + if h.q >= m { + return h.parent.Sample(r) + } + return false +} + +// RecordTypeSet specifies the Record Types that a Rule is applicable for. +type RecordTypeSet map[vanflow.TypeMeta]struct{} + +func (r RecordTypeSet) matchesAll() bool { + _, ok := r[vanflow.TypeMeta{APIVersion: recordTypeMatchesAll}] + return ok +} + +func NewRecordTypeSet(records ...vanflow.Record) RecordTypeSet { + set := RecordTypeSet{} + for _, r := range records { + set[r.GetTypeMeta()] = struct{}{} + } + return set +} + +// NewRecordTypeSetAll returns a special RecordTypeSet that matches records of +// all types +func NewRecordTypeSetAll() RecordTypeSet { + set := RecordTypeSet{} + set[vanflow.TypeMeta{APIVersion: recordTypeMatchesAll}] = struct{}{} + return set +} + +type handler struct { + logFn func(msg string, args ...any) + rules []Rule + + resolved sync.Map + sampled sync.Map +} + +func (h *handler) report(ctx context.Context) { + ticker := time.NewTicker(time.Minute) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + h.logReport() + } + } +} + +func (h *handler) logReport() { + sampleCounts := make(map[string]int) + h.sampled.Range(func(k, v any) bool { + h.sampled.Delete(k) + typ, ct := k.(vanflow.TypeMeta), v.(*atomic.Int64) + sampleCounts[typ.String()] = int(ct.Load()) + return true + }) + + if len(sampleCounts) == 0 { + return + } + var counts []any + for typ, count := range sampleCounts { + counts = append(counts, slog.Int(typ, count)) + + } + h.logFn("some vanflow records were not logged", counts...) +} + +func (h *handler) resolve(typ vanflow.TypeMeta) SampleStrategy { + r, ok := h.resolved.Load(typ) + if ok { + return r.(SampleStrategy) + } + var strategy SampleStrategy = doNotSample + for _, rule := range h.rules { + if rule.Match.matchesAll() { + strategy = rule.Strategy + break + } + if _, ok := rule.Match[typ]; ok { + strategy = rule.Strategy + break + } + } + h.resolved.Store(typ, strategy) + return strategy +} + +func (h *handler) handle(msg vanflow.RecordMessage) { + attrs := slog.Group("message", slog.String("to", msg.To), slog.String("subject", msg.Subject)) + for _, record := range msg.Records { + typ := record.GetTypeMeta() + strategy := h.resolve(typ) + if !strategy.Sample(record) { + if strategy != doNotSample { + prev, _ := h.sampled.LoadOrStore(typ, new(atomic.Int64)) + prev.(*atomic.Int64).Add(1) + } + continue + } + + // TODO(ck) more efficient slog.LogValuer for vanflow records? + raw, _ := json.Marshal(record) + var out map[string]any + json.Unmarshal(raw, &out) + recordValues := make([]any, 0, len(out)) + for k, v := range out { + if v == nil { + continue + } + recordValues = append(recordValues, slog.Any(k, v)) + } + h.logFn(record.GetTypeMeta().String(), slog.Group("record", recordValues...), attrs) + } +} diff --git a/cmd/network-observer/internal/flowlog/logging_test.go b/cmd/network-observer/internal/flowlog/logging_test.go new file mode 100644 index 000000000..ef8982955 --- /dev/null +++ b/cmd/network-observer/internal/flowlog/logging_test.go @@ -0,0 +1,240 @@ +package flowlog + +import ( + "context" + "log/slog" + "testing" + + "github.com/skupperproject/skupper/cmd/network-observer/internal/collector" + "github.com/skupperproject/skupper/pkg/vanflow" + "gotest.tools/v3/assert" +) + +func TestSampling(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + t.Run("lowest priority allow wins", func(t *testing.T) { + var ct int + messageCounter := func(string, ...any) { + ct++ + } + handler := New(ctx, messageCounter, []Rule{ // Lowest priority wins + { + Priority: 2, + Match: NewRecordTypeSet(vanflow.SiteRecord{}), + Strategy: doNotSample, + }, + { + Priority: 1, + Match: NewRecordTypeSet(vanflow.SiteRecord{}), + Strategy: Unlimited(), + }, + { + Priority: 5, + Match: NewRecordTypeSet(vanflow.SiteRecord{}), + Strategy: doNotSample, + }, + }) + + for x := 0; x < 1_000; x++ { + handler(vanflow.RecordMessage{Records: []vanflow.Record{ + vanflow.SiteRecord{}, + vanflow.SiteRecord{}, + vanflow.ProcessRecord{}, + }}) + } + assert.Equal(t, ct, 2_000) + }) + + t.Run("rate limt zero with burst", func(t *testing.T) { + ct := 0 + messageCounter := func(string, ...any) { + ct++ + } + handler := New(ctx, messageCounter, []Rule{ // zero rate with bursts + { + Priority: 1, + Match: NewRecordTypeSet(vanflow.SiteRecord{}), + Strategy: RateLimited(0.0, 16), + }, + }) + + for x := 0; x < 1_000; x++ { + handler(vanflow.RecordMessage{Records: []vanflow.Record{ + vanflow.SiteRecord{}, + vanflow.SiteRecord{}, + vanflow.ProcessRecord{}, + }}) + } + assert.Equal(t, ct, 16) + }) + + t.Run("hash based", func(t *testing.T) { + var ( + MagicPercentage = 0.1 + MagicMatchingIDs = []string{ + "test::7", + "test::3e2", + "test::317", + "test::3ad", + } + MagicSkippedIDs = []string{ + "test::d", + "test::3e3", + "test::318", + "test::3af", + } + ) + + var ( + ctTport int + ctApp int + ) + check := func(subject string, _ ...any) { + switch subject { + case vanflow.TransportBiflowRecord{}.GetTypeMeta().String(): + ctTport++ + case vanflow.AppBiflowRecord{}.GetTypeMeta().String(): + ctApp++ + default: + t.Errorf("unexpected type: %s", subject) + } + } + handler := New(ctx, check, []Rule{ + { + Priority: 1, + Match: NewRecordTypeSetAll(), + Strategy: TransportFlowHash(MagicPercentage, Unlimited()), + }, + }) + + for _, tportID := range MagicMatchingIDs { + handler(vanflow.RecordMessage{Records: []vanflow.Record{ + vanflow.SiteRecord{BaseRecord: vanflow.NewBase(tportID)}, + vanflow.ProcessRecord{BaseRecord: vanflow.NewBase(tportID)}, + vanflow.TransportBiflowRecord{BaseRecord: vanflow.NewBase(tportID)}, + vanflow.AppBiflowRecord{BaseRecord: vanflow.NewBase(tportID), Parent: &tportID}, + }}) + } + for _, tportID := range MagicSkippedIDs { + handler(vanflow.RecordMessage{Records: []vanflow.Record{ + vanflow.SiteRecord{BaseRecord: vanflow.NewBase(tportID)}, + vanflow.ProcessRecord{BaseRecord: vanflow.NewBase(tportID)}, + vanflow.TransportBiflowRecord{BaseRecord: vanflow.NewBase(tportID)}, + vanflow.AppBiflowRecord{BaseRecord: vanflow.NewBase(tportID), Parent: &tportID}, + }}) + } + assert.Equal(t, ctTport, ctApp) + assert.Equal(t, ctTport, len(MagicMatchingIDs)) + + ctTport, ctApp = 0, 0 + handler = New(ctx, check, []Rule{ + { + Priority: 1, + Match: NewRecordTypeSetAll(), + Strategy: TransportFlowHash(MagicPercentage, doNotSample), + }, + }) + handler(vanflow.RecordMessage{Records: []vanflow.Record{ + vanflow.TransportBiflowRecord{BaseRecord: vanflow.NewBase(MagicMatchingIDs[0])}, + vanflow.AppBiflowRecord{BaseRecord: vanflow.NewBase(MagicMatchingIDs[1])}, + }}) + assert.Equal(t, ctTport, ctApp) + assert.Equal(t, ctTport, 0) + }) + + t.Run("all types", func(t *testing.T) { + ct := 0 + messageCounter := func(string, ...any) { + ct++ + } + handler := New(ctx, messageCounter, []Rule{ // zero rate with bursts + { + Priority: 1, + Match: NewRecordTypeSetAll(), + Strategy: Unlimited(), + }, + }) + + for x := 0; x < 10; x++ { + + handler(vanflow.RecordMessage{Records: []vanflow.Record{ + vanflow.SiteRecord{}, + vanflow.SiteRecord{}, + vanflow.FlowRecord{}, + vanflow.AppBiflowRecord{}, + collector.AddressRecord{}, + vanflow.ProcessRecord{}, + }}) + } + assert.Equal(t, ct, 60) + }) + t.Run("log report empty", func(t *testing.T) { + type message struct { + Msg string + Args []any + } + messages := []message{} + + recordMessages := func(msg string, args ...any) { + messages = append(messages, message{Msg: msg, Args: args}) + } + handler := &handler{ + logFn: recordMessages, + rules: []Rule{ + { + Priority: 1, + Match: NewRecordTypeSet(vanflow.SiteRecord{}), + Strategy: Unlimited(), + }, + }, + } + handler.handle(vanflow.RecordMessage{ + Records: []vanflow.Record{ + vanflow.SiteRecord{}, + }, + }) + handler.handle(vanflow.RecordMessage{}) + assert.Equal(t, len(messages), 1) + messages = messages[:0] + handler.logReport() + assert.Equal(t, len(messages), 0) + }) + t.Run("log report", func(t *testing.T) { + type message struct { + Msg string + Args []any + } + messages := []message{} + + recordMessages := func(msg string, args ...any) { + messages = append(messages, message{Msg: msg, Args: args}) + } + handler := &handler{ + logFn: recordMessages, + rules: []Rule{ + { + Priority: 1, + Match: NewRecordTypeSet(vanflow.SiteRecord{}), + Strategy: RateLimited(0.0, 16), + }, + }, + } + for x := 0; x < 1_000; x++ { + handler.handle(vanflow.RecordMessage{Records: []vanflow.Record{ + vanflow.SiteRecord{}, + vanflow.SiteRecord{}, + vanflow.ProcessRecord{}, + }}) + } + assert.Equal(t, len(messages), 16) + messages = messages[:0] + handler.logReport() + assert.Equal(t, len(messages), 1) + assert.DeepEqual(t, messages[0].Args, []any{ + slog.Int("flow/v1/SiteRecord", 2_000-16), + }) + }) + +} diff --git a/cmd/network-observer/main.go b/cmd/network-observer/main.go index 270965ae1..84986d315 100755 --- a/cmd/network-observer/main.go +++ b/cmd/network-observer/main.go @@ -19,7 +19,9 @@ import ( "github.com/skupperproject/skupper/cmd/network-observer/internal/api" "github.com/skupperproject/skupper/cmd/network-observer/internal/collector" + "github.com/skupperproject/skupper/cmd/network-observer/internal/flowlog" "github.com/skupperproject/skupper/cmd/network-observer/internal/server" + "github.com/skupperproject/skupper/pkg/vanflow" "github.com/skupperproject/skupper/pkg/vanflow/session" "github.com/skupperproject/skupper/pkg/version" ) @@ -44,7 +46,27 @@ func run(cfg Config) error { return fmt.Errorf("failed to load router tls configuration: %s", err) } - collector := collector.New(logger.With(slog.String("component", "collector")), session.NewContainerFactory(cfg.RouterURL, sessionConfig), reg, cfg.FlowRecordTTL) + flowLogger := func(vanflow.RecordMessage) {} + vanflowSLog := logger.With(slog.String("component", "vanflow")) + switch cfg.VanflowLoggingProfile { + case "silent": + case "minimal": + flowLogger = flowlog.New(ctx, vanflowSLog.Info, loggingProfileMinimal) + case "moderate": + flowLogger = flowlog.New(ctx, vanflowSLog.Info, loggingProfileModerate) + case "all": + flowLogger = flowlog.New(ctx, vanflowSLog.Info, loggingProfileAll) + default: + return fmt.Errorf("unknown logging profile: %s", cfg.VanflowLoggingProfile) + } + + collector := collector.New( + logger.With(slog.String("component", "collector")), + session.NewContainerFactory(cfg.RouterURL, sessionConfig), + reg, + cfg.FlowRecordTTL, + flowLogger, + ) collectorAPI := server.New( logger.With(slog.String("component", "api")), @@ -77,7 +99,7 @@ func run(cfg Config) error { apiMux.PathPrefix("/").Handler(handleConsoleAssets(cfg.ConsoleLocation)) } - if !cfg.APIDisableAccessLogs { + if cfg.APIEnableAccessLogs { mux.Use(func(next http.Handler) http.Handler { return handlers.LoggingHandler(os.Stdout, next) }) @@ -182,7 +204,7 @@ func main() { flags.BoolVar(&cfg.RouterTLS.SkipVerify, "router-tls-insecure", false, "Set to skip verification of the router certificate and host name") flags.StringVar(&cfg.APIListenAddress, "listen", ":8080", "The address that the API Server will listen on") - flags.BoolVar(&cfg.APIDisableAccessLogs, "disable-access-logs", false, "Disables access logging for the API Server") + flags.BoolVar(&cfg.APIEnableAccessLogs, "enable-access-logs", false, "Enable access logging for the API Server") flags.StringVar(&cfg.APITLS.Cert, "tls-cert", "", "Path to the API Server certificate file") flags.StringVar(&cfg.APITLS.Key, "tls-key", "", "Path to the API Server certificate key file matching tls-cert") @@ -194,6 +216,8 @@ func main() { flags.BoolVar(&cfg.CORSAllowAll, "cors-allow-all", false, "Development option to allow all origins") flags.BoolVar(&cfg.EnableProfile, "profile", false, "Exposes the runtime profiling facilities from net/http/pprof on http://localhost:9970") + flags.StringVar(&cfg.VanflowLoggingProfile, "vanflow-logging-profile", "silent", "Controls low level vanflow record logging. Options are silent, minimal, moderate and all") + flags.Parse(os.Args[1:]) if *isVersion { fmt.Println(version.Version) diff --git a/cmd/network-observer/profiles.go b/cmd/network-observer/profiles.go new file mode 100644 index 000000000..f80a091d7 --- /dev/null +++ b/cmd/network-observer/profiles.go @@ -0,0 +1,55 @@ +package main + +import ( + "github.com/skupperproject/skupper/cmd/network-observer/internal/flowlog" + "github.com/skupperproject/skupper/pkg/vanflow" +) + +var ( + // loggingProfileMinimal logs 1 vanflow event per second (with bursts up to 32) + // reduces Link Record noise to 1 every ~20s. + // excludes network flow records + loggingProfileMinimal = []flowlog.Rule{ + { + Priority: 5, + Match: flowlog.NewRecordTypeSet( + vanflow.SiteRecord{}, vanflow.RouterRecord{}, + vanflow.ProcessRecord{}, vanflow.ConnectorRecord{}, + vanflow.ListenerRecord{}, vanflow.RouterAccessRecord{}, + vanflow.LogRecord{}, + ), + Strategy: flowlog.RateLimited(1.0, 32), + }, { + Priority: 1, + Match: flowlog.NewRecordTypeSet(vanflow.LinkRecord{}), + Strategy: flowlog.RateLimited(0.05, 32), + }, + } + // loggingProfileModerate is similar to minimal but doubles rate and burst + // limits. Also samples 1 in every 10 network flows up to 2 events per second. + loggingProfileModerate = []flowlog.Rule{ + { + Priority: 5, + Match: flowlog.NewRecordTypeSetAll(), + Strategy: flowlog.RateLimited(2.0, 64), + }, { + Priority: 1, + Match: flowlog.NewRecordTypeSet(vanflow.LinkRecord{}), + Strategy: flowlog.RateLimited(0.1, 64), + }, { + Priority: 1, + Match: flowlog.NewRecordTypeSet( + vanflow.AppBiflowRecord{}, + vanflow.TransportBiflowRecord{}, + ), + Strategy: flowlog.TransportFlowHash(0.1, flowlog.RateLimited(2.0, 64)), + }, + } + // loggingProfileAll logs all vanflow events. + loggingProfileAll = []flowlog.Rule{ + { + Match: flowlog.NewRecordTypeSetAll(), + Strategy: flowlog.Unlimited(), + }, + } +)