[USM] add user-mode benchmark #32292

Open: wants to merge 25 commits into main.
Commits (25):
c085b09  created BenchmarkConsumer (amitslavin, Dec 4, 2024)
c6d9e90  test (amitslavin, Dec 5, 2024)
b882f7c  Merge remote-tracking branch 'origin/main' into amit.slavin/test-user… (amitslavin, Dec 9, 2024)
5f86349  added benchmark (amitslavin, Dec 10, 2024)
1df7f21  revert metric reset (amitslavin, Dec 10, 2024)
80e5c4f  support multiple http requests (amitslavin, Dec 11, 2024)
61daa0b  fix export function (amitslavin, Dec 11, 2024)
af9f635  clean code (amitslavin, Dec 11, 2024)
1164744  updated the code to support goroutine profile (amitslavin, Dec 12, 2024)
eb365d9  used shared lib (amitslavin, Dec 12, 2024)
03cb647  fix date (amitslavin, Dec 12, 2024)
609464c  update to BenchmarkHTTPEventConsumer (amitslavin, Dec 17, 2024)
d486639  Merge branch 'amit.slavin/test-user-space' into amit.slavin/USMON-130… (amitslavin, Dec 17, 2024)
617f254  fix typo (amitslavin, Dec 17, 2024)
2999f4a  added documentation (amitslavin, Dec 17, 2024)
f73c3e5  fix some cr notes (amitslavin, Dec 18, 2024)
160db04  Merge remote-tracking branch 'origin/main' into amit.slavin/USMON-130… (amitslavin, Dec 18, 2024)
00fcb97  updated the test to support different cases (amitslavin, Dec 22, 2024)
22fabe5  updated the test to include fragment (amitslavin, Dec 23, 2024)
b5f9f2d  add max events case (amitslavin, Dec 23, 2024)
74e9b95  Merge remote-tracking branch 'origin/main' into amit.slavin/USMON-130… (amitslavin, Dec 24, 2024)
d5e4d09  merged main (amitslavin, Dec 24, 2024)
bf0a127  fix lint (amitslavin, Dec 24, 2024)
cdfd528  updated doc (amitslavin, Dec 24, 2024)
b582fbd  fix cr notes (amitslavin, Dec 25, 2024)
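
The commit history shows the substance of this PR: a user-mode benchmark for the USM event consumer, renamed to BenchmarkHTTPEventConsumer in commit 609464c and backed by shared test helpers (the exported NewEBPFProgram and RecordSample called in the diff below). The benchmark body itself is not shown on this page, so the following is only a rough sketch of how such a benchmark could be shaped inside the events package; the zero-valued batch payload, the skip logic, and the uint64 event type are assumptions, not the PR's code.

//go:build linux_bpf

package events

import (
	"testing"

	manager "github.com/DataDog/ebpf-manager"

	"github.com/DataDog/datadog-agent/pkg/network/config"
)

// BenchmarkHTTPEventConsumer (sketch): measures the user-space cost of batch
// decoding and callback dispatch by injecting samples directly into the
// consumer's handler, without firing any kernel probes.
func BenchmarkHTTPEventConsumer(b *testing.B) {
	c := config.New()
	program, err := NewEBPFProgram(c) // shared helper, see consumer_test.go below
	if err != nil {
		b.Skipf("eBPF program unavailable: %v", err)
	}
	b.Cleanup(func() { program.Stop(manager.CleanAll) })

	consumer, err := NewConsumer("test", program, func(evts []uint64) {
		// A real benchmark might count or validate the delivered events here.
		_ = evts
	})
	if err != nil {
		b.Fatal(err)
	}
	consumer.Start()
	b.Cleanup(consumer.Stop)

	// Assumption: a zero-valued batch is enough to exercise the decode path;
	// the actual benchmark presumably fills Len/Cap/Data with synthetic events.
	sample := make([]byte, sizeOfBatch)
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		RecordSample(c, consumer, sample)
	}
}

In this shape the benchmark exercises only user-mode work (perf or ring-buffer record handling, batch parsing, callback dispatch), which matches the "user-mode benchmark" in the PR title.
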
4 changes: 2 additions & 2 deletions pkg/network/protocols/events/batch_offsets.go
@@ -40,7 +40,7 @@ func newOffsetManager(numCPUS int) *offsetManager {
}

// Get returns the data offset that hasn't been consumed yet for a given batch
-func (o *offsetManager) Get(cpu int, batch *batch, syncing bool) (begin, end int) {
+func (o *offsetManager) Get(cpu int, batch *Batch, syncing bool) (begin, end int) {
o.mux.Lock()
defer o.mux.Unlock()
state := o.stateByCPU[cpu]
@@ -85,6 +85,6 @@ func (o *offsetManager) NextBatchID(cpu int) int {
return o.stateByCPU[cpu].nextBatchID
}

-func batchComplete(b *batch) bool {
+func batchComplete(b *Batch) bool {
return b.Cap > 0 && b.Len == b.Cap
}
26 changes: 13 additions & 13 deletions pkg/network/protocols/events/batch_offsets_test.go
@@ -21,36 +21,36 @@ func TestOffsets(t *testing.T) {
assert.Equal(t, 0, offsets.NextBatchID(1))

// reading full batch: cpu=0 batchID=0
-begin, end := offsets.Get(0, &batch{Idx: 0, Len: 10, Cap: 10}, false)
+begin, end := offsets.Get(0, &Batch{Idx: 0, Len: 10, Cap: 10}, false)
assert.Equal(t, 0, begin)
assert.Equal(t, 10, end)
// nextBatchID is advanced to 1 for cpu=0
assert.Equal(t, 1, offsets.NextBatchID(0))

// reading partial batch: cpu=1 batchID=0 sync=true
-begin, end = offsets.Get(1, &batch{Idx: 0, Len: 8, Cap: 10}, true)
+begin, end = offsets.Get(1, &Batch{Idx: 0, Len: 8, Cap: 10}, true)
assert.Equal(t, 0, begin)
assert.Equal(t, 8, end)
// nextBatchID remains 0 for cpu=1 since this batch hasn't been filled up yet
assert.Equal(t, 0, offsets.NextBatchID(1))

// reading full batch: cpu=1 batchID=0
-begin, end = offsets.Get(1, &batch{Idx: 0, Len: 10, Cap: 10}, false)
+begin, end = offsets.Get(1, &Batch{Idx: 0, Len: 10, Cap: 10}, false)
// notice we only read now the remaining offsets
assert.Equal(t, 8, begin)
assert.Equal(t, 10, end)
// nextBatchID is advanced to 1 for cpu=1
assert.Equal(t, 1, offsets.NextBatchID(1))

// reading partial batch: cpu=0 batchID=1 sync=true
-begin, end = offsets.Get(0, &batch{Idx: 1, Len: 4, Cap: 10}, true)
+begin, end = offsets.Get(0, &Batch{Idx: 1, Len: 4, Cap: 10}, true)
assert.Equal(t, 0, begin)
assert.Equal(t, 4, end)
// nextBatchID remains 1 for cpu=0
assert.Equal(t, 1, offsets.NextBatchID(0))

// reading partial batch: cpu=0 batchID=1 sync=true
-begin, end = offsets.Get(0, &batch{Idx: 1, Len: 5, Cap: 10}, true)
+begin, end = offsets.Get(0, &Batch{Idx: 1, Len: 5, Cap: 10}, true)
assert.Equal(t, 4, begin)
assert.Equal(t, 5, end)
// nextBatchID remains 1 for cpu=0
@@ -63,20 +63,20 @@ func TestDelayedBatchReads(t *testing.T) {

// this emulates the scenario where we preemptively read (sync=true) two
// complete batches in a row before they are read from perf buffer
-begin, end := offsets.Get(0, &batch{Idx: 0, Len: 10, Cap: 10}, true)
+begin, end := offsets.Get(0, &Batch{Idx: 0, Len: 10, Cap: 10}, true)
assert.Equal(t, 0, begin)
assert.Equal(t, 10, end)

-begin, end = offsets.Get(0, &batch{Idx: 1, Len: 10, Cap: 10}, true)
+begin, end = offsets.Get(0, &Batch{Idx: 1, Len: 10, Cap: 10}, true)
assert.Equal(t, 0, begin)
assert.Equal(t, 10, end)

// now the "delayed" batches from perf buffer are read in sequence
-begin, end = offsets.Get(0, &batch{Idx: 0, Len: 10, Cap: 10}, true)
+begin, end = offsets.Get(0, &Batch{Idx: 0, Len: 10, Cap: 10}, true)
assert.Equal(t, 0, begin)
assert.Equal(t, 0, end)

-begin, end = offsets.Get(0, &batch{Idx: 1, Len: 10, Cap: 10}, true)
+begin, end = offsets.Get(0, &Batch{Idx: 1, Len: 10, Cap: 10}, true)
assert.Equal(t, 0, begin)
assert.Equal(t, 0, end)
}
@@ -85,11 +85,11 @@ func TestUnchangedBatchRead(t *testing.T) {
const numCPUs = 1
offsets := newOffsetManager(numCPUs)

-begin, end := offsets.Get(0, &batch{Idx: 0, Len: 5, Cap: 10}, true)
+begin, end := offsets.Get(0, &Batch{Idx: 0, Len: 5, Cap: 10}, true)
assert.Equal(t, 0, begin)
assert.Equal(t, 5, end)

-begin, end = offsets.Get(0, &batch{Idx: 0, Len: 5, Cap: 10}, true)
+begin, end = offsets.Get(0, &Batch{Idx: 0, Len: 5, Cap: 10}, true)
assert.Equal(t, 5, begin)
assert.Equal(t, 5, end)
}
@@ -99,13 +99,13 @@ func TestReadGap(t *testing.T) {
offsets := newOffsetManager(numCPUs)

// this emulates the scenario where a batch is lost in the perf buffer
-begin, end := offsets.Get(0, &batch{Idx: 0, Len: 10, Cap: 10}, true)
+begin, end := offsets.Get(0, &Batch{Idx: 0, Len: 10, Cap: 10}, true)
assert.Equal(t, 0, begin)
assert.Equal(t, 10, end)
assert.Equal(t, 1, offsets.NextBatchID(0))

// batch idx=1 was lost
-begin, end = offsets.Get(0, &batch{Idx: 2, Len: 10, Cap: 10}, true)
+begin, end = offsets.Get(0, &Batch{Idx: 2, Len: 10, Cap: 10}, true)
assert.Equal(t, 0, begin)
assert.Equal(t, 10, end)
assert.Equal(t, 3, offsets.NextBatchID(0))
12 changes: 6 additions & 6 deletions pkg/network/protocols/events/batch_reader.go
@@ -14,20 +14,20 @@ import (
ddsync "github.com/DataDog/datadog-agent/pkg/util/sync"
)

-var batchPool = ddsync.NewDefaultTypedPool[batch]()
+var batchPool = ddsync.NewDefaultTypedPool[Batch]()

type batchReader struct {
sync.Mutex
numCPUs int
-batchMap *maps.GenericMap[batchKey, batch]
+batchMap *maps.GenericMap[batchKey, Batch]
offsets *offsetManager
workerPool *workerPool
stopped bool
}

-func newBatchReader(offsetManager *offsetManager, batchMap *maps.GenericMap[batchKey, batch], numCPUs int) (*batchReader, error) {
+func newBatchReader(offsetManager *offsetManager, batchMap *maps.GenericMap[batchKey, Batch], numCPUs int) (*batchReader, error) {
// initialize eBPF maps
-batch := new(batch)
+batch := new(Batch)
for i := 0; i < numCPUs; i++ {
// Ring buffer events don't have CPU information, so we associate each
// batch entry with a CPU during startup. This information is used by
@@ -57,7 +57,7 @@ func newBatchReader(offsetManager *offsetManager, batchMap *maps.GenericMap[batc

// ReadAll batches from eBPF (concurrently) and execute the given
// callback function for each batch
-func (r *batchReader) ReadAll(f func(cpu int, b *batch)) {
+func (r *batchReader) ReadAll(f func(cpu int, b *Batch)) {
// This lock is used only for the purposes of synchronizing termination
// and it's only held while *enqueing* the jobs.
r.Lock()
@@ -77,7 +77,7 @@ func (r *batchReader) ReadAll(f func(cpu int, b *batch)) {

b := batchPool.Get()
defer func() {
-*b = batch{}
+*b = Batch{}
batchPool.Put(b)
}()

14 changes: 7 additions & 7 deletions pkg/network/protocols/events/consumer.go
@@ -25,7 +25,7 @@ import (
const (
batchMapSuffix = "_batches"
eventsMapSuffix = "_batch_events"
-sizeOfBatch = int(unsafe.Sizeof(batch{}))
+sizeOfBatch = int(unsafe.Sizeof(Batch{}))
)

var errInvalidPerfEvent = errors.New("invalid perf event")
@@ -59,7 +59,7 @@ type Consumer[V any] struct {
// 2) be thread-safe, as the callback may be executed concurrently from multiple go-routines;
func NewConsumer[V any](proto string, ebpf *manager.Manager, callback func([]V)) (*Consumer[V], error) {
batchMapName := proto + batchMapSuffix
-batchMap, err := maps.GetMap[batchKey, batch](ebpf, batchMapName)
+batchMap, err := maps.GetMap[batchKey, Batch](ebpf, batchMapName)
if err != nil {
return nil, fmt.Errorf("unable to find map %s: %s", batchMapName, err)
}
@@ -164,7 +164,7 @@ func (c *Consumer[V]) Start() {
return
}

-c.batchReader.ReadAll(func(_ int, b *batch) {
+c.batchReader.ReadAll(func(_ int, b *Batch) {
c.process(b, true)
})
if log.ShouldLog(log.DebugLvl) {
@@ -209,7 +209,7 @@ func (c *Consumer[V]) Stop() {
close(c.syncRequest)
}

-func (c *Consumer[V]) process(b *batch, syncing bool) {
+func (c *Consumer[V]) process(b *Batch, syncing bool) {
cpu := int(b.Cpu)

// Determine the subset of data we're interested in as we might have read
@@ -246,7 +246,7 @@ func (c *Consumer[V]) process(b *Batch, syncing bool) {
c.callback(events)
}

-func batchFromEventData(data []byte) (*batch, error) {
+func batchFromEventData(data []byte) (*Batch, error) {
if len(data) < sizeOfBatch {
// For some reason the eBPF program sent us a perf event with a size
// different from what we're expecting.
@@ -260,10 +260,10 @@ func batchFromEventData(data []byte) (*batch, error) {
return nil, errInvalidPerfEvent
}

-return (*batch)(unsafe.Pointer(&data[0])), nil
+return (*Batch)(unsafe.Pointer(&data[0])), nil
}

-func pointerToElement[V any](b *batch, elementIdx int) *V {
+func pointerToElement[V any](b *Batch, elementIdx int) *V {
offset := elementIdx * int(b.Event_size)
return (*V)(unsafe.Pointer(uintptr(unsafe.Pointer(&b.Data[0])) + uintptr(offset)))
}
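
For orientation, the newly exported Batch type flows into the generic Consumer[V], whose public surface is otherwise unchanged here. Below is a minimal, hypothetical wiring example; the "http" protocol name and the uint64 event type are illustrative only, and construction of the eBPF manager is omitted.

package example

import (
	manager "github.com/DataDog/ebpf-manager"

	"github.com/DataDog/datadog-agent/pkg/network/protocols/events"
)

// startConsumer wires a callback to a protocol's event stream. Per the
// NewConsumer documentation above, the callback may be executed concurrently
// from multiple goroutines, so it has to be thread-safe.
func startConsumer(m *manager.Manager) (*events.Consumer[uint64], error) {
	consumer, err := events.NewConsumer("http", m, func(batch []uint64) {
		for range batch {
			// process each event
		}
	})
	if err != nil {
		return nil, err
	}
	consumer.Start()
	return consumer, nil
}
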
69 changes: 3 additions & 66 deletions pkg/network/protocols/events/consumer_test.go
@@ -17,14 +17,9 @@ import (

manager "github.com/DataDog/ebpf-manager"
"github.com/cilium/ebpf"
"github.com/cilium/ebpf/features"
"github.com/cilium/ebpf/perf"
"github.com/cilium/ebpf/ringbuf"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

ddebpf "github.com/DataDog/datadog-agent/pkg/ebpf"
"github.com/DataDog/datadog-agent/pkg/ebpf/bytecode"
"github.com/DataDog/datadog-agent/pkg/network/config"
"github.com/DataDog/datadog-agent/pkg/util/kernel"
)
@@ -38,7 +33,7 @@ func TestConsumer(t *testing.T) {

const numEvents = 100
c := config.New()
-program, err := newEBPFProgram(c)
+program, err := NewEBPFProgram(c)
require.NoError(t, err)

var mux sync.Mutex
@@ -86,7 +81,7 @@ func TestInvalidBatchCountMetric(t *testing.T) {
}

c := config.New()
-program, err := newEBPFProgram(c)
+program, err := NewEBPFProgram(c)
require.NoError(t, err)
t.Cleanup(func() { program.Stop(manager.CleanAll) })

@@ -95,7 +90,7 @@

// We are creating a raw sample with a data length of 4, which is smaller than sizeOfBatch
// and would be considered an invalid batch.
-recordSample(c, consumer, []byte("test"))
+RecordSample(c, consumer, []byte("test"))

consumer.Start()
t.Cleanup(func() { consumer.Stop() })
@@ -113,22 +108,6 @@ type eventGenerator struct {
testFile *os.File
}

-// recordSample records a sample using the consumer handler.
-func recordSample(c *config.Config, consumer *Consumer[uint64], sampleData []byte) {
-// Ring buffers require kernel version 5.8.0 or higher, therefore, the handler is chosen based on the kernel version.
-if c.EnableUSMRingBuffers && features.HaveMapType(ebpf.RingBuf) == nil {
-handler := consumer.handler.(*ddebpf.RingBufferHandler)
-handler.RecordHandler(&ringbuf.Record{
-RawSample: sampleData,
-}, nil, nil)
-} else {
-handler := consumer.handler.(*ddebpf.PerfHandler)
-handler.RecordHandler(&perf.Record{
-RawSample: sampleData,
-}, nil, nil)
-}
-}
-
func newEventGenerator(program *manager.Manager, t *testing.T) *eventGenerator {
m, _, _ := program.GetMap("test")
require.NotNilf(t, m, "couldn't find test map")
@@ -169,45 +148,3 @@ func (e *eventGenerator) Generate(eventID uint64) error {
func (e *eventGenerator) Stop() {
e.testFile.Close()
}
-
-func newEBPFProgram(c *config.Config) (*manager.Manager, error) {
-bc, err := bytecode.GetReader(c.BPFDir, "usm_events_test-debug.o")
-if err != nil {
-return nil, err
-}
-defer bc.Close()
-
-m := &manager.Manager{
-Probes: []*manager.Probe{
-{
-ProbeIdentificationPair: manager.ProbeIdentificationPair{
-EBPFFuncName: "tracepoint__syscalls__sys_enter_write",
-},
-},
-},
-}
-options := manager.Options{
-RemoveRlimit: true,
-ActivatedProbes: []manager.ProbesSelector{
-&manager.ProbeSelector{
-ProbeIdentificationPair: manager.ProbeIdentificationPair{
-EBPFFuncName: "tracepoint__syscalls__sys_enter_write",
-},
-},
-},
-ConstantEditors: []manager.ConstantEditor{
-{
-Name: "test_monitoring_enabled",
-Value: uint64(1),
-},
-},
-}
-
-Configure(config.New(), "test", m, &options)
-err = m.InitWithOptions(bc, options)
-if err != nil {
-return nil, err
-}
-
-return m, nil
-}
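
The deleted recordSample and newEBPFProgram reappear at their call sites above as the exported RecordSample and NewEBPFProgram, which together with the "used shared lib" commit suggests they were promoted to shared helpers usable by the new benchmark. Their new bodies are not part of this diff; what follows is a plausible sketch of the exported RecordSample, essentially the removed helper with an exported name (whether it also became generic over the event type, or moved to a separate test-utility location, is not visible here).

package events

import (
	"github.com/cilium/ebpf"
	"github.com/cilium/ebpf/features"
	"github.com/cilium/ebpf/perf"
	"github.com/cilium/ebpf/ringbuf"

	ddebpf "github.com/DataDog/datadog-agent/pkg/ebpf"
	"github.com/DataDog/datadog-agent/pkg/network/config"
)

// RecordSample records a sample using the consumer handler, mirroring the
// removed recordSample: ring buffers require kernel 5.8 or newer, so the
// handler type is chosen based on configuration and kernel support.
func RecordSample(c *config.Config, consumer *Consumer[uint64], sampleData []byte) {
	if c.EnableUSMRingBuffers && features.HaveMapType(ebpf.RingBuf) == nil {
		handler := consumer.handler.(*ddebpf.RingBufferHandler)
		handler.RecordHandler(&ringbuf.Record{RawSample: sampleData}, nil, nil)
	} else {
		handler := consumer.handler.(*ddebpf.PerfHandler)
		handler.RecordHandler(&perf.Record{RawSample: sampleData}, nil, nil)
	}
}
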