Skip to content

Commit

Permalink
Use HTTP gzip compression for the Agent telemetry payloads
Browse files Browse the repository at this point in the history
  • Loading branch information
iglendd committed Dec 24, 2024
1 parent f5d99e9 commit e6aff9b
Show file tree
Hide file tree
Showing 4 changed files with 164 additions and 28 deletions.
121 changes: 100 additions & 21 deletions comp/core/agenttelemetry/impl/agenttelemetry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
package agenttelemetryimpl

import (
"bytes"
"compress/gzip"
"context"
"encoding/json"
"fmt"
Expand Down Expand Up @@ -138,12 +140,14 @@ func makeLogMock(t *testing.T) log.Component {
return logmock.New(t)
}

func makeSenderImpl(t *testing.T, c string) sender {
func makeSenderImpl(t *testing.T, cl client, c string) sender {
o := convertYamlStrToMap(t, c)
cfg := makeCfgMock(t, o)
log := makeLogMock(t)
client := newClientMock()
sndr, err := newSenderImpl(cfg, log, client)
if cl == nil {
cl = newClientMock()
}
sndr, err := newSenderImpl(cfg, log, cl)
assert.NoError(t, err)
return sndr
}
Expand Down Expand Up @@ -474,10 +478,10 @@ func TestNoTagSpecifiedAggregationHistogram(t *testing.T) {
// setup and initiate atel
tel := makeTelMock(t)
buckets := []float64{10, 100, 1000, 10000}
gauge := tel.NewHistogram("bar", "zoo", []string{"tag1", "tag2", "tag3"}, "", buckets)
gauge.WithTags(map[string]string{"tag1": "a1", "tag2": "b1", "tag3": "c1"}).Observe(1001)
gauge.WithTags(map[string]string{"tag1": "a2", "tag2": "b2", "tag3": "c2"}).Observe(1002)
gauge.WithTags(map[string]string{"tag1": "a3", "tag2": "b3", "tag3": "c3"}).Observe(1003)
hist := tel.NewHistogram("bar", "zoo", []string{"tag1", "tag2", "tag3"}, "", buckets)
hist.WithTags(map[string]string{"tag1": "a1", "tag2": "b1", "tag3": "c1"}).Observe(1001)
hist.WithTags(map[string]string{"tag1": "a2", "tag2": "b2", "tag3": "c2"}).Observe(1002)
hist.WithTags(map[string]string{"tag1": "a3", "tag2": "b3", "tag3": "c3"}).Observe(1003)

o := convertYamlStrToMap(t, c)
s := &senderMock{}
Expand Down Expand Up @@ -636,7 +640,7 @@ func TestTwoProfilesOnTheSameScheduleGenerateSinglePayload(t *testing.T) {
counter2.AddWithTags(20, map[string]string{"tag1": "a1", "tag2": "b1", "tag3": "c1"})

o := convertYamlStrToMap(t, c)
s := makeSenderImpl(t, c)
s := makeSenderImpl(t, nil, c)
r := newRunnerMock()
a := getTestAtel(t, tel, o, s, nil, r)
require.True(t, a.enabled)
Expand Down Expand Up @@ -673,7 +677,7 @@ func TestOneProfileWithOneMetricMultipleContextsGenerateTwoPayloads(t *testing.T
counter1.AddWithTags(20, map[string]string{"tag1": "a2", "tag2": "b2", "tag3": "c2"})

o := convertYamlStrToMap(t, c)
s := makeSenderImpl(t, c)
s := makeSenderImpl(t, nil, c)
r := newRunnerMock()
a := getTestAtel(t, tel, o, s, nil, r)
require.True(t, a.enabled)
Expand Down Expand Up @@ -749,7 +753,7 @@ func TestOneProfileWithTwoMetricGenerateSinglePayloads(t *testing.T) {
counter2.AddWithTags(20, map[string]string{"tag1": "a1", "tag2": "b1", "tag3": "c1"})

o := convertYamlStrToMap(t, c)
s := makeSenderImpl(t, c)
s := makeSenderImpl(t, nil, c)
r := newRunnerMock()
a := getTestAtel(t, tel, o, s, nil, r)
require.True(t, a.enabled)
Expand All @@ -771,7 +775,7 @@ func TestSenderConfigNoConfig(t *testing.T) {
agent_telemetry:
enabled: true
`
sndr := makeSenderImpl(t, c)
sndr := makeSenderImpl(t, nil, c)

url := buildURL(sndr.(*senderImpl).endpoints.Main)
assert.Equal(t, "https://instrumentation-telemetry-intake.datadoghq.com/api/v2/apmtelemetry", url)
Expand Down Expand Up @@ -799,7 +803,7 @@ func TestSenderConfigOnlySites(t *testing.T) {

for _, tt := range tests {
c := fmt.Sprintf(ctemp, tt.site)
sndr := makeSenderImpl(t, c)
sndr := makeSenderImpl(t, nil, c)
url := buildURL(sndr.(*senderImpl).endpoints.Main)
assert.Equal(t, tt.testURL, url)
}
Expand All @@ -816,7 +820,7 @@ func TestSenderConfigAdditionalEndpoint(t *testing.T) {
- api_key: bar
host: instrumentation-telemetry-intake.us5.datadoghq.com
`
sndr := makeSenderImpl(t, c)
sndr := makeSenderImpl(t, nil, c)
assert.NotNil(t, sndr)

assert.Len(t, sndr.(*senderImpl).endpoints.Endpoints, 2)
Expand All @@ -835,7 +839,7 @@ func TestSenderConfigPartialDDUrl(t *testing.T) {
enabled: true
dd_url: instrumentation-telemetry-intake.us5.datadoghq.com.
`
sndr := makeSenderImpl(t, c)
sndr := makeSenderImpl(t, nil, c)
assert.NotNil(t, sndr)

assert.Len(t, sndr.(*senderImpl).endpoints.Endpoints, 1)
Expand All @@ -852,7 +856,7 @@ func TestSenderConfigFullDDUrl(t *testing.T) {
enabled: true
dd_url: https://instrumentation-telemetry-intake.us5.datadoghq.com.
`
sndr := makeSenderImpl(t, c)
sndr := makeSenderImpl(t, nil, c)
assert.NotNil(t, sndr)

assert.Len(t, sndr.(*senderImpl).endpoints.Endpoints, 1)
Expand All @@ -872,7 +876,7 @@ func TestSenderConfigDDUrlWithAdditionalEndpoints(t *testing.T) {
- api_key: bar
host: instrumentation-telemetry-intake.us3.datadoghq.com.
`
sndr := makeSenderImpl(t, c)
sndr := makeSenderImpl(t, nil, c)
assert.NotNil(t, sndr)

assert.Len(t, sndr.(*senderImpl).endpoints.Endpoints, 2)
Expand All @@ -892,7 +896,7 @@ func TestSenderConfigDDUrlWithEmptyAdditionalPoint(t *testing.T) {
dd_url: instrumentation-telemetry-intake.us5.datadoghq.com.
additional_endpoints:
`
sndr := makeSenderImpl(t, c)
sndr := makeSenderImpl(t, nil, c)
assert.NotNil(t, sndr)

assert.Len(t, sndr.(*senderImpl).endpoints.Endpoints, 1)
Expand Down Expand Up @@ -932,7 +936,7 @@ func TestGetAsJSONScrub(t *testing.T) {
counter3.AddWithTags(11, map[string]string{"text": "test"})

o := convertYamlStrToMap(t, c)
s := makeSenderImpl(t, c)
s := makeSenderImpl(t, nil, c)
r := newRunnerMock()
a := getTestAtel(t, tel, o, s, nil, r)
require.True(t, a.enabled)
Expand Down Expand Up @@ -980,7 +984,7 @@ func TestAdjustPrometheusCounterValue(t *testing.T) {
// setup and initiate atel
tel := makeTelMock(t)
o := convertYamlStrToMap(t, c)
s := makeSenderImpl(t, c)
s := makeSenderImpl(t, nil, c)
r := newRunnerMock()
a := getTestAtel(t, tel, o, s, nil, r)
require.True(t, a.enabled)
Expand Down Expand Up @@ -1087,7 +1091,7 @@ func TestHistogramFloatUpperBoundNormalization(t *testing.T) {
// setup and initiate atel
tel := makeTelMock(t)
o := convertYamlStrToMap(t, c)
s := makeSenderImpl(t, c)
s := makeSenderImpl(t, nil, c)
r := newRunnerMock()
a := getTestAtel(t, tel, o, s, nil, r)
require.True(t, a.enabled)
Expand Down Expand Up @@ -1202,7 +1206,7 @@ func TestHistogramFloatUpperBoundNormalizationWithTags(t *testing.T) {
// setup and initiate atel
tel := makeTelMock(t)
o := convertYamlStrToMap(t, c)
s := makeSenderImpl(t, c)
s := makeSenderImpl(t, nil, c)
r := newRunnerMock()
a := getTestAtel(t, tel, o, s, nil, r)
require.True(t, a.enabled)
Expand Down Expand Up @@ -1299,3 +1303,78 @@ func TestHistogramFloatUpperBoundNormalizationWithTags(t *testing.T) {
assert.Equal(t, expecVals4[i], b.Count)
}
}

// TestUsingGZipCompressionInAgentTelemetrySender sends the same histogram
// telemetry through two senders: one built from a configuration that leaves
// compression at its default, and one built with
// agent_telemetry.use_compression set to false. It then checks that the first
// request body can be read as a gzip stream and that the second body is more
// than 1.5 times larger than the first.
func TestUsingGZipCompressionInAgentTelemetrySender(t *testing.T) {
// Run with gzip (default)
var cfg1 = `
agent_telemetry:
enabled: true
profiles:
- name: xxx
metric:
metrics:
- name: foo.bar
`

// A single telemetry mock (and its histogram) is shared by both runs below.
tel := makeTelMock(t)
hist := tel.NewHistogram("foo", "bar", nil, "", []float64{1, 2, 5, 100})
hist.Observe(1)
hist.Observe(5)
hist.Observe(6)
hist.Observe(100)

// setup and initiate atel
o1 := convertYamlStrToMap(t, cfg1)
cl1 := newClientMock()
s1 := makeSenderImpl(t, cl1, cfg1)
r1 := newRunnerMock()
a1 := getTestAtel(t, tel, o1, s1, cl1, r1)
require.True(t, a1.enabled)

// run the runner to trigger the telemetry report
a1.start()
r1.(*runnerMock).run()
// the client mock must have captured a non-empty request body
assert.True(t, len(cl1.(*clientMock).body) > 0)

// Run without gzip
var cfg2 = `
agent_telemetry:
use_compression: false
enabled: true
profiles:
- name: xxx
metric:
metrics:
- name: foo.bar
aggregate_tags:
`

// setup and initiate atel
o2 := convertYamlStrToMap(t, cfg2)
cl2 := newClientMock()
s2 := makeSenderImpl(t, cl2, cfg2)
r2 := newRunnerMock()
a2 := getTestAtel(t, tel, o2, s2, cl2, r2)
require.True(t, a2.enabled)

// run the runner to trigger the telemetry report
a2.start()
r2.(*runnerMock).run()
// the client mock must have captured a non-empty request body
assert.True(t, len(cl2.(*clientMock).body) > 0)

// Check gzip vs. no-gzip body: the body captured in the first run must be
// readable as a gzip stream from start to end. The decompressed content
// itself is not inspected.
compressReader := bytes.NewReader(cl1.(*clientMock).body)
gziper, err := gzip.NewReader(compressReader)
require.NoError(t, err)
defer gziper.Close()
var decompress bytes.Buffer
_, err = io.Copy(&decompress, gziper)
require.NoError(t, err)

// We cannot compare the bodies directly (the time stamps and the internal
// bucket serialization differ), but the successful decompression above
// together with a significant size difference should be sufficient.
compressBodyLen := len(cl1.(*clientMock).body)
nonCompressBodyLen := len(cl2.(*clientMock).body)
assert.True(t, float64(nonCompressBodyLen)/float64(compressBodyLen) > 1.5)
}
36 changes: 29 additions & 7 deletions comp/core/agenttelemetry/impl/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,9 @@ type senderImpl struct {
cfgComp config.Reader
logComp log.Component

client client
compress bool
compressionLevel int
client client

endpoints *logconfig.Endpoints

Expand Down Expand Up @@ -207,9 +209,12 @@ func newSenderImpl(
cfgComp: cfgComp,
logComp: logComp,

client: client,
endpoints: endpoints,
agentVersion: agentVersion.GetNumberAndPre(),
compress: cfgComp.GetBool("agent_telemetry.use_compression"),
compressionLevel: cfgComp.GetInt("agent_telemetry.compression_level"),
client: client,
endpoints: endpoints,
agentVersion: agentVersion.GetNumberAndPre(),

// pre-fill parts of payload which are not changing during run-time
payloadTemplate: Payload{
APIVersion: "v2",
Expand Down Expand Up @@ -321,11 +326,24 @@ func (s *senderImpl) flushSession(ss *senderSession) error {
return fmt.Errorf("failed to marshal agent telemetry payload: %w", err)
}

reqBody, err := scrubber.ScrubJSON(payloadJSON)
reqBodyRaw, err := scrubber.ScrubJSON(payloadJSON)
if err != nil {
return fmt.Errorf("failed to scrubl agent telemetry payload: %w", err)
}

// Compress the payload when compression is enabled. Compression is
// best-effort: if it fails, the failure is logged and the uncompressed
// payload is sent instead, with gzipCompressed left false.
reqBody := reqBodyRaw
gzipCompressed := false
if s.compress {
	reqBodyCompressed, compressErr := gzipCompress(reqBodyRaw, s.compressionLevel)
	if compressErr == nil {
		gzipCompressed = true
		reqBody = reqBodyCompressed
	} else {
		// Log the compression error itself; the outer err is nil at this point.
		s.logComp.Errorf("Failed to compress agent telemetry payload: %v", compressErr)
	}
}

// Send the payload to all endpoints
var errs error
reqType := payloads.RequestType
Expand All @@ -337,7 +355,7 @@ func (s *senderImpl) flushSession(ss *senderSession) error {
errs = errors.Join(errs, err)
continue
}
s.addHeaders(req, reqType, ep.GetAPIKey(), bodyLen)
s.addHeaders(req, reqType, ep.GetAPIKey(), bodyLen, gzipCompressed)
resp, err := s.client.Do(req.WithContext(ss.cancelCtx))
if err != nil {
errs = errors.Join(errs, err)
Expand Down Expand Up @@ -387,7 +405,7 @@ func (s *senderImpl) sendAgentMetricPayloads(ss *senderSession, metrics []*agent
}
}

func (s *senderImpl) addHeaders(req *http.Request, requesttype, apikey, bodylen string) {
func (s *senderImpl) addHeaders(req *http.Request, requesttype, apikey, bodylen string, gzipCompressed bool) {
req.Header.Add("DD-Api-Key", apikey)
req.Header.Add("Content-Type", "application/json")
req.Header.Add("Content-Length", bodylen)
Expand All @@ -397,4 +415,8 @@ func (s *senderImpl) addHeaders(req *http.Request, requesttype, apikey, bodylen
req.Header.Add("DD-Telemetry-Product-Version", s.agentVersion)
// Not clear how to acquire that. Appears that EVP adds it automatically
req.Header.Add("datadog-container-id", "")

if gzipCompressed {
req.Header.Set("Content-Encoding", "gzip")
}
}
24 changes: 24 additions & 0 deletions comp/core/agenttelemetry/impl/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
package agenttelemetryimpl

import (
"bytes"
"compress/gzip"
"fmt"
"sort"

Expand Down Expand Up @@ -123,3 +125,25 @@ func cloneLabelsSorted(labels []*dto.LabelPair) []*dto.LabelPair {
func makeLabelPairKey(l *dto.LabelPair) string {
return fmt.Sprintf("%s:%s:", l.GetName(), l.GetValue())
}

// gzipCompress returns payload compressed in gzip format.
//
// compressionLevel is handed to gzip.NewWriterLevel unchanged, so a level it
// rejects is reported as an error. On any error the returned slice is nil.
func gzipCompress(payload []byte, compressionLevel int) ([]byte, error) {
	var payloadCompressed bytes.Buffer
	gzipWriter, err := gzip.NewWriterLevel(&payloadCompressed, compressionLevel)
	if err != nil {
		return nil, err
	}
	if _, err = gzipWriter.Write(payload); err != nil {
		return nil, err
	}
	// Close flushes any buffered data and writes the gzip footer, so no
	// separate Flush is needed beforehand; an explicit Flush would only add
	// an extra sync marker to the compressed output.
	if err = gzipWriter.Close(); err != nil {
		return nil, err
	}
	return payloadCompressed.Bytes(), nil
}
11 changes: 11 additions & 0 deletions releasenotes/notes/agent-tel-gzip-bba8a51c1aa3ba2f.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# Each section from every release note is combined when the
# CHANGELOG.rst is rendered. So the text needs to be worded so that
# it does not depend on any information only available in another
# section. This may mean repeating some details, but each section
# must be readable independently of the other.
#
# Each section note must be formatted as reStructuredText.
---
enhancements:
- |
Use HTTP gzip compression for the Agent telemetry payloads.

0 comments on commit e6aff9b

Please sign in to comment.