Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Collector logging #1828

Merged
merged 4 commits into from
Dec 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions cmd/network-observer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ import (
)

type Config struct {
APIListenAddress string
APIDisableAccessLogs bool
APITLS TLSSpec
APIListenAddress string
APIEnableAccessLogs bool
APITLS TLSSpec

EnableConsole bool
ConsoleLocation string
Expand All @@ -24,6 +24,8 @@ type Config struct {
RouterTLS TLSSpec
FlowRecordTTL time.Duration

VanflowLoggingProfile string

EnableProfile bool
CORSAllowAll bool
}
Expand Down
7 changes: 6 additions & 1 deletion cmd/network-observer/internal/collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
"golang.org/x/sync/errgroup"
)

func New(logger *slog.Logger, factory session.ContainerFactory, reg *prometheus.Registry, flowRecordTTL time.Duration) *Collector {
func New(logger *slog.Logger, factory session.ContainerFactory, reg *prometheus.Registry, flowRecordTTL time.Duration, flowLogger func(vanflow.RecordMessage)) *Collector {
sessionCtr := factory.Create()

collector := &Collector{
Expand All @@ -37,6 +37,7 @@ func New(logger *slog.Logger, factory session.ContainerFactory, reg *prometheus.
recordRouting: make(eventsource.RecordStoreMap),
metrics: register(reg),
metricsAdaptor: opmetrics.New(reg),
flowLogging: flowLogger,
}

collector.Records = store.NewSyncMapStore(store.SyncMapStoreConfig{
Expand All @@ -61,6 +62,7 @@ func New(logger *slog.Logger, factory session.ContainerFactory, reg *prometheus.
type Collector struct {
logger *slog.Logger
flowRecordTTL time.Duration
flowLogging func(vanflow.RecordMessage)

session session.Container
discovery *eventsource.Discovery
Expand Down Expand Up @@ -375,6 +377,9 @@ func (c *Collector) discoveryHandler(ctx context.Context) func(eventsource.Info)
}
}

if c.flowLogging != nil {
client.OnRecord(c.flowLogging)
}
client.OnRecord(router.Route)

for _, address := range addresses {
Expand Down
241 changes: 241 additions & 0 deletions cmd/network-observer/internal/flowlog/logging.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,241 @@
package flowlog

import (
"context"
"encoding/json"
"hash/fnv"
"log/slog"
"slices"
"sync"
"sync/atomic"
"time"

"github.com/skupperproject/skupper/pkg/vanflow"
"golang.org/x/time/rate"
)

const (
recordTypeMatchesAll string = "internal.flowlog.matchAll"
doNotSample = staticSampler(false)
)

// Rule specifies how particular vanflow record types should be logged.
type Rule struct {
// Priority of the rule. Lowest matching a record type wins.
Priority int
// Match is the set of record types the rule applies to
Match RecordTypeSet
// Strategy for sampling records
Strategy SampleStrategy
}

type MessageHandler func(vanflow.RecordMessage)

// New creates a MessageHandler given a set of rules and a log output function
func New(ctx context.Context, logFn func(msg string, args ...any), rules []Rule) MessageHandler {
handler := &handler{
logFn: logFn,
}

for _, rule := range rules {
if rule.Strategy == nil || rule.Match == nil {
continue
}
handler.rules = append(handler.rules, rule)
}
slices.SortFunc(handler.rules, func(l, r Rule) int {
return l.Priority - r.Priority
})
go handler.report(ctx)
return handler.handle
}

type SampleStrategy interface {
// Sample returns true when the record should be logged
Sample(r vanflow.Record) bool
}

type staticSampler bool

func (s staticSampler) Sample(vanflow.Record) bool {
return bool(s)
}

// Unlimited SampleStrategy always samples
func Unlimited() SampleStrategy {
return staticSampler(true)
}

type rateLimited struct {
limiter *rate.Limiter
}

// RateLimited SampleStrategy samples events up to a limit (in events per
// second). Events exceeding the limit are not logged.
func RateLimited(limit float64, burst int) SampleStrategy {
return rateLimited{
limiter: rate.NewLimiter(rate.Limit(limit), burst),
}
}

func (r rateLimited) Sample(vanflow.Record) bool {
return r.limiter.Allow()
}

// TransportFlowHash uses a deterministic hash based on a TransportBiflow ID.
// Uses the AppBiflow Parent field (Transport ID) so that ideally related flows
// are sampled together.
func TransportFlowHash(percent float64, parent SampleStrategy) SampleStrategy {
if percent < 0 || percent >= 1.0 {
panic("percent must be value in range [0, 1)")
}
if parent == nil {
parent = Unlimited()
}
return hashBasedSampler{
parent: parent,
mod: 10_000,
q: uint32(percent * 10_000),
}
}

type hashBasedSampler struct {
parent SampleStrategy
mod uint32
q uint32
}

func (h hashBasedSampler) Sample(r vanflow.Record) bool {
var transportID string
switch flow := r.(type) {
case vanflow.TransportBiflowRecord:
transportID = flow.ID
case vanflow.AppBiflowRecord:
if flow.Parent == nil {
return false
}
transportID = *flow.Parent
default:
return false
}
hash := fnv.New32a()
hash.Write([]byte(transportID))
m := hash.Sum32() % h.mod
if h.q >= m {
return h.parent.Sample(r)
}
return false
}

// RecordTypeSet specifies the Record Types that a Rule is applicable for.
type RecordTypeSet map[vanflow.TypeMeta]struct{}

func (r RecordTypeSet) matchesAll() bool {
_, ok := r[vanflow.TypeMeta{APIVersion: recordTypeMatchesAll}]
return ok
}

func NewRecordTypeSet(records ...vanflow.Record) RecordTypeSet {
set := RecordTypeSet{}
for _, r := range records {
set[r.GetTypeMeta()] = struct{}{}
}
return set
}

// NewRecordTypeSetAll returns a special RecordTypeSet that matches records of
// all types
func NewRecordTypeSetAll() RecordTypeSet {
set := RecordTypeSet{}
set[vanflow.TypeMeta{APIVersion: recordTypeMatchesAll}] = struct{}{}
return set
}

type handler struct {
logFn func(msg string, args ...any)
rules []Rule

resolved sync.Map
sampled sync.Map
}

func (h *handler) report(ctx context.Context) {
ticker := time.NewTicker(time.Minute)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
h.logReport()
}
}
}

func (h *handler) logReport() {
sampleCounts := make(map[string]int)
h.sampled.Range(func(k, v any) bool {
h.sampled.Delete(k)
typ, ct := k.(vanflow.TypeMeta), v.(*atomic.Int64)
sampleCounts[typ.String()] = int(ct.Load())
return true
})

if len(sampleCounts) == 0 {
return
}
var counts []any
for typ, count := range sampleCounts {
counts = append(counts, slog.Int(typ, count))

}
h.logFn("some vanflow records were not logged", counts...)
}

func (h *handler) resolve(typ vanflow.TypeMeta) SampleStrategy {
r, ok := h.resolved.Load(typ)
if ok {
return r.(SampleStrategy)
}
var strategy SampleStrategy = doNotSample
for _, rule := range h.rules {
if rule.Match.matchesAll() {
strategy = rule.Strategy
break
}
if _, ok := rule.Match[typ]; ok {
strategy = rule.Strategy
break
}
}
h.resolved.Store(typ, strategy)
return strategy
}

func (h *handler) handle(msg vanflow.RecordMessage) {
attrs := slog.Group("message", slog.String("to", msg.To), slog.String("subject", msg.Subject))
for _, record := range msg.Records {
typ := record.GetTypeMeta()
strategy := h.resolve(typ)
if !strategy.Sample(record) {
if strategy != doNotSample {
prev, _ := h.sampled.LoadOrStore(typ, new(atomic.Int64))
prev.(*atomic.Int64).Add(1)
}
continue
}

// TODO(ck) more efficient slog.LogValuer for vanflow records?
raw, _ := json.Marshal(record)
var out map[string]any
json.Unmarshal(raw, &out)
recordValues := make([]any, 0, len(out))
for k, v := range out {
if v == nil {
continue
}
recordValues = append(recordValues, slog.Any(k, v))
}
h.logFn(record.GetTypeMeta().String(), slog.Group("record", recordValues...), attrs)
}
}
Loading