Skip to content

Commit

Permalink
Use HTTP zstd compression for the Agent telemetry payloads (#32440)
Browse files Browse the repository at this point in the history
  • Loading branch information
iglendd authored Dec 25, 2024
1 parent 8f37b9b commit 701ea4a
Show file tree
Hide file tree
Showing 3 changed files with 138 additions and 32 deletions.
122 changes: 97 additions & 25 deletions comp/core/agenttelemetry/impl/agenttelemetry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/DataDog/datadog-agent/comp/core/telemetry"
"github.com/DataDog/datadog-agent/comp/core/telemetry/telemetryimpl"
"github.com/DataDog/datadog-agent/pkg/util/fxutil"
"github.com/DataDog/zstd"
)

// HTTP client mock
Expand Down Expand Up @@ -139,12 +140,14 @@ func makeLogMock(t *testing.T) log.Component {
return logmock.New(t)
}

func makeSenderImpl(t *testing.T, c string) sender {
func makeSenderImpl(t *testing.T, cl client, c string) sender {
o := convertYamlStrToMap(t, c)
cfg := makeCfgMock(t, o)
log := makeLogMock(t)
client := newClientMock()
sndr, err := newSenderImpl(cfg, log, client)
if cl == nil {
cl = newClientMock()
}
sndr, err := newSenderImpl(cfg, log, cl)
assert.NoError(t, err)
return sndr
}
Expand Down Expand Up @@ -552,10 +555,10 @@ func TestNoTagSpecifiedAggregationHistogram(t *testing.T) {
// setup and initiate atel
tel := makeTelMock(t)
buckets := []float64{10, 100, 1000, 10000}
gauge := tel.NewHistogram("bar", "zoo", []string{"tag1", "tag2", "tag3"}, "", buckets)
gauge.WithTags(map[string]string{"tag1": "a1", "tag2": "b1", "tag3": "c1"}).Observe(1001)
gauge.WithTags(map[string]string{"tag1": "a2", "tag2": "b2", "tag3": "c2"}).Observe(1002)
gauge.WithTags(map[string]string{"tag1": "a3", "tag2": "b3", "tag3": "c3"}).Observe(1003)
hist := tel.NewHistogram("bar", "zoo", []string{"tag1", "tag2", "tag3"}, "", buckets)
hist.WithTags(map[string]string{"tag1": "a1", "tag2": "b1", "tag3": "c1"}).Observe(1001)
hist.WithTags(map[string]string{"tag1": "a2", "tag2": "b2", "tag3": "c2"}).Observe(1002)
hist.WithTags(map[string]string{"tag1": "a3", "tag2": "b3", "tag3": "c3"}).Observe(1003)

o := convertYamlStrToMap(t, c)
s := &senderMock{}
Expand Down Expand Up @@ -714,7 +717,7 @@ func TestTwoProfilesOnTheSameScheduleGenerateSinglePayload(t *testing.T) {
counter2.AddWithTags(20, map[string]string{"tag1": "a1", "tag2": "b1", "tag3": "c1"})

o := convertYamlStrToMap(t, c)
s := makeSenderImpl(t, c)
s := makeSenderImpl(t, nil, c)
r := newRunnerMock()
a := getTestAtel(t, tel, o, s, nil, r)
require.True(t, a.enabled)
Expand Down Expand Up @@ -751,7 +754,7 @@ func TestOneProfileWithOneMetricMultipleContextsGenerateTwoPayloads(t *testing.T
counter1.AddWithTags(20, map[string]string{"tag1": "a2", "tag2": "b2", "tag3": "c2"})

o := convertYamlStrToMap(t, c)
s := makeSenderImpl(t, c)
s := makeSenderImpl(t, nil, c)
r := newRunnerMock()
a := getTestAtel(t, tel, o, s, nil, r)
require.True(t, a.enabled)
Expand Down Expand Up @@ -827,7 +830,7 @@ func TestOneProfileWithTwoMetricGenerateSinglePayloads(t *testing.T) {
counter2.AddWithTags(20, map[string]string{"tag1": "a1", "tag2": "b1", "tag3": "c1"})

o := convertYamlStrToMap(t, c)
s := makeSenderImpl(t, c)
s := makeSenderImpl(t, nil, c)
r := newRunnerMock()
a := getTestAtel(t, tel, o, s, nil, r)
require.True(t, a.enabled)
Expand All @@ -849,7 +852,7 @@ func TestSenderConfigNoConfig(t *testing.T) {
agent_telemetry:
enabled: true
`
sndr := makeSenderImpl(t, c)
sndr := makeSenderImpl(t, nil, c)

url := buildURL(sndr.(*senderImpl).endpoints.Main)
assert.Equal(t, "https://instrumentation-telemetry-intake.datadoghq.com/api/v2/apmtelemetry", url)
Expand Down Expand Up @@ -877,7 +880,7 @@ func TestSenderConfigOnlySites(t *testing.T) {

for _, tt := range tests {
c := fmt.Sprintf(ctemp, tt.site)
sndr := makeSenderImpl(t, c)
sndr := makeSenderImpl(t, nil, c)
url := buildURL(sndr.(*senderImpl).endpoints.Main)
assert.Equal(t, tt.testURL, url)
}
Expand All @@ -894,7 +897,7 @@ func TestSenderConfigAdditionalEndpoint(t *testing.T) {
- api_key: bar
host: instrumentation-telemetry-intake.us5.datadoghq.com
`
sndr := makeSenderImpl(t, c)
sndr := makeSenderImpl(t, nil, c)
assert.NotNil(t, sndr)

assert.Len(t, sndr.(*senderImpl).endpoints.Endpoints, 2)
Expand All @@ -913,7 +916,7 @@ func TestSenderConfigPartialDDUrl(t *testing.T) {
enabled: true
dd_url: instrumentation-telemetry-intake.us5.datadoghq.com.
`
sndr := makeSenderImpl(t, c)
sndr := makeSenderImpl(t, nil, c)
assert.NotNil(t, sndr)

assert.Len(t, sndr.(*senderImpl).endpoints.Endpoints, 1)
Expand All @@ -930,7 +933,7 @@ func TestSenderConfigFullDDUrl(t *testing.T) {
enabled: true
dd_url: https://instrumentation-telemetry-intake.us5.datadoghq.com.
`
sndr := makeSenderImpl(t, c)
sndr := makeSenderImpl(t, nil, c)
assert.NotNil(t, sndr)

assert.Len(t, sndr.(*senderImpl).endpoints.Endpoints, 1)
Expand All @@ -950,7 +953,7 @@ func TestSenderConfigDDUrlWithAdditionalEndpoints(t *testing.T) {
- api_key: bar
host: instrumentation-telemetry-intake.us3.datadoghq.com.
`
sndr := makeSenderImpl(t, c)
sndr := makeSenderImpl(t, nil, c)
assert.NotNil(t, sndr)

assert.Len(t, sndr.(*senderImpl).endpoints.Endpoints, 2)
Expand All @@ -970,7 +973,7 @@ func TestSenderConfigDDUrlWithEmptyAdditionalPoint(t *testing.T) {
dd_url: instrumentation-telemetry-intake.us5.datadoghq.com.
additional_endpoints:
`
sndr := makeSenderImpl(t, c)
sndr := makeSenderImpl(t, nil, c)
assert.NotNil(t, sndr)

assert.Len(t, sndr.(*senderImpl).endpoints.Endpoints, 1)
Expand Down Expand Up @@ -1010,7 +1013,7 @@ func TestGetAsJSONScrub(t *testing.T) {
counter3.AddWithTags(11, map[string]string{"text": "test"})

o := convertYamlStrToMap(t, c)
s := makeSenderImpl(t, c)
s := makeSenderImpl(t, nil, c)
r := newRunnerMock()
a := getTestAtel(t, tel, o, s, nil, r)
require.True(t, a.enabled)
Expand Down Expand Up @@ -1058,7 +1061,7 @@ func TestAdjustPrometheusCounterValueMultipleTags(t *testing.T) {
// setup and initiate atel
tel := makeTelMock(t)
o := convertYamlStrToMap(t, c)
s := makeSenderImpl(t, c)
s := makeSenderImpl(t, nil, c)
r := newRunnerMock()
a := getTestAtel(t, tel, o, s, nil, r)
require.True(t, a.enabled)
Expand Down Expand Up @@ -1167,7 +1170,7 @@ func TestAdjustPrometheusCounterValueMultipleTagValues(t *testing.T) {
// setup and initiate atel
tel := makeTelMock(t)
o := convertYamlStrToMap(t, c)
s := makeSenderImpl(t, c)
s := makeSenderImpl(t, nil, c)
r := newRunnerMock()
a := getTestAtel(t, tel, o, s, nil, r)
require.True(t, a.enabled)
Expand Down Expand Up @@ -1241,7 +1244,7 @@ func TestAdjustPrometheusCounterValueTagless(t *testing.T) {
// setup and initiate atel
tel := makeTelMock(t)
o := convertYamlStrToMap(t, c)
s := makeSenderImpl(t, c)
s := makeSenderImpl(t, nil, c)
r := newRunnerMock()
a := getTestAtel(t, tel, o, s, nil, r)
require.True(t, a.enabled)
Expand Down Expand Up @@ -1348,7 +1351,7 @@ func TestHistogramFloatUpperBoundNormalization(t *testing.T) {
// setup and initiate atel
tel := makeTelMock(t)
o := convertYamlStrToMap(t, c)
s := makeSenderImpl(t, c)
s := makeSenderImpl(t, nil, c)
r := newRunnerMock()
a := getTestAtel(t, tel, o, s, nil, r)
require.True(t, a.enabled)
Expand Down Expand Up @@ -1474,7 +1477,7 @@ func TestHistogramFloatUpperBoundNormalizationWithTags(t *testing.T) {
// setup and initiate atel
tel := makeTelMock(t)
o := convertYamlStrToMap(t, c)
s := makeSenderImpl(t, c)
s := makeSenderImpl(t, nil, c)
r := newRunnerMock()
a := getTestAtel(t, tel, o, s, nil, r)
require.True(t, a.enabled)
Expand Down Expand Up @@ -1591,7 +1594,7 @@ func TestHistogramFloatUpperBoundNormalizationWithMultivalueTags(t *testing.T) {
// setup and initiate atel
tel := makeTelMock(t)
o := convertYamlStrToMap(t, c)
s := makeSenderImpl(t, c)
s := makeSenderImpl(t, nil, c)
r := newRunnerMock()
a := getTestAtel(t, tel, o, s, nil, r)
require.True(t, a.enabled)
Expand Down Expand Up @@ -1841,7 +1844,7 @@ func TestHistogramPercentile(t *testing.T) {
// setup and initiate atel
tel := makeTelMock(t)
o := convertYamlStrToMap(t, c)
s := makeSenderImpl(t, c)
s := makeSenderImpl(t, nil, c)
r := newRunnerMock()
a := getTestAtel(t, tel, o, s, nil, r)
require.True(t, a.enabled)
Expand Down Expand Up @@ -1895,3 +1898,72 @@ func TestHistogramPercentile(t *testing.T) {
assert.Equal(t, 20.0, *metric.P95)
assert.Equal(t, 20.0, *metric.P99)
}

func TestUsingPayloadCompressionInAgentTelemetrySender(t *testing.T) {
// Run with compression (by default default)
var cfg1 = `
agent_telemetry:
enabled: true
profiles:
- name: xxx
metric:
metrics:
- name: foo.bar
`

tel := makeTelMock(t)
hist := tel.NewHistogram("foo", "bar", nil, "", []float64{1, 2, 5, 100})
hist.Observe(1)
hist.Observe(5)
hist.Observe(6)
hist.Observe(100)

// setup and initiate atel
o1 := convertYamlStrToMap(t, cfg1)
cl1 := newClientMock()
s1 := makeSenderImpl(t, cl1, cfg1)
r1 := newRunnerMock()
a1 := getTestAtel(t, tel, o1, s1, cl1, r1)
require.True(t, a1.enabled)

// run the runner to trigger the telemetry report
a1.start()
r1.(*runnerMock).run()
assert.True(t, len(cl1.(*clientMock).body) > 0)

// Run without compression
var cfg2 = `
agent_telemetry:
use_compression: false
enabled: true
profiles:
- name: xxx
metric:
metrics:
- name: foo.bar
aggregate_tags:
`

// setup and initiate atel
o2 := convertYamlStrToMap(t, cfg2)
cl2 := newClientMock()
s2 := makeSenderImpl(t, cl2, cfg2)
r2 := newRunnerMock()
a2 := getTestAtel(t, tel, o2, s2, cl2, r2)
require.True(t, a2.enabled)

// run the runner to trigger the telemetry report
a2.start()
r2.(*runnerMock).run()
assert.True(t, len(cl2.(*clientMock).body) > 0)
decompressBody, err := zstd.Decompress(nil, cl1.(*clientMock).body)
require.NoError(t, err)
require.NotZero(t, len(decompressBody))

// we cannot compare body (time stamp different and internal
// bucket serialization, but success above and significant size differences
// should be suffient
compressBodyLen := len(cl1.(*clientMock).body)
nonCompressBodyLen := len(cl2.(*clientMock).body)
assert.True(t, float64(nonCompressBodyLen)/float64(compressBodyLen) > 1.5)
}
37 changes: 30 additions & 7 deletions comp/core/agenttelemetry/impl/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
httputils "github.com/DataDog/datadog-agent/pkg/util/http"
"github.com/DataDog/datadog-agent/pkg/util/scrubber"
"github.com/DataDog/datadog-agent/pkg/version"
"github.com/DataDog/zstd"
)

const (
Expand Down Expand Up @@ -58,7 +59,9 @@ type senderImpl struct {
cfgComp config.Reader
logComp log.Component

client client
compress bool
compressionLevel int
client client

endpoints *logconfig.Endpoints

Expand Down Expand Up @@ -211,9 +214,12 @@ func newSenderImpl(
cfgComp: cfgComp,
logComp: logComp,

client: client,
endpoints: endpoints,
agentVersion: agentVersion.GetNumberAndPre(),
compress: cfgComp.GetBool("agent_telemetry.use_compression"),
compressionLevel: cfgComp.GetInt("agent_telemetry.compression_level"),
client: client,
endpoints: endpoints,
agentVersion: agentVersion.GetNumberAndPre(),

// pre-fill parts of payload which are not changing during run-time
payloadTemplate: Payload{
APIVersion: "v2",
Expand Down Expand Up @@ -370,11 +376,24 @@ func (s *senderImpl) flushSession(ss *senderSession) error {
return fmt.Errorf("failed to marshal agent telemetry payload: %w", err)
}

reqBody, err := scrubber.ScrubJSON(payloadJSON)
reqBodyRaw, err := scrubber.ScrubJSON(payloadJSON)
if err != nil {
return fmt.Errorf("failed to scrubl agent telemetry payload: %w", err)
}

// Try to compress the payload if needed
reqBody := reqBodyRaw
compressed := false
if s.compress {
reqBodyCompressed, err2 := zstd.CompressLevel(nil, reqBodyRaw, s.compressionLevel)
if err2 == nil {
compressed = true
reqBody = reqBodyCompressed
} else {
s.logComp.Errorf("Failed to compress agent telemetry payload: %v", err)
}
}

// Send the payload to all endpoints
var errs error
reqType := payloads.RequestType
Expand All @@ -386,7 +405,7 @@ func (s *senderImpl) flushSession(ss *senderSession) error {
errs = errors.Join(errs, err)
continue
}
s.addHeaders(req, reqType, ep.GetAPIKey(), bodyLen)
s.addHeaders(req, reqType, ep.GetAPIKey(), bodyLen, compressed)
resp, err := s.client.Do(req.WithContext(ss.cancelCtx))
if err != nil {
errs = errors.Join(errs, err)
Expand Down Expand Up @@ -436,7 +455,7 @@ func (s *senderImpl) sendAgentMetricPayloads(ss *senderSession, metrics []*agent
}
}

func (s *senderImpl) addHeaders(req *http.Request, requesttype, apikey, bodylen string) {
func (s *senderImpl) addHeaders(req *http.Request, requesttype, apikey, bodylen string, compressed bool) {
req.Header.Add("DD-Api-Key", apikey)
req.Header.Add("Content-Type", "application/json")
req.Header.Add("Content-Length", bodylen)
Expand All @@ -446,4 +465,8 @@ func (s *senderImpl) addHeaders(req *http.Request, requesttype, apikey, bodylen
req.Header.Add("DD-Telemetry-Product-Version", s.agentVersion)
// Not clear how to acquire that. Appears that EVP adds it automatically
req.Header.Add("datadog-container-id", "")

if compressed {
req.Header.Set("Content-Encoding", "zstd")
}
}
11 changes: 11 additions & 0 deletions releasenotes/notes/agent-tel-gzip-bba8a51c1aa3ba2f.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# Each section from every release note are combined when the
# CHANGELOG.rst is rendered. So the text needs to be worded so that
# it does not depend on any information only available in another
# section. This may mean repeating some details, but each section
# must be readable independently of the other.
#
# Each section note must be formatted as reStructuredText.
---
enhancements:
- |
Use HTTP zstd compression for the Agent telemetry payloads.

0 comments on commit 701ea4a

Please sign in to comment.