From d9675d3c49eebcfba9b879d776513d7cc240cca0 Mon Sep 17 00:00:00 2001 From: Len Gamburg Date: Wed, 18 Dec 2024 09:02:52 -0500 Subject: [PATCH] Extended Agent telemetry histogram details - Added to a histogram's payload previously omitted and implicit `+Inf` bucket value - Added histogram's p75, p95 and p99 values (expressed as the upper-bound for the matching bucket). --- .../agenttelemetry/impl/agenttelemetry.go | 38 +- .../impl/agenttelemetry_test.go | 569 +++++++++++++++--- comp/core/agenttelemetry/impl/sender.go | 49 ++ comp/core/agenttelemetry/impl/utils.go | 5 + ...t-tel-extend-histogr-6e2da94e63edcaf8.yaml | 14 + 5 files changed, 582 insertions(+), 93 deletions(-) create mode 100644 releasenotes/notes/agent-tel-extend-histogr-6e2da94e63edcaf8.yaml diff --git a/comp/core/agenttelemetry/impl/agenttelemetry.go b/comp/core/agenttelemetry/impl/agenttelemetry.go index 2b54f2b1eb5c7..18daf57c0beb9 100644 --- a/comp/core/agenttelemetry/impl/agenttelemetry.go +++ b/comp/core/agenttelemetry/impl/agenttelemetry.go @@ -276,44 +276,52 @@ func buildKeysForMetricsPreviousValues(mt dto.MetricType, metricName string, met keyName = builder.String() } + // Add bucket names to the key if mt == dto.MetricType_HISTOGRAM { // add bucket names to the key for _, bucket := range m.Histogram.GetBucket() { keyNames = append(keyNames, fmt.Sprintf("%v:%v", keyName, bucket.GetUpperBound())) } - } else { - keyNames = append(keyNames, keyName) } + + // For regular metric (and for HISTOGRAM +Inf bucket which follows the last bucket) + keyNames = append(keyNames, keyName) } return keyNames } +// Swap current value with the previous value and deduct the previous value from the current value +func deductAndUpdatePrevValue(key string, prevPromMetricValues map[string]uint64, curValue *uint64) { + origCurValue := *curValue + if prevValue, ok := prevPromMetricValues[key]; ok { + *curValue -= prevValue + } + prevPromMetricValues[key] = origCurValue +} + func 
convertPromHistogramsToDatadogHistogramsValues(metrics []*dto.Metric, prevPromMetricValues map[string]uint64, keyNames []string) { if len(metrics) > 0 { bucketCount := len(metrics[0].Histogram.GetBucket()) + var prevValue uint64 + for i, m := range metrics { - // First, deduct the previous cumulative count from the current one + // 1. deduct the previous cumulative count from each explicit buckets for j, b := range m.Histogram.GetBucket() { - key := keyNames[(i*bucketCount)+j] - curValue := b.GetCumulativeCount() - - // Adjust the counter value if found - if prevValue, ok := prevPromMetricValues[key]; ok { - *b.CumulativeCount -= prevValue - } - - // Upsert the cache of previous counter values - prevPromMetricValues[key] = curValue + deductAndUpdatePrevValue(keyNames[(i*(bucketCount+1))+j], prevPromMetricValues, b.CumulativeCount) } + // 2. deduct the previous cumulative count from the implicit "+Inf" bucket + deductAndUpdatePrevValue(keyNames[((i+1)*(bucketCount+1))-1], prevPromMetricValues, m.Histogram.SampleCount) - // Then, de-cumulate next bucket value from the previous bucket values - var prevValue uint64 + // 3. "De-cumulate" next explicit bucket value from the preceding bucket value + prevValue = 0 for _, b := range m.Histogram.GetBucket() { curValue := b.GetCumulativeCount() *b.CumulativeCount -= prevValue prevValue = curValue } + // 4. 
"De-cumulate" implicit "+Inf" bucket value from the preceding bucket value + *m.Histogram.SampleCount -= prevValue } } } diff --git a/comp/core/agenttelemetry/impl/agenttelemetry_test.go b/comp/core/agenttelemetry/impl/agenttelemetry_test.go index f0524cae80129..333d2e1855352 100644 --- a/comp/core/agenttelemetry/impl/agenttelemetry_test.go +++ b/comp/core/agenttelemetry/impl/agenttelemetry_test.go @@ -10,6 +10,7 @@ import ( "encoding/json" "fmt" "io" + "maps" "net/http" "testing" @@ -198,6 +199,94 @@ func getCommonOverrideConfig(enabled bool, site string) map[string]any { } } +func (p *Payload) UnmarshalAgentMetrics(itfPayload map[string]interface{}) error { + var ok bool + + p.RequestType = "agent-metrics" + p.APIVersion = itfPayload["request_type"].(string) + + var metricsItfPayload map[string]interface{} + metricsItfPayload, ok = itfPayload["payload"].(map[string]interface{}) + if !ok { + return fmt.Errorf("payload not found") + } + var metricsItf map[string]interface{} + metricsItf, ok = metricsItfPayload["metrics"].(map[string]interface{}) + if !ok { + return fmt.Errorf("metrics not found") + } + + var err error + var metricsPayload AgentMetricsPayload + metricsPayload.Metrics = make(map[string]interface{}) + for k, v := range metricsItf { + if k == "agent_metadata" { + // Re(un)marshal the meatadata + var metadata AgentMetadataPayload + var metadataBytes []byte + if metadataBytes, err = json.Marshal(v); err != nil { + return err + } + if err = json.Unmarshal(metadataBytes, &metadata); err != nil { + return err + } + metricsPayload.Metrics[k] = metadata + } else { + // Re(un)marshal the metric + var metric MetricPayload + var metricBytes []byte + if metricBytes, err = json.Marshal(v); err != nil { + return err + } + if err = json.Unmarshal(metricBytes, &metric); err != nil { + return err + } + metricsPayload.Metrics[k] = metric + } + } + p.Payload = metricsPayload + return nil +} + +func (p *Payload) UnmarshalMessageBatch(itfPayload map[string]interface{}) 
error { + payloadsRaw, ok := itfPayload["payload"].([]interface{}) + if !ok { + return fmt.Errorf("payload not found") + } + + // ensure all payloads which should be agent-metrics + var payloads []Payload + for _, payloadRaw := range payloadsRaw { + itfChildPayload, ok := payloadRaw.(map[string]interface{}) + if !ok { + return fmt.Errorf("invalid payload item type") + } + + requestTypeRaw, ok := itfChildPayload["request_type"] + if !ok { + return fmt.Errorf("request_type not found") + } + requestType, ok := requestTypeRaw.(string) + if !ok { + return fmt.Errorf("request_type type is invalid") + } + + if requestType != "agent-metrics" { + return fmt.Errorf("request_type should be agent-metrics") + } + + var payload Payload + if err := payload.UnmarshalAgentMetrics(itfChildPayload); err != nil { + return err + } + payloads = append(payloads, payload) + + } + p.Payload = payloads + + return nil +} + // This is a unit test function do not use it for actual code (at least yet) // since it is not 100% full implementation of the unmarshalling func (p *Payload) UnmarshalJSON(b []byte) (err error) { @@ -206,60 +295,21 @@ func (p *Payload) UnmarshalJSON(b []byte) (err error) { return err } - requestType, ok := itfPayload["request_type"] + requestTypeRaw, ok := itfPayload["request_type"] if !ok { return fmt.Errorf("request_type not found") } - if requestType.(string) == "agent-metrics" { - p.RequestType = requestType.(string) - p.APIVersion = itfPayload["request_type"].(string) - p.EventTime = int64(itfPayload["event_time"].(float64)) - p.DebugFlag = itfPayload["debug"].(bool) - - var metricsItfPayload map[string]interface{} - metricsItfPayload, ok = itfPayload["payload"].(map[string]interface{}) - if !ok { - return fmt.Errorf("payload not found") - } - var metricsItf map[string]interface{} - metricsItf, ok = metricsItfPayload["metrics"].(map[string]interface{}) - if !ok { - return fmt.Errorf("metrics not found") - } + requestType, ok := requestTypeRaw.(string) + if !ok { + 
return fmt.Errorf("request_type type is invalid") + } - var metricsPayload AgentMetricsPayload - metricsPayload.Metrics = make(map[string]interface{}) - for k, v := range metricsItf { - if k == "agent_metadata" { - // Re(un)marshal the meatadata - var metadata AgentMetadataPayload - var metadataBytes []byte - if metadataBytes, err = json.Marshal(v); err != nil { - return err - } - if err = json.Unmarshal(metadataBytes, &metadata); err != nil { - return err - } - metricsPayload.Metrics[k] = metadata - } else { - // Re(un)marshal the metric - var metric MetricPayload - var metricBytes []byte - if metricBytes, err = json.Marshal(v); err != nil { - return err - } - if err = json.Unmarshal(metricBytes, &metric); err != nil { - return err - } - metricsPayload.Metrics[k] = metric - } - } - p.Payload = metricsPayload - return nil + if requestType == "agent-metrics" { + return p.UnmarshalAgentMetrics(itfPayload) } - if requestType.(string) == "message-batch" { - return fmt.Errorf("message-batch request_type is not supported yet") + if requestType == "message-batch" { + return p.UnmarshalMessageBatch(itfPayload) } return fmt.Errorf("request_type should be either agent-metrics or message-batch") @@ -290,6 +340,34 @@ func getPayloadMetric(a *atel, metricName string) (*MetricPayload, bool) { return nil, false } +func getPayloadMetrics(a *atel, metricName string) ([]*MetricPayload, bool) { + payload, err := getPayload(a) + if err != nil { + return nil, false + } + + var payloads []*MetricPayload + for _, payload := range payload.Payload.([]Payload) { + metrics := payload.Payload.(AgentMetricsPayload).Metrics + if metricItf, ok := metrics[metricName]; ok { + metric := metricItf.(MetricPayload) + payloads = append(payloads, &metric) + } + } + + return payloads, true +} + +func getPayloadMetricByTagValues(metrics []*MetricPayload, tags map[string]interface{}) (*MetricPayload, bool) { + for _, m := range metrics { + if maps.Equal(m.Tags, tags) { + return m, true + } + } + + return 
nil, false +} + // Validate the payload // metric, ok := metrics["foo.bar"] @@ -1113,16 +1191,20 @@ func TestHistogramFloatUpperBoundNormalization(t *testing.T) { hist.Observe(100) hist.Observe(100) hist.Observe(100) + // +inf - 2 + hist.Observe(10000) + hist.Observe(20000) // Test payload1 metric1, ok := getPayloadMetric(a, "foo.bar") require.True(t, ok) - require.True(t, len(metric1.Buckets) > 0) + require.Len(t, metric1.Buckets, 5) expecVals1 := map[string]uint64{ - "1": 5, - "2": 0, - "5": 3, - "100": 6, + "1": 5, + "2": 0, + "5": 3, + "100": 6, + "+Inf": 2, } for k, b := range metric1.Buckets { assert.Equal(t, expecVals1[k], b) @@ -1131,12 +1213,13 @@ func TestHistogramFloatUpperBoundNormalization(t *testing.T) { // Test payload2 (no new observations, everything is reset) metric2, ok := getPayloadMetric(a, "foo.bar") require.True(t, ok) - require.True(t, len(metric2.Buckets) > 0) + require.Len(t, metric2.Buckets, 5) expecVals2 := map[string]uint64{ - "1": 0, - "2": 0, - "5": 0, - "100": 0, + "1": 0, + "2": 0, + "5": 0, + "100": 0, + "+Inf": 0, } for k, b := range metric2.Buckets { assert.Equal(t, expecVals2[k], b) @@ -1162,15 +1245,21 @@ func TestHistogramFloatUpperBoundNormalization(t *testing.T) { hist.Observe(100) hist.Observe(100) hist.Observe(100) + // +inf - 3 + hist.Observe(10000) + hist.Observe(20000) + hist.Observe(30000) + // Test payload3 metric3, ok := getPayloadMetric(a, "foo.bar") require.True(t, ok) - require.True(t, len(metric3.Buckets) > 0) + require.Len(t, metric3.Buckets, 5) expecVals3 := map[string]uint64{ - "1": 5, - "2": 0, - "5": 3, - "100": 6, + "1": 5, + "2": 0, + "5": 3, + "100": 6, + "+Inf": 3, } for k, b := range metric3.Buckets { assert.Equal(t, expecVals3[k], b) @@ -1232,12 +1321,13 @@ func TestHistogramFloatUpperBoundNormalizationWithTags(t *testing.T) { // Test payload1 metric1, ok := getPayloadMetric(a, "foo.bar") require.True(t, ok) - require.True(t, len(metric1.Buckets) > 0) + require.Len(t, metric1.Buckets, 5) expecVals1 := 
map[string]uint64{ - "1": 5, - "2": 0, - "5": 3, - "100": 6, + "1": 5, + "2": 0, + "5": 3, + "100": 6, + "+inf": 0, } for k, b := range metric1.Buckets { assert.Equal(t, expecVals1[k], b) @@ -1246,12 +1336,13 @@ func TestHistogramFloatUpperBoundNormalizationWithTags(t *testing.T) { // Test payload2 (no new observations, everything is reset) metric2, ok := getPayloadMetric(a, "foo.bar") require.True(t, ok) - require.True(t, len(metric2.Buckets) > 0) + require.Len(t, metric2.Buckets, 5) expecVals2 := map[string]uint64{ - "1": 0, - "2": 0, - "5": 0, - "100": 0, + "1": 0, + "2": 0, + "5": 0, + "100": 0, + "+inf": 0, } for k, b := range metric2.Buckets { assert.Equal(t, expecVals2[k], b) @@ -1280,12 +1371,13 @@ func TestHistogramFloatUpperBoundNormalizationWithTags(t *testing.T) { // Test payload3 metric3, ok := getPayloadMetric(a, "foo.bar") require.True(t, ok) - require.True(t, len(metric3.Buckets) > 0) + require.Len(t, metric3.Buckets, 5) expecVals3 := map[string]uint64{ - "1": 5, - "2": 0, - "5": 3, - "100": 6, + "1": 5, + "2": 0, + "5": 3, + "100": 6, + "+inf": 0, } for k, b := range metric3.Buckets { assert.Equal(t, expecVals3[k], b) @@ -1299,3 +1391,324 @@ func TestHistogramFloatUpperBoundNormalizationWithTags(t *testing.T) { assert.Equal(t, expecVals4[i], b.Count) } } + +func TestHistogramFloatUpperBoundNormalizationWithMultivalueTags(t *testing.T) { + var c = ` + agent_telemetry: + enabled: true + profiles: + - name: xxx + metric: + metrics: + - name: foo.bar + aggregate_tags: + - tag + ` + + // setup and initiate atel + tel := makeTelMock(t) + o := convertYamlStrToMap(t, c) + s := makeSenderImpl(t, c) + r := newRunnerMock() + a := getTestAtel(t, tel, o, s, nil, r) + require.True(t, a.enabled) + + // setup and initiate atel + hist := tel.NewHistogram("foo", "bar", []string{"tag"}, "", []float64{1, 2, 5, 100}) + + // bucket 0 - 5 + hist.Observe(1, "val1") + hist.Observe(1, "val1") + hist.Observe(1, "val1") + hist.Observe(1, "val1") + hist.Observe(1, "val1") + // 
bucket 1 - 0 + // .. + // bucket 2 - 3 + hist.Observe(5, "val1") + hist.Observe(5, "val1") + hist.Observe(5, "val1") + // bucket 4 - 6 + hist.Observe(6, "val1") + hist.Observe(100, "val1") + hist.Observe(100, "val1") + hist.Observe(100, "val1") + hist.Observe(100, "val1") + hist.Observe(100, "val1") + // bucket +inf - 2 + hist.Observe(1000, "val1") + hist.Observe(2000, "val1") + + // bucket 0 - 10 + hist.Observe(1, "val2") + hist.Observe(1, "val2") + hist.Observe(1, "val2") + hist.Observe(1, "val2") + hist.Observe(1, "val2") + hist.Observe(1, "val2") + hist.Observe(1, "val2") + hist.Observe(1, "val2") + hist.Observe(1, "val2") + hist.Observe(1, "val2") + // bucket 1 - 5 + hist.Observe(2, "val2") + hist.Observe(2, "val2") + hist.Observe(2, "val2") + hist.Observe(2, "val2") + hist.Observe(2, "val2") + // bucket 2 - 6 + hist.Observe(5, "val2") + hist.Observe(5, "val2") + hist.Observe(5, "val2") + hist.Observe(5, "val2") + hist.Observe(5, "val2") + hist.Observe(5, "val2") + // bucket 4 - 12 + hist.Observe(6, "val2") + hist.Observe(6, "val2") + hist.Observe(100, "val2") + hist.Observe(100, "val2") + hist.Observe(100, "val2") + hist.Observe(100, "val2") + hist.Observe(100, "val2") + hist.Observe(100, "val2") + hist.Observe(100, "val2") + hist.Observe(100, "val2") + hist.Observe(100, "val2") + hist.Observe(100, "val2") + // bucket +inf - 4 + hist.Observe(1000, "val2") + hist.Observe(1000, "val2") + hist.Observe(2000, "val2") + hist.Observe(2000, "val2") + + // Test payload1 + metrics1, ok := getPayloadMetrics(a, "foo.bar") + require.True(t, ok) + require.Len(t, metrics1, 2) + require.Len(t, metrics1[0].Buckets, 5) + expecVals1 := map[string]struct { + n1 uint64 + n2 uint64 + }{ + "1": {5, 10}, + "2": {0, 5}, + "5": {3, 6}, + "100": {6, 12}, + "+Inf": {2, 4}, + } + metrics11, ok := getPayloadMetricByTagValues(metrics1, map[string]interface{}{"tag": "val1"}) + require.True(t, ok) + for k, b := range metrics11.Buckets { + assert.Equal(t, expecVals1[k].n1, b) + } + metrics12, 
ok := getPayloadMetricByTagValues(metrics1, map[string]interface{}{"tag": "val2"}) + require.True(t, ok) + for k, b := range metrics12.Buckets { + assert.Equal(t, expecVals1[k].n2, b) + } + + // Test payload2 (no new observations, everything is reset) + metrics2, ok := getPayloadMetrics(a, "foo.bar") + require.True(t, ok) + require.Len(t, metrics2, 2) + require.Len(t, metrics2[0].Buckets, 5) + require.Len(t, metrics2[1].Buckets, 5) + expecVals2 := map[string]struct { + n1 uint64 + n2 uint64 + }{ + "1": {0, 0}, + "2": {0, 0}, + "5": {0, 0}, + "100": {0, 0}, + "+Inf": {0, 0}, + } + metrics21, ok := getPayloadMetricByTagValues(metrics2, map[string]interface{}{"tag": "val1"}) + require.True(t, ok) + for k, b := range metrics21.Buckets { + assert.Equal(t, expecVals2[k].n1, b) + } + metrics22, ok := getPayloadMetricByTagValues(metrics2, map[string]interface{}{"tag": "val2"}) + require.True(t, ok) + for k, b := range metrics22.Buckets { + assert.Equal(t, expecVals2[k].n2, b) + } + + // Repeat the same observation with the same results) + // bucket 0 - 5 + hist.Observe(1, "val1") + hist.Observe(1, "val1") + hist.Observe(1, "val1") + hist.Observe(1, "val1") + hist.Observe(1, "val1") + // bucket 1 - 0 + // .. 
+ // bucket 2 - 3 + hist.Observe(5, "val1") + hist.Observe(5, "val1") + hist.Observe(5, "val1") + // bucket 4 - 6 + hist.Observe(6, "val1") + hist.Observe(100, "val1") + hist.Observe(100, "val1") + hist.Observe(100, "val1") + hist.Observe(100, "val1") + hist.Observe(100, "val1") + // bucket +inf - 2 + hist.Observe(1000, "val1") + hist.Observe(2000, "val1") + + // bucket 0 - 10 + hist.Observe(1, "val2") + hist.Observe(1, "val2") + hist.Observe(1, "val2") + hist.Observe(1, "val2") + hist.Observe(1, "val2") + hist.Observe(1, "val2") + hist.Observe(1, "val2") + hist.Observe(1, "val2") + hist.Observe(1, "val2") + hist.Observe(1, "val2") + // bucket 1 - 5 + hist.Observe(2, "val2") + hist.Observe(2, "val2") + hist.Observe(2, "val2") + hist.Observe(2, "val2") + hist.Observe(2, "val2") + // bucket 2 - 6 + hist.Observe(5, "val2") + hist.Observe(5, "val2") + hist.Observe(5, "val2") + hist.Observe(5, "val2") + hist.Observe(5, "val2") + hist.Observe(5, "val2") + // bucket 4 - 12 + hist.Observe(6, "val2") + hist.Observe(6, "val2") + hist.Observe(100, "val2") + hist.Observe(100, "val2") + hist.Observe(100, "val2") + hist.Observe(100, "val2") + hist.Observe(100, "val2") + hist.Observe(100, "val2") + hist.Observe(100, "val2") + hist.Observe(100, "val2") + hist.Observe(100, "val2") + hist.Observe(100, "val2") + // bucket +inf - 4 + hist.Observe(1000, "val2") + hist.Observe(1000, "val2") + hist.Observe(2000, "val2") + hist.Observe(2000, "val2") + + // Test payload3 + metrics3, ok := getPayloadMetrics(a, "foo.bar") + require.True(t, ok) + require.Len(t, metrics3, 2) + require.Len(t, metrics3[0].Buckets, 5) + require.Len(t, metrics3[1].Buckets, 5) + expecVals3 := map[string]struct { + n1 uint64 + n2 uint64 + }{ + "1": {5, 10}, + "2": {0, 5}, + "5": {3, 6}, + "100": {6, 12}, + "+Inf": {2, 4}, + } + metrics31, ok := getPayloadMetricByTagValues(metrics3, map[string]interface{}{"tag": "val1"}) + require.True(t, ok) + for k, b := range metrics31.Buckets { + assert.Equal(t, expecVals3[k].n1, 
b) + } + metrics32, ok := getPayloadMetricByTagValues(metrics3, map[string]interface{}{"tag": "val2"}) + require.True(t, ok) + for k, b := range metrics32.Buckets { + assert.Equal(t, expecVals3[k].n2, b) + } + + // Test raw buckets, they should be still accumulated + tags1 := map[string]string{"tag": "val1"} + rawHist1 := hist.WithTags(tags1) + expecVals41 := []uint64{10, 10, 16, 28} + for i, b := range rawHist1.Get().Buckets { + assert.Equal(t, expecVals41[i], b.Count) + } + tags2 := map[string]string{"tag": "val2"} + rawHist2 := hist.WithTags(tags2) + expecVals42 := []uint64{20, 30, 42, 66} + for i, b := range rawHist2.Get().Buckets { + assert.Equal(t, expecVals42[i], b.Count) + } +} + +func TestHistogramPercentile(t *testing.T) { + var c = ` + agent_telemetry: + enabled: true + profiles: + - name: xxx + metric: + metrics: + - name: foo.bar + ` + + // setup and initiate atel + tel := makeTelMock(t) + o := convertYamlStrToMap(t, c) + s := makeSenderImpl(t, c) + r := newRunnerMock() + a := getTestAtel(t, tel, o, s, nil, r) + require.True(t, a.enabled) + + // setup and initiate atel + hist := tel.NewHistogram("foo", "bar", nil, "", []float64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}) + for i := 1; i <= 10; i++ { + hist.Observe(1) + hist.Observe(2) + hist.Observe(3) + hist.Observe(4) + hist.Observe(5) + hist.Observe(6) + hist.Observe(7) + hist.Observe(8) + hist.Observe(9) + } + hist.Observe(10) + hist.Observe(10) + + metric, ok := getPayloadMetric(a, "foo.bar") + require.True(t, ok) + require.NotNil(t, metric.P75) + require.NotNil(t, metric.P95) + require.NotNil(t, metric.P99) + + // 75% of 92 observations is 69.0 (upper bound of the 6th bucket - 7) + // 95% of 92 observations is 87.0 (upper bound of the 8th bucket - 9) + // 95% of 92 observations is 92.0 (upper bound of the 10th bucket - 10) + assert.Equal(t, 7.0, *metric.P75) + assert.Equal(t, 9.0, *metric.P95) + assert.Equal(t, 10.0, *metric.P99) + + // Test percentile in +Inf upper bound (p75 in 10th bucket) and p95 and p99 
in +Inf bucket) + for i := 1; i <= 10; i++ { + hist.Observe(10) + } + for i := 1; i <= 4; i++ { + hist.Observe(11) + } + + metric, ok = getPayloadMetric(a, "foo.bar") + require.True(t, ok) + require.NotNil(t, metric.P75) + require.NotNil(t, metric.P95) + require.NotNil(t, metric.P99) + + // For percentile point of view +Inf bucket upper boundary is 2x of last explicit upper boundary + // maybe in the future it will be configurable + assert.Equal(t, 10.0, *metric.P75) + assert.Equal(t, 20.0, *metric.P95) + assert.Equal(t, 20.0, *metric.P99) +} diff --git a/comp/core/agenttelemetry/impl/sender.go b/comp/core/agenttelemetry/impl/sender.go index 585807eb6a539..f489667dc8eb8 100644 --- a/comp/core/agenttelemetry/impl/sender.go +++ b/comp/core/agenttelemetry/impl/sender.go @@ -11,6 +11,7 @@ import ( "encoding/json" "errors" "fmt" + "math" "net/http" "net/url" "strconv" @@ -133,6 +134,9 @@ type MetricPayload struct { Type string `json:"type"` Tags map[string]interface{} `json:"tags,omitempty"` Buckets map[string]uint64 `json:"buckets,omitempty"` + P75 *float64 `json:"p75,omitempty"` + P95 *float64 `json:"p95,omitempty"` + P99 *float64 `json:"p99,omitempty"` } func httpClientFactory(cfg config.Reader, timeout time.Duration) func() *http.Client { @@ -253,8 +257,53 @@ func (s *senderImpl) addMetricPayload( for _, bucket := range histogram.GetBucket() { boundNameRaw := fmt.Sprintf("%v", bucket.GetUpperBound()) boundName := strings.ReplaceAll(boundNameRaw, ".", "_") + payload.Buckets[boundName] = bucket.GetCumulativeCount() } + payload.Buckets["+Inf"] = histogram.GetSampleCount() + + // Calculate fixed 75, 95 and 99 precentiles. Percentile calculation finds + // a bucket which, with all preceding buckets, contains that percentile item. + // For convenience, percentile values are not the bucket number but its + // upper-bound. If a percentile belongs to the implicit "+inf" bucket, which + // has no explicit upper-bound, we will use the last bucket upper bound times 2. 
+ // The upper-bound of the "+Inf" bucket is defined as 2x of the preceding + // bucket boundary, but it is totally arbitrary. In the future we may use a + // configuration value to set it up. + var totalCount uint64 + for _, bucket := range histogram.GetBucket() { + totalCount += bucket.GetCumulativeCount() + } + totalCount += histogram.GetSampleCount() + p75 := uint64(math.Floor(float64(totalCount) * 0.75)) + p95 := uint64(math.Floor(float64(totalCount) * 0.95)) + p99 := uint64(math.Floor(float64(totalCount) * 0.99)) + var curCount uint64 + for _, bucket := range histogram.GetBucket() { + curCount += bucket.GetCumulativeCount() + if payload.P75 == nil && curCount >= p75 { + p75Value := bucket.GetUpperBound() + payload.P75 = &p75Value + } + if payload.P95 == nil && curCount >= p95 { + p95Value := bucket.GetUpperBound() + payload.P95 = &p95Value + } + if payload.P99 == nil && curCount >= p99 { + p99Value := bucket.GetUpperBound() + payload.P99 = &p99Value + } + } + maxUpperBound := 2 * (histogram.GetBucket()[len(histogram.GetBucket())-1].GetUpperBound()) + if payload.P75 == nil { + payload.P75 = &maxUpperBound + } + if payload.P95 == nil { + payload.P95 = &maxUpperBound + } + if payload.P99 == nil { + payload.P99 = &maxUpperBound + } } // Add metric tags diff --git a/comp/core/agenttelemetry/impl/utils.go b/comp/core/agenttelemetry/impl/utils.go index 07f6b86aebf40..bac723115698d 100644 --- a/comp/core/agenttelemetry/impl/utils.go +++ b/comp/core/agenttelemetry/impl/utils.go @@ -98,11 +98,16 @@ func aggregateMetric(mt dto.MetricType, aggm *dto.Metric, srcm *dto.Metric) { aggmb.Exemplar.Label = nil } } + + // copy the sample count (it is implicit "+Inf" bucket) + aggm.Histogram.SampleCount = srcm.Histogram.SampleCount } else { // for the same metric family bucket structure is the same for i, srcb := range srcm.Histogram.Bucket { *aggm.Histogram.Bucket[i].CumulativeCount += srcb.GetCumulativeCount() } + // copy the sample count (it is implicit "+Inf" bucket) + 
*aggm.Histogram.SampleCount += srcm.Histogram.GetSampleCount() } } } diff --git a/releasenotes/notes/agent-tel-extend-histogr-6e2da94e63edcaf8.yaml b/releasenotes/notes/agent-tel-extend-histogr-6e2da94e63edcaf8.yaml new file mode 100644 index 0000000000000..9ab3b31035239 --- /dev/null +++ b/releasenotes/notes/agent-tel-extend-histogr-6e2da94e63edcaf8.yaml @@ -0,0 +1,14 @@ +# Each section from every release note are combined when the +# CHANGELOG.rst is rendered. So the text needs to be worded so that +# it does not depend on any information only available in another +# section. This may mean repeating some details, but each section +# must be readable independently of the other. +# +# Each section note must be formatted as reStructuredText. +--- +enhancements: + - | + Extended Agent telemetry histogram details, specifically: + - Added the previously omitted, implicit ``+Inf`` bucket value to a histogram's payload + - Added histogram's p75, p95 and p99 values (each expressed as the upper bound of the + matching bucket).