From e39609ec675bb4ce12524d1ede6decce44db2bc9 Mon Sep 17 00:00:00 2001 From: Mahad Zaryab <43658574+mahadzaryab1@users.noreply.github.com> Date: Sun, 22 Dec 2024 21:47:16 -0500 Subject: [PATCH 1/6] [v2][adjuster] Implement adjuster for correct timestamps for clockskew (#6392) ## Which problem is this PR solving? - Towards #6344 ## Description of the changes - Implemented an adjuster to correct timestamps for clockskew. ## How was this change tested? - Added unit tests ## Checklist - [x] I have read https://github.com/jaegertracing/jaeger/blob/master/CONTRIBUTING_GUIDELINES.md - [x] I have signed all commits - [x] I have added unit tests for the new functionality - [x] I have run lint and test steps successfully - for `jaeger`: `make lint test` - for `jaeger-ui`: `npm run lint` and `npm run test` --------- Signed-off-by: Mahad Zaryab --- cmd/query/app/querysvc/adjuster/clockskew.go | 207 +++++++++++++ .../app/querysvc/adjuster/clockskew_test.go | 292 ++++++++++++++++++ pkg/otelsemconv/semconv.go | 4 + 3 files changed, 503 insertions(+) create mode 100644 cmd/query/app/querysvc/adjuster/clockskew.go create mode 100644 cmd/query/app/querysvc/adjuster/clockskew_test.go diff --git a/cmd/query/app/querysvc/adjuster/clockskew.go b/cmd/query/app/querysvc/adjuster/clockskew.go new file mode 100644 index 00000000000..0f0bbc40e63 --- /dev/null +++ b/cmd/query/app/querysvc/adjuster/clockskew.go @@ -0,0 +1,207 @@ +// Copyright (c) 2024 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package adjuster + +import ( + "fmt" + "time" + + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/ptrace" + + "github.com/jaegertracing/jaeger/internal/jptrace" + "github.com/jaegertracing/jaeger/pkg/otelsemconv" +) + +const ( + warningDuplicateSpanID = "duplicate span IDs; skipping clock skew adjustment" + warningMissingParentSpanID = "parent span ID=%s is not in the trace; skipping clock skew adjustment" + warningMaxDeltaExceeded = "max clock skew adjustment delta of %v exceeded; not applying calculated delta of %v" + warningSkewAdjustDisabled = "clock skew adjustment disabled; not applying calculated delta of %v" +) + +// ClockSkew returns an Adjuster that corrects span timestamps for clock skew. +// +// This adjuster modifies the start and log timestamps of child spans that are +// inconsistent with their parent spans due to clock differences between hosts. +// It assumes all spans have unique IDs and should be used after SpanIDUniquifier. +// +// The adjuster determines if two spans belong to the same source by deriving a +// unique string representation of a host based on resource attributes, +// such as `host.id`, `host.ip`, or `host.name`. +// If two spans have the same host key, they are considered to be from +// the same source, and no clock skew adjustment is expected between them. +// +// Parameters: +// - maxDelta: The maximum allowable time adjustment. Adjustments exceeding +// this value will be ignored. +func ClockSkew(maxDelta time.Duration) Adjuster { + return Func(func(traces ptrace.Traces) { + adjuster := &clockSkewAdjuster{ + traces: traces, + maxDelta: maxDelta, + } + adjuster.buildNodesMap() + adjuster.buildSubGraphs() + for _, root := range adjuster.roots { + skew := clockSkew{hostKey: root.hostKey} + adjuster.adjustNode(root, nil, skew) + } + }) +} + +type clockSkewAdjuster struct { + traces ptrace.Traces + maxDelta time.Duration + spans map[pcommon.SpanID]*node + roots map[pcommon.SpanID]*node +} + +type clockSkew struct { + delta time.Duration + hostKey string +} + +type node struct { + span ptrace.Span + children []*node + hostKey string +} + +// hostKey derives a unique string representation of a host based on resource attributes. +// This is used to determine if two spans are from the same host. +func hostKey(resource ptrace.ResourceSpans) string { + if attr, ok := resource.Resource().Attributes().Get(string(otelsemconv.HostIDKey)); ok { + return attr.Str() + } + if attr, ok := resource.Resource().Attributes().Get(string(otelsemconv.HostIPKey)); ok { + if attr.Type() == pcommon.ValueTypeStr { + return attr.Str() + } else if attr.Type() == pcommon.ValueTypeSlice { + ips := attr.Slice() + if ips.Len() > 0 { + return ips.At(0).AsString() + } + } + } + if attr, ok := resource.Resource().Attributes().Get(string(otelsemconv.HostNameKey)); ok { + return attr.Str() + } + return "" +} + +// buildNodesMap creates a mapping of span IDs to their corresponding nodes. +func (a *clockSkewAdjuster) buildNodesMap() { + a.spans = make(map[pcommon.SpanID]*node) + resources := a.traces.ResourceSpans() + for i := 0; i < resources.Len(); i++ { + resource := resources.At(i) + hk := hostKey(resource) + scopes := resource.ScopeSpans() + for j := 0; j < scopes.Len(); j++ { + spans := scopes.At(j).Spans() + for k := 0; k < spans.Len(); k++ { + span := spans.At(k) + if _, exists := a.spans[span.SpanID()]; exists { + jptrace.AddWarning(span, warningDuplicateSpanID) + } else { + a.spans[span.SpanID()] = &node{ + span: span, + hostKey: hk, + } + } + } + } + } +} + +// finds all spans that have no parent, i.e. where parentID is either 0 +// or points to an ID for which there is no span. +func (a *clockSkewAdjuster) buildSubGraphs() { + a.roots = make(map[pcommon.SpanID]*node) + for _, n := range a.spans { + if n.span.ParentSpanID() == pcommon.NewSpanIDEmpty() { + a.roots[n.span.SpanID()] = n + continue + } + if p, ok := a.spans[n.span.ParentSpanID()]; ok { + p.children = append(p.children, n) + } else { + warning := fmt.Sprintf(warningMissingParentSpanID, n.span.ParentSpanID()) + jptrace.AddWarning(n.span, warning) + // treat spans with invalid parent ID as root spans + a.roots[n.span.SpanID()] = n + } + } +} + +func (a *clockSkewAdjuster) adjustNode(n *node, parent *node, skew clockSkew) { + if (n.hostKey != skew.hostKey || n.hostKey == "") && parent != nil { + // Node n is from a different host. The parent has already been adjusted, + // so we can compare this node's timestamps against the parent. + skew = clockSkew{ + hostKey: n.hostKey, + delta: a.calculateSkew(n, parent), + } + } + a.adjustTimestamps(n, skew) + for _, child := range n.children { + a.adjustNode(child, n, skew) + } +} + +func (*clockSkewAdjuster) calculateSkew(child *node, parent *node) time.Duration { + parentStartTime := parent.span.StartTimestamp().AsTime() + childStartTime := child.span.StartTimestamp().AsTime() + parentEndTime := parent.span.EndTimestamp().AsTime() + childEndTime := child.span.EndTimestamp().AsTime() + parentDuration := parentEndTime.Sub(parentStartTime) + childDuration := childEndTime.Sub(childStartTime) + + if childDuration > parentDuration { + // When the child lasted longer than the parent, it was either + // async or the parent may have timed out before child responded. + // The only reasonable adjustment we can do in this case is to make + // sure the child does not start before parent. + if childStartTime.Before(parentStartTime) { + return parentStartTime.Sub(childStartTime) + } + return 0 + } + if !childStartTime.Before(parentStartTime) && !childEndTime.After(parentEndTime) { + // child already fits within the parent span, do not adjust + return 0 + } + // Assume that network latency is equally split between req and res. + latency := (parentDuration - childDuration) / 2 + // Goal: parentStartTime + latency = childStartTime + adjustment + return parentStartTime.Add(latency).Sub(childStartTime) +} + +func (a *clockSkewAdjuster) adjustTimestamps(n *node, skew clockSkew) { + if skew.delta == 0 { + return + } + if absDuration(skew.delta) > a.maxDelta { + if a.maxDelta == 0 { + jptrace.AddWarning(n.span, fmt.Sprintf(warningSkewAdjustDisabled, skew.delta)) + return + } + jptrace.AddWarning(n.span, fmt.Sprintf(warningMaxDeltaExceeded, a.maxDelta, skew.delta)) + return + } + n.span.SetStartTimestamp(pcommon.NewTimestampFromTime(n.span.StartTimestamp().AsTime().Add(skew.delta))) + jptrace.AddWarning(n.span, fmt.Sprintf("This span's timestamps were adjusted by %v", skew.delta)) + for i := 0; i < n.span.Events().Len(); i++ { + event := n.span.Events().At(i) + event.SetTimestamp(pcommon.NewTimestampFromTime(event.Timestamp().AsTime().Add(skew.delta))) + } +} + +func absDuration(d time.Duration) time.Duration { + if d < 0 { + return -1 * d + } + return d +} diff --git a/cmd/query/app/querysvc/adjuster/clockskew_test.go b/cmd/query/app/querysvc/adjuster/clockskew_test.go new file mode 100644 index 00000000000..26c1a8347ce --- /dev/null +++ b/cmd/query/app/querysvc/adjuster/clockskew_test.go @@ -0,0 +1,292 @@ +// Copyright (c) 2024 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package adjuster + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/ptrace" + + "github.com/jaegertracing/jaeger/internal/jptrace" +) + +func TestClockSkewAdjuster(t *testing.T) { + type testSpan struct { + id, parent [8]byte + startTime, duration int + events []int // timestamps for logs + host string + adjusted int // start time after adjustment + adjustedEvents []int // adjusted log timestamps + } + + toTime := func(t int) time.Time { + return time.Unix(0, (time.Duration(t) * time.Millisecond).Nanoseconds()) + } + + // helper function that constructs a trace from a list of span prototypes + makeTrace := func(spanPrototypes []testSpan) ptrace.Traces { + trace := ptrace.NewTraces() + for _, spanProto := range spanPrototypes { + traceID := pcommon.TraceID([16]byte{0, 0, 0, 0, 0, 0, 0, 1}) + span := ptrace.NewSpan() + span.SetTraceID(traceID) + span.SetSpanID(spanProto.id) + span.SetParentSpanID(spanProto.parent) + span.SetStartTimestamp(pcommon.NewTimestampFromTime(toTime(spanProto.startTime))) + span.SetEndTimestamp(pcommon.NewTimestampFromTime(toTime(spanProto.startTime + spanProto.duration))) + + events := ptrace.NewSpanEventSlice() + for _, log := range spanProto.events { + event := events.AppendEmpty() + event.SetTimestamp(pcommon.NewTimestampFromTime(toTime(log))) + event.Attributes().PutStr("event", "some event") + } + events.CopyTo(span.Events()) + + resource := ptrace.NewResourceSpans() + resource.Resource().Attributes().PutEmptySlice("host.ip").AppendEmpty().SetStr(spanProto.host) + + span.CopyTo(resource.ScopeSpans().AppendEmpty().Spans().AppendEmpty()) + resource.CopyTo(trace.ResourceSpans().AppendEmpty()) + } + return trace + } + + testCases := []struct { + description string + trace []testSpan + err string + maxAdjust time.Duration + }{ + { + description: "single span with bad parent", + trace: []testSpan{ + {id: [8]byte{1}, parent: [8]byte{0, 0, 0, 0, 0, 0, 0, 99}, startTime: 0, duration: 100, host: "a", adjusted: 0}, + }, + err: "parent span ID=0000000000000063 is not in the trace; skipping clock skew adjustment", // 99 == 0x63 + }, + { + description: "single span with empty host key", + trace: []testSpan{ + {id: [8]byte{1}, parent: [8]byte{0}, startTime: 0, duration: 100, adjusted: 0}, + }, + }, + { + description: "two spans with the same ID", + trace: []testSpan{ + {id: [8]byte{1}, parent: [8]byte{0}, startTime: 0, duration: 100, host: "a", adjusted: 0}, + {id: [8]byte{1}, parent: [8]byte{0}, startTime: 0, duration: 100, host: "a", adjusted: 0}, + }, + err: "duplicate span IDs; skipping clock skew adjustment", + }, + { + description: "parent-child on the same host", + trace: []testSpan{ + {id: [8]byte{1}, parent: [8]byte{0}, startTime: 0, duration: 100, host: "a", adjusted: 0}, + {id: [8]byte{2}, parent: [8]byte{1}, startTime: 10, duration: 50, host: "a", adjusted: 10}, + }, + }, + { + description: "do not adjust parent-child on the same host", + trace: []testSpan{ + {id: [8]byte{1}, parent: [8]byte{0}, startTime: 10, duration: 100, host: "a", adjusted: 10}, + {id: [8]byte{2}, parent: [8]byte{1}, startTime: 0, duration: 50, host: "a", adjusted: 0}, + }, + }, + { + description: "do not adjust child that fits inside parent", + trace: []testSpan{ + {id: [8]byte{1}, parent: [8]byte{0}, startTime: 10, duration: 100, host: "a", adjusted: 10}, + {id: [8]byte{2}, parent: [8]byte{1}, startTime: 20, duration: 50, host: "b", adjusted: 20}, + }, + }, + { + description: "do not adjust child that is longer than parent", + trace: []testSpan{ + {id: [8]byte{1}, parent: [8]byte{0}, startTime: 10, duration: 100, host: "a", adjusted: 10}, + {id: [8]byte{2}, parent: [8]byte{1}, startTime: 20, duration: 150, host: "b", adjusted: 20}, + }, + }, + { + description: "do not apply positive adjustment due to max skew adjustment", + trace: []testSpan{ + {id: [8]byte{1}, parent: [8]byte{0}, startTime: 10, duration: 100, host: "a", adjusted: 10}, + {id: [8]byte{2}, parent: [8]byte{1}, startTime: 0, duration: 50, host: "b", adjusted: 0}, + }, + maxAdjust: 10 * time.Millisecond, + err: "max clock skew adjustment delta of 10ms exceeded; not applying calculated delta of 35ms", + }, + { + description: "do not apply negative adjustment due to max skew adjustment", + trace: []testSpan{ + {id: [8]byte{1}, parent: [8]byte{0}, startTime: 10, duration: 100, host: "a", adjusted: 10}, + {id: [8]byte{2}, parent: [8]byte{1}, startTime: 80, duration: 50, host: "b", adjusted: 80}, + }, + maxAdjust: 10 * time.Millisecond, + err: "max clock skew adjustment delta of 10ms exceeded; not applying calculated delta of -45ms", + }, + { + description: "do not apply adjustment due to disabled adjustment", + trace: []testSpan{ + {id: [8]byte{1}, parent: [8]byte{0}, startTime: 10, duration: 100, host: "a", adjusted: 10}, + {id: [8]byte{2}, parent: [8]byte{1}, startTime: 0, duration: 50, host: "b", adjusted: 0}, + }, + err: "clock skew adjustment disabled; not applying calculated delta of 35ms", + }, + { + description: "adjust child starting before parent", + trace: []testSpan{ + {id: [8]byte{1}, parent: [8]byte{0}, startTime: 10, duration: 100, host: "a", adjusted: 10}, + // latency = (100-50) / 2 = 25 + // delta = (10 - 0) + latency = 35 + { + id: [8]byte{2}, parent: [8]byte{1}, startTime: 0, duration: 50, host: "b", adjusted: 35, + events: []int{5, 10}, adjustedEvents: []int{40, 45}, + }, + }, + maxAdjust: time.Second, + }, + { + description: "adjust child starting before parent even if it is longer", + trace: []testSpan{ + {id: [8]byte{1}, parent: [8]byte{0}, startTime: 10, duration: 100, host: "a", adjusted: 10}, + {id: [8]byte{2}, parent: [8]byte{1}, startTime: 0, duration: 150, host: "b", adjusted: 10}, + }, + maxAdjust: time.Second, + }, + { + description: "adjust child ending after parent but being shorter", + trace: []testSpan{ + {id: [8]byte{1}, parent: [8]byte{0}, startTime: 10, duration: 100, host: "a", adjusted: 10}, + // latency: (100 - 70) / 2 = 15 + // new child start time: 10 + latency = 25, delta = -25 + {id: [8]byte{2}, parent: [8]byte{1}, startTime: 50, duration: 70, host: "b", adjusted: 25}, + // same host 'b', so same delta = -25 + // new start time: 60 + delta = 35 + { + id: [8]byte{3}, parent: [8]byte{2}, startTime: 60, duration: 20, host: "b", adjusted: 35, + events: []int{65, 70}, adjustedEvents: []int{40, 45}, + }, + }, + maxAdjust: time.Second, + }, + } + + for _, tt := range testCases { + testCase := tt // capture loop var + t.Run(testCase.description, func(t *testing.T) { + trace := makeTrace(testCase.trace) + adjuster := ClockSkew(tt.maxAdjust) + adjuster.Adjust(trace) + + var gotErr string + for i, proto := range testCase.trace { + id := proto.id + span := trace.ResourceSpans().At(i).ScopeSpans().At(0).Spans().At(0) + require.EqualValues(t, proto.id, span.SpanID(), "expecting span with span ID = %d", id) + + warnings := jptrace.GetWarnings(span) + if testCase.err == "" { + if proto.adjusted == proto.startTime { + assert.Empty(t, warnings, "no warnings in span %s", span.SpanID) + } else { + assert.Len(t, warnings, 1, "warning about adjutment added to span %s", span.SpanID) + } + } else { + if len(warnings) > 0 { + gotErr = warnings[0] + } + } + + // compare values as int because assert.Equal prints uint64 as hex + assert.Equal( + t, toTime(proto.adjusted).UTC(), span.StartTimestamp().AsTime(), + "adjusted start time of span[ID = %d]", id) + for i, logTs := range proto.adjustedEvents { + assert.Equal( + t, toTime(logTs).UTC(), span.Events().At(i).Timestamp().AsTime(), + "adjusted log timestamp of span[ID = %d], log[%d]", id, i) + } + } + assert.Equal(t, testCase.err, gotErr) + }) + } +} + +func TestHostKey(t *testing.T) { + tests := []struct { + name string + resource ptrace.ResourceSpans + expected string + }{ + { + name: "host.id attribute", + resource: func() ptrace.ResourceSpans { + rs := ptrace.NewResourceSpans() + rs.Resource().Attributes().PutStr("host.id", "host-123") + return rs + }(), + expected: "host-123", + }, + { + name: "string host.ip attribute", + resource: func() ptrace.ResourceSpans { + rs := ptrace.NewResourceSpans() + rs.Resource().Attributes().PutStr("host.ip", "192.168.1.1") + return rs + }(), + expected: "192.168.1.1", + }, + { + name: "slice host.ip attribute", + resource: func() ptrace.ResourceSpans { + rs := ptrace.NewResourceSpans() + addresses := rs.Resource().Attributes().PutEmptySlice("host.ip") + addresses.AppendEmpty().SetStr("192.168.1.1") + addresses.AppendEmpty().SetStr("192.168.1.2") + return rs + }(), + expected: "192.168.1.1", + }, + { + name: "empty host.ip attribute slice", + resource: func() ptrace.ResourceSpans { + rs := ptrace.NewResourceSpans() + rs.Resource().Attributes().PutEmptySlice("host.ip") + return rs + }(), + expected: "", + }, + { + name: "host.name attribute", + resource: func() ptrace.ResourceSpans { + rs := ptrace.NewResourceSpans() + rs.Resource().Attributes().PutStr("host.name", "hostname") + return rs + }(), + expected: "hostname", + }, + { + name: "no relevant attributes", + resource: func() ptrace.ResourceSpans { + rs := ptrace.NewResourceSpans() + rs.Resource().Attributes().PutStr("service.name", "service-123") + return rs + }(), + expected: "", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := hostKey(tt.resource) + assert.Equal(t, tt.expected, result) + }) + } +} diff --git a/pkg/otelsemconv/semconv.go b/pkg/otelsemconv/semconv.go index 9acb3b88e2c..5427dbfcc26 100644 --- a/pkg/otelsemconv/semconv.go +++ b/pkg/otelsemconv/semconv.go @@ -24,6 +24,10 @@ const ( DBSystemKey = semconv.DBSystemKey PeerServiceKey = semconv.PeerServiceKey HTTPResponseStatusCodeKey = semconv.HTTPResponseStatusCodeKey + + HostIDKey = semconv.HostIDKey + HostIPKey = semconv.HostIPKey + HostNameKey = semconv.HostNameKey ) var HTTPResponseStatusCode = semconv.HTTPResponseStatusCode From 6bec1ad06b58b394ad4f621d087fd5f71ca08a2b Mon Sep 17 00:00:00 2001 From: Mend Renovate Date: Mon, 23 Dec 2024 14:55:08 +0100 Subject: [PATCH 2/6] fix(deps): update module google.golang.org/protobuf to v1.36.1 (#6395) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This PR contains the following updates: | Package | Change | Age | Adoption | Passing | Confidence | |---|---|---|---|---|---| | [google.golang.org/protobuf](https://redirect.github.com/protocolbuffers/protobuf-go) | `v1.36.0` -> `v1.36.1` | [![age](https://developer.mend.io/api/mc/badges/age/go/google.golang.org%2fprotobuf/v1.36.1?slim=true)](https://docs.renovatebot.com/merge-confidence/) | [![adoption](https://developer.mend.io/api/mc/badges/adoption/go/google.golang.org%2fprotobuf/v1.36.1?slim=true)](https://docs.renovatebot.com/merge-confidence/) | [![passing](https://developer.mend.io/api/mc/badges/compatibility/go/google.golang.org%2fprotobuf/v1.36.0/v1.36.1?slim=true)](https://docs.renovatebot.com/merge-confidence/) | [![confidence](https://developer.mend.io/api/mc/badges/confidence/go/google.golang.org%2fprotobuf/v1.36.0/v1.36.1?slim=true)](https://docs.renovatebot.com/merge-confidence/) | --- ### Release Notes
protocolbuffers/protobuf-go (google.golang.org/protobuf) ### [`v1.36.1`](https://redirect.github.com/protocolbuffers/protobuf-go/releases/tag/v1.36.1) [Compare Source](https://redirect.github.com/protocolbuffers/protobuf-go/compare/v1.36.0...v1.36.1) **Full Changelog**: https://github.com/protocolbuffers/protobuf-go/compare/v1.36.0...v1.36.1 Bug fixes: [CL/638495](https://go-review.googlesource.com/c/protobuf/+/638495): internal/impl: revert IsSynthetic() check to fix panic Maintenance: [CL/637475](https://go-review.googlesource.com/c/protobuf/+/637475): internal/errors: delete compatibility code for Go before 1.13
--- ### Configuration 📅 **Schedule**: Branch creation - At any time (no schedule defined), Automerge - At any time (no schedule defined). 🚦 **Automerge**: Disabled by config. Please merge this manually once you are satisfied. ♻ **Rebasing**: Whenever PR becomes conflicted, or you tick the rebase/retry checkbox. 🔕 **Ignore**: Close this PR and you won't be reminded about this update again. --- - [ ] If you want to rebase/retry this PR, check this box --- This PR was generated by [Mend Renovate](https://mend.io/renovate/). View the [repository job log](https://developer.mend.io/github/jaegertracing/jaeger). Signed-off-by: Mend Renovate --- go.mod | 2 +- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index 762d0f365f4..a7eafab345e 100644 --- a/go.mod +++ b/go.mod @@ -98,7 +98,7 @@ require ( golang.org/x/net v0.33.0 golang.org/x/sys v0.28.0 google.golang.org/grpc v1.69.2 - google.golang.org/protobuf v1.36.0 + google.golang.org/protobuf v1.36.1 gopkg.in/yaml.v3 v3.0.1 ) diff --git a/go.sum b/go.sum index 5d8e67a5e7a..d694e4003f1 100644 --- a/go.sum +++ b/go.sum @@ -964,8 +964,8 @@ google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2 google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c= -google.golang.org/protobuf v1.36.0 h1:mjIs9gYtt56AzC4ZaffQuh88TZurBGhIJMBZGSxNerQ= -google.golang.org/protobuf v1.36.0/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= +google.golang.org/protobuf v1.36.1 h1:yBPeRvTftaleIgM3PZ/WBIZ7XM/eEYAaEyCwvyjq/gk= +google.golang.org/protobuf v1.36.1/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= From 4ecb0860ce7ae901f8d77261b4022e5030288073 Mon Sep 17 00:00:00 2001 From: Mahad Zaryab <43658574+mahadzaryab1@users.noreply.github.com> Date: Mon, 23 Dec 2024 12:00:46 -0500 Subject: [PATCH 3/6] [v2][adjuster] Implement otlp to model translator with post processing (#6394) ## Which problem is this PR solving? - Towards #6344 ## Description of the changes - Implemented a translator in `jptrace` with a function `ProtoFromTraces` that is a wrapper of the upstream `ProtoFromTraces` in opentelemetry-collector-contrib jaeger translator. This function appends the warnings in `jaeger.internal.warnings` to the corresponding warnings field in the proto model. ## How was this change tested? - Added unit tests ## Checklist - [x] I have read https://github.com/jaegertracing/jaeger/blob/master/CONTRIBUTING_GUIDELINES.md - [x] I have signed all commits - [x] I have added unit tests for the new functionality - [x] I have run lint and test steps successfully - for `jaeger`: `make lint test` - for `jaeger-ui`: `npm run lint` and `npm run test` --------- Signed-off-by: Mahad Zaryab --- .golangci.yml | 8 +++ cmd/collector/app/handler/otlp_receiver.go | 4 +- .../processors/adaptivesampling/processor.go | 4 +- cmd/query/app/otlp_translator.go | 4 +- internal/jptrace/translator.go | 58 +++++++++++++++++++ internal/jptrace/translator_test.go | 55 ++++++++++++++++++ model/ids.go | 16 +++++ model/ids_test.go | 52 +++++++++++++++++ storage_v2/v1adapter/writer.go | 4 +- 9 files changed, 197 insertions(+), 8 deletions(-) create mode 100644 internal/jptrace/translator.go create mode 100644 internal/jptrace/translator_test.go diff --git a/.golangci.yml b/.golangci.yml index 5427041391e..2cca15f1d14 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -139,6 +139,14 @@ linters-settings: files: - "**_test.go" + # TODO: enable after import is not used anywhere + # disallow-otel-contrib-translator: + # deny: + # - pkg: github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger + # desc: "Use jptrace package instead of opentelemetry-collector-contrib/pkg/translator/jaeger" + # files: + # - "!**/jptrace/**" + goimports: local-prefixes: github.com/jaegertracing/jaeger gosec: diff --git a/cmd/collector/app/handler/otlp_receiver.go b/cmd/collector/app/handler/otlp_receiver.go index ba450e4a298..ebde8d2ba38 100644 --- a/cmd/collector/app/handler/otlp_receiver.go +++ b/cmd/collector/app/handler/otlp_receiver.go @@ -7,7 +7,6 @@ import ( "context" "fmt" - otlp2jaeger "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componentstatus" "go.opentelemetry.io/collector/config/confignet" @@ -23,6 +22,7 @@ import ( "github.com/jaegertracing/jaeger/cmd/collector/app/flags" "github.com/jaegertracing/jaeger/cmd/collector/app/processor" + "github.com/jaegertracing/jaeger/internal/jptrace" "github.com/jaegertracing/jaeger/pkg/tenancy" ) @@ -108,7 +108,7 @@ type consumerDelegate struct { } func (c *consumerDelegate) consume(ctx context.Context, td ptrace.Traces) error { - batches := otlp2jaeger.ProtoFromTraces(td) + batches := jptrace.ProtoFromTraces(td) for _, batch := range batches { err := c.batchConsumer.consume(ctx, batch) if err != nil { diff --git a/cmd/jaeger/internal/processors/adaptivesampling/processor.go b/cmd/jaeger/internal/processors/adaptivesampling/processor.go index 4d14f7f99c4..7a0ca2a0641 100644 --- a/cmd/jaeger/internal/processors/adaptivesampling/processor.go +++ b/cmd/jaeger/internal/processors/adaptivesampling/processor.go @@ -7,12 +7,12 @@ import ( "context" "fmt" - otlp2jaeger "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/pdata/ptrace" "github.com/jaegertracing/jaeger/cmd/collector/app/sampling/samplingstrategy" "github.com/jaegertracing/jaeger/cmd/jaeger/internal/extension/remotesampling" + "github.com/jaegertracing/jaeger/internal/jptrace" "github.com/jaegertracing/jaeger/internal/metrics/otelmetrics" "github.com/jaegertracing/jaeger/plugin/sampling/strategyprovider/adaptive" ) @@ -65,7 +65,7 @@ func (tp *traceProcessor) close(context.Context) error { } func (tp *traceProcessor) processTraces(_ context.Context, td ptrace.Traces) (ptrace.Traces, error) { - batches := otlp2jaeger.ProtoFromTraces(td) + batches := jptrace.ProtoFromTraces(td) for _, batch := range batches { for _, span := range batch.Spans { if span.Process == nil { diff --git a/cmd/query/app/otlp_translator.go b/cmd/query/app/otlp_translator.go index 06f7c63e440..daaaabf2aeb 100644 --- a/cmd/query/app/otlp_translator.go +++ b/cmd/query/app/otlp_translator.go @@ -6,9 +6,9 @@ package app import ( "fmt" - model2otel "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger" "go.opentelemetry.io/collector/pdata/ptrace" + "github.com/jaegertracing/jaeger/internal/jptrace" "github.com/jaegertracing/jaeger/model" ) @@ -18,7 +18,7 @@ func otlp2traces(otlpSpans []byte) ([]*model.Trace, error) { if err != nil { return nil, fmt.Errorf("cannot unmarshal OTLP : %w", err) } - jaegerBatches := model2otel.ProtoFromTraces(otlpTraces) + jaegerBatches := jptrace.ProtoFromTraces(otlpTraces) var traces []*model.Trace traceMap := make(map[model.TraceID]*model.Trace) for _, batch := range jaegerBatches { diff --git a/internal/jptrace/translator.go b/internal/jptrace/translator.go new file mode 100644 index 00000000000..7480b2b5075 --- /dev/null +++ b/internal/jptrace/translator.go @@ -0,0 +1,58 @@ +// Copyright (c) 2024 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package jptrace + +import ( + jaegerTranslator "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger" + "go.opentelemetry.io/collector/pdata/ptrace" + + "github.com/jaegertracing/jaeger/model" +) + +// ProtoFromTraces converts OpenTelemetry traces (ptrace.Traces) +// to Jaeger model batches ([]*model.Batch). +func ProtoFromTraces(traces ptrace.Traces) []*model.Batch { + batches := jaegerTranslator.ProtoFromTraces(traces) + spanMap := createSpanMapFromBatches(batches) + transferWarnings(traces, spanMap) + return batches +} + +func createSpanMapFromBatches(batches []*model.Batch) map[model.SpanID]*model.Span { + spanMap := make(map[model.SpanID]*model.Span) + for _, batch := range batches { + for _, span := range batch.Spans { + spanMap[span.SpanID] = span + } + } + return spanMap +} + +func transferWarnings(traces ptrace.Traces, spanMap map[model.SpanID]*model.Span) { + resources := traces.ResourceSpans() + for i := 0; i < resources.Len(); i++ { + scopes := resources.At(i).ScopeSpans() + for j := 0; j < scopes.Len(); j++ { + spans := scopes.At(j).Spans() + for k := 0; k < spans.Len(); k++ { + otelSpan := spans.At(k) + warnings := GetWarnings(otelSpan) + span := spanMap[model.SpanIDFromOTEL(otelSpan.SpanID())] + span.Warnings = append(span.Warnings, warnings...) + // filter out the warning tag + span.Tags = filterTags(span.Tags, warningsAttribute) + } + } + } +} + +func filterTags(tags []model.KeyValue, keyToRemove string) []model.KeyValue { + var filteredTags []model.KeyValue + for _, tag := range tags { + if tag.Key != keyToRemove { + filteredTags = append(filteredTags, tag) + } + } + return filteredTags +} diff --git a/internal/jptrace/translator_test.go b/internal/jptrace/translator_test.go new file mode 100644 index 00000000000..5ff3129a00f --- /dev/null +++ b/internal/jptrace/translator_test.go @@ -0,0 +1,55 @@ +// Copyright (c) 2024 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package jptrace + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/ptrace" + + "github.com/jaegertracing/jaeger/model" +) + +func TestProtoFromTraces_AddsWarnings(t *testing.T) { + traces := ptrace.NewTraces() + rs1 := traces.ResourceSpans().AppendEmpty() + ss1 := rs1.ScopeSpans().AppendEmpty() + span1 := ss1.Spans().AppendEmpty() + span1.SetName("test-span-1") + span1.SetSpanID(pcommon.SpanID([8]byte{1, 2, 3, 4, 5, 6, 7, 8})) + AddWarning(span1, "test-warning-1") + AddWarning(span1, "test-warning-2") + span1.Attributes().PutStr("key", "value") + + ss2 := rs1.ScopeSpans().AppendEmpty() + span2 := ss2.Spans().AppendEmpty() + span2.SetName("test-span-2") + span2.SetSpanID(pcommon.SpanID([8]byte{9, 10, 11, 12, 13, 14, 15, 16})) + + rs2 := traces.ResourceSpans().AppendEmpty() + ss3 := rs2.ScopeSpans().AppendEmpty() + span3 := ss3.Spans().AppendEmpty() + span3.SetName("test-span-3") + span3.SetSpanID(pcommon.SpanID([8]byte{17, 18, 19, 20, 21, 22, 23, 24})) + AddWarning(span3, "test-warning-3") + + batches := ProtoFromTraces(traces) + + assert.Len(t, batches, 2) + + assert.Len(t, batches[0].Spans, 2) + assert.Equal(t, "test-span-1", batches[0].Spans[0].OperationName) + assert.Equal(t, []string{"test-warning-1", "test-warning-2"}, batches[0].Spans[0].Warnings) + assert.Equal(t, []model.KeyValue{{Key: "key", VStr: "value"}}, batches[0].Spans[0].Tags) + assert.Equal(t, "test-span-2", batches[0].Spans[1].OperationName) + assert.Empty(t, batches[0].Spans[1].Warnings) + assert.Empty(t, batches[0].Spans[1].Tags) + + assert.Len(t, batches[1].Spans, 1) + assert.Equal(t, "test-span-3", batches[1].Spans[0].OperationName) + assert.Equal(t, []string{"test-warning-3"}, batches[1].Spans[0].Warnings) + assert.Empty(t, batches[1].Spans[0].Tags) +} diff --git a/model/ids.go b/model/ids.go index 5bf93dc43be..4c7b77c6aa9 100644 --- a/model/ids.go +++ b/model/ids.go @@ -261,3 +261,19 @@ func (s *SpanID) UnmarshalJSON(data []byte) error { func (s *SpanID) UnmarshalJSONPB(_ *jsonpb.Unmarshaler, b []byte) error { return s.UnmarshalJSON(b) } + +// ToOTELSpanID converts the SpanID to OTEL's representation of a span identitfier. +// This was taken from +// https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/internal/coreinternal/idutils/big_endian_converter.go. +func (s SpanID) ToOTELSpanID() pcommon.SpanID { + spanID := [8]byte{} + binary.BigEndian.PutUint64(spanID[:], uint64(s)) + return pcommon.SpanID(spanID) +} + +// ToOTELSpanID converts OTEL's SpanID to the model representation of a span identitfier. +// This was taken from +// https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/internal/coreinternal/idutils/big_endian_converter.go. +func SpanIDFromOTEL(spanID pcommon.SpanID) SpanID { + return SpanID(binary.BigEndian.Uint64(spanID[:])) +} diff --git a/model/ids_test.go b/model/ids_test.go index 40c6f801a5c..ef2a9a65beb 100644 --- a/model/ids_test.go +++ b/model/ids_test.go @@ -137,3 +137,55 @@ func TestTraceIDFromOTEL(t *testing.T) { } require.Equal(t, expected, model.TraceIDFromOTEL(otelTraceID)) } + +func TestToOTELSpanID(t *testing.T) { + tests := []struct { + name string + spanID model.SpanID + expected pcommon.SpanID + }{ + { + name: "zero span ID", + spanID: model.NewSpanID(0), + expected: pcommon.NewSpanIDEmpty(), + }, + { + name: "non-zero span ID", + spanID: model.NewSpanID(1), + expected: pcommon.SpanID([8]byte{0, 0, 0, 0, 0, 0, 0, 1}), + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + actual := test.spanID.ToOTELSpanID() + assert.Equal(t, test.expected, actual) + }) + } +} + +func TestSpanIDFromOTEL(t *testing.T) { + tests := []struct { + name string + otelSpanID pcommon.SpanID + expected model.SpanID + }{ + { + name: "zero span ID", + otelSpanID: pcommon.NewSpanIDEmpty(), + expected: model.NewSpanID(0), + }, + { + name: "non-zero span ID", + otelSpanID: pcommon.SpanID([8]byte{0, 0, 0, 0, 0, 0, 0, 1}), + expected: model.NewSpanID(1), + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + actual := model.SpanIDFromOTEL(test.otelSpanID) + assert.Equal(t, test.expected, actual) + }) + } +} diff --git a/storage_v2/v1adapter/writer.go b/storage_v2/v1adapter/writer.go index c0a9d4ced99..b16b54ed43e 100644 --- a/storage_v2/v1adapter/writer.go +++ b/storage_v2/v1adapter/writer.go @@ -7,9 +7,9 @@ import ( "context" "errors" - otlp2jaeger "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger" "go.opentelemetry.io/collector/pdata/ptrace" + "github.com/jaegertracing/jaeger/internal/jptrace" "github.com/jaegertracing/jaeger/storage/spanstore" "github.com/jaegertracing/jaeger/storage_v2/tracestore" ) @@ -26,7 +26,7 @@ func NewTraceWriter(spanWriter spanstore.Writer) tracestore.Writer { // WriteTraces implements tracestore.Writer. func (t *TraceWriter) WriteTraces(ctx context.Context, td ptrace.Traces) error { - batches := otlp2jaeger.ProtoFromTraces(td) + batches := jptrace.ProtoFromTraces(td) var errs []error for _, batch := range batches { for _, span := range batch.Spans { From c8a15483b8894a0fb13013f9622f8d2c2592c415 Mon Sep 17 00:00:00 2001 From: Mahad Zaryab <43658574+mahadzaryab1@users.noreply.github.com> Date: Mon, 23 Dec 2024 16:02:11 -0500 Subject: [PATCH 4/6] [v2][adjuster] Implement function to get standard adjusters to operate on otlp format (#6396) ## Which problem is this PR solving? - Towards #6344 ## Description of the changes - Implemented a function `StandardAdjusters` that returns a list of adjusters to be applied on ptrace.Traces - This will be used by the v2 query service in https://github.com/jaegertracing/jaeger/pull/6343 ## Checklist - [x] I have read https://github.com/jaegertracing/jaeger/blob/master/CONTRIBUTING_GUIDELINES.md - [x] I have signed all commits - [x] I have added unit tests for the new functionality - [x] I have run lint and test steps successfully - for `jaeger`: `make lint test` - for `jaeger-ui`: `npm run lint` and `npm run test` --------- Signed-off-by: Mahad Zaryab --- cmd/query/app/querysvc/adjuster/clockskew.go | 4 +-- .../app/querysvc/adjuster/clockskew_test.go | 2 +- cmd/query/app/querysvc/adjuster/hash.go | 6 ++--- cmd/query/app/querysvc/adjuster/hash_test.go | 10 ++++---- .../app/querysvc/adjuster/ipattribute.go | 4 +-- .../app/querysvc/adjuster/ipattribute_test.go | 2 +- ...urceattributes.go => libraryattributes.go} | 4 +-- ...utes_test.go => libraryattributes_test.go} | 8 +++--- cmd/query/app/querysvc/adjuster/sort.go | 4 +-- cmd/query/app/querysvc/adjuster/sort_test.go | 2 +- .../app/querysvc/adjuster/spaniduniquifier.go | 4 +-- .../adjuster/spaniduniquifier_test.go | 4 +-- cmd/query/app/querysvc/adjuster/spanlinks.go | 4 +-- .../app/querysvc/adjuster/spanlinks_test.go | 2 +- cmd/query/app/querysvc/adjuster/standard.go | 23 +++++++++++++++++ .../app/querysvc/adjuster/standard_test.go | 25 +++++++++++++++++++ 16 files changed, 78 insertions(+), 30 deletions(-) rename cmd/query/app/querysvc/adjuster/{resourceattributes.go => libraryattributes.go} (93%) rename cmd/query/app/querysvc/adjuster/{resourceattributes_test.go => libraryattributes_test.go} (97%) create mode 100644 cmd/query/app/querysvc/adjuster/standard.go create mode 100644 cmd/query/app/querysvc/adjuster/standard_test.go diff --git a/cmd/query/app/querysvc/adjuster/clockskew.go b/cmd/query/app/querysvc/adjuster/clockskew.go index 0f0bbc40e63..70e00fd1238 100644 --- a/cmd/query/app/querysvc/adjuster/clockskew.go +++ b/cmd/query/app/querysvc/adjuster/clockskew.go @@ -21,7 +21,7 @@ const ( warningSkewAdjustDisabled = "clock skew adjustment disabled; not applying calculated delta of %v" ) -// ClockSkew returns an Adjuster that corrects span timestamps for clock skew. +// CorrectClockSkew returns an Adjuster that corrects span timestamps for clock skew. // // This adjuster modifies the start and log timestamps of child spans that are // inconsistent with their parent spans due to clock differences between hosts. @@ -36,7 +36,7 @@ const ( // Parameters: // - maxDelta: The maximum allowable time adjustment. Adjustments exceeding // this value will be ignored. -func ClockSkew(maxDelta time.Duration) Adjuster { +func CorrectClockSkew(maxDelta time.Duration) Adjuster { return Func(func(traces ptrace.Traces) { adjuster := &clockSkewAdjuster{ traces: traces, diff --git a/cmd/query/app/querysvc/adjuster/clockskew_test.go b/cmd/query/app/querysvc/adjuster/clockskew_test.go index 26c1a8347ce..cfa94e0c292 100644 --- a/cmd/query/app/querysvc/adjuster/clockskew_test.go +++ b/cmd/query/app/querysvc/adjuster/clockskew_test.go @@ -182,7 +182,7 @@ func TestClockSkewAdjuster(t *testing.T) { testCase := tt // capture loop var t.Run(testCase.description, func(t *testing.T) { trace := makeTrace(testCase.trace) - adjuster := ClockSkew(tt.maxAdjust) + adjuster := CorrectClockSkew(tt.maxAdjust) adjuster.Adjust(trace) var gotErr string diff --git a/cmd/query/app/querysvc/adjuster/hash.go b/cmd/query/app/querysvc/adjuster/hash.go index 25afbbc1621..55d6347ed9b 100644 --- a/cmd/query/app/querysvc/adjuster/hash.go +++ b/cmd/query/app/querysvc/adjuster/hash.go @@ -14,7 +14,7 @@ import ( var _ Adjuster = (*SpanHashDeduper)(nil) -// SpanHash creates an adjuster that deduplicates spans by removing all but one span +// DeduplicateSpans creates an adjuster that deduplicates spans by removing all but one span // with the same hash code. This is particularly useful for scenarios where spans // may be duplicated during archival, such as with ElasticSearch archival. // @@ -23,8 +23,8 @@ var _ Adjuster = (*SpanHashDeduper)(nil) // // To ensure consistent hash codes, this adjuster should be executed after // SortAttributesAndEvents, which normalizes the order of collections within the span. -func SpanHash() SpanHashDeduper { - return SpanHashDeduper{ +func DeduplicateSpans() *SpanHashDeduper { + return &SpanHashDeduper{ marshaler: &ptrace.ProtoMarshaler{}, } } diff --git a/cmd/query/app/querysvc/adjuster/hash_test.go b/cmd/query/app/querysvc/adjuster/hash_test.go index dd90d7ab1a3..d5f97795418 100644 --- a/cmd/query/app/querysvc/adjuster/hash_test.go +++ b/cmd/query/app/querysvc/adjuster/hash_test.go @@ -13,7 +13,7 @@ import ( ) func TestSpanHash_EmptySpans(t *testing.T) { - adjuster := SpanHash() + adjuster := DeduplicateSpans() input := ptrace.NewTraces() expected := ptrace.NewTraces() adjuster.Adjust(input) @@ -21,7 +21,7 @@ func TestSpanHash_EmptySpans(t *testing.T) { } func TestSpanHash_RemovesDuplicateSpans(t *testing.T) { - adjuster := SpanHash() + adjuster := DeduplicateSpans() input := func() ptrace.Traces { traces := ptrace.NewTraces() @@ -126,7 +126,7 @@ func TestSpanHash_RemovesDuplicateSpans(t *testing.T) { } func TestSpanHash_NoDuplicateSpans(t *testing.T) { - adjuster := SpanHash() + adjuster := DeduplicateSpans() input := func() ptrace.Traces { traces := ptrace.NewTraces() rs := traces.ResourceSpans().AppendEmpty() @@ -180,7 +180,7 @@ func TestSpanHash_NoDuplicateSpans(t *testing.T) { } func TestSpanHash_DuplicateSpansDifferentScopeAttributes(t *testing.T) { - adjuster := SpanHash() + adjuster := DeduplicateSpans() input := func() ptrace.Traces { traces := ptrace.NewTraces() rs := traces.ResourceSpans().AppendEmpty() @@ -234,7 +234,7 @@ func TestSpanHash_DuplicateSpansDifferentScopeAttributes(t *testing.T) { } func TestSpanHash_DuplicateSpansDifferentResourceAttributes(t *testing.T) { - adjuster := SpanHash() + adjuster := DeduplicateSpans() input := func() ptrace.Traces { traces := ptrace.NewTraces() rs := traces.ResourceSpans().AppendEmpty() diff --git a/cmd/query/app/querysvc/adjuster/ipattribute.go b/cmd/query/app/querysvc/adjuster/ipattribute.go index a00f93b3254..89c361a0cc3 100644 --- a/cmd/query/app/querysvc/adjuster/ipattribute.go +++ b/cmd/query/app/querysvc/adjuster/ipattribute.go @@ -19,10 +19,10 @@ var ipAttributesToCorrect = map[string]struct{}{ "peer.ipv4": {}, } -// IPAttribute returns an adjuster that replaces numeric "ip" attributes, +// NormalizeIPAttributes returns an adjuster that replaces numeric "ip" attributes, // which usually contain IPv4 packed into uint32, with their string // representation (e.g. "8.8.8.8""). -func IPAttribute() IPAttributeAdjuster { +func NormalizeIPAttributes() IPAttributeAdjuster { return IPAttributeAdjuster{} } diff --git a/cmd/query/app/querysvc/adjuster/ipattribute_test.go b/cmd/query/app/querysvc/adjuster/ipattribute_test.go index d97f1bf35ed..4e5d0917230 100644 --- a/cmd/query/app/querysvc/adjuster/ipattribute_test.go +++ b/cmd/query/app/querysvc/adjuster/ipattribute_test.go @@ -58,7 +58,7 @@ func TestIPAttributeAdjuster(t *testing.T) { } } - IPAttribute().Adjust(traces) + NormalizeIPAttributes().Adjust(traces) resourceSpan := traces.ResourceSpans().At(0) assert.Equal(t, 3, resourceSpan.Resource().Attributes().Len()) diff --git a/cmd/query/app/querysvc/adjuster/resourceattributes.go b/cmd/query/app/querysvc/adjuster/libraryattributes.go similarity index 93% rename from cmd/query/app/querysvc/adjuster/resourceattributes.go rename to cmd/query/app/querysvc/adjuster/libraryattributes.go index 6bedd4ff4e9..d2d0bf64dfd 100644 --- a/cmd/query/app/querysvc/adjuster/resourceattributes.go +++ b/cmd/query/app/querysvc/adjuster/libraryattributes.go @@ -21,11 +21,11 @@ var libraryKeys = map[string]struct{}{ string(otelsemconv.TelemetryDistroVersionKey): {}, } -// ResourceAttributes creates an adjuster that moves the OpenTelemetry library +// MoveLibraryAttributes creates an adjuster that moves the OpenTelemetry library // attributes from spans to the parent resource so that the UI can // display them separately under Process. // https://github.com/jaegertracing/jaeger/issues/4534 -func ResourceAttributes() ResourceAttributesAdjuster { +func MoveLibraryAttributes() ResourceAttributesAdjuster { return ResourceAttributesAdjuster{} } diff --git a/cmd/query/app/querysvc/adjuster/resourceattributes_test.go b/cmd/query/app/querysvc/adjuster/libraryattributes_test.go similarity index 97% rename from cmd/query/app/querysvc/adjuster/resourceattributes_test.go rename to cmd/query/app/querysvc/adjuster/libraryattributes_test.go index 591820db4a2..ebcf8801080 100644 --- a/cmd/query/app/querysvc/adjuster/resourceattributes_test.go +++ b/cmd/query/app/querysvc/adjuster/libraryattributes_test.go @@ -25,7 +25,7 @@ func TestResourceAttributesAdjuster_SpanWithLibraryAttributes(t *testing.T) { span.Attributes().PutStr(string(otelsemconv.TelemetryDistroVersionKey), "blah") span.Attributes().PutStr("another_key", "another_value") - adjuster := ResourceAttributes() + adjuster := MoveLibraryAttributes() adjuster.Adjust(traces) resultSpanAttributes := traces.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).Attributes() @@ -67,7 +67,7 @@ func TestResourceAttributesAdjuster_SpanWithoutLibraryAttributes(t *testing.T) { span := rs.ScopeSpans().AppendEmpty().Spans().AppendEmpty() span.Attributes().PutStr("random_key", "random_value") - adjuster := ResourceAttributes() + adjuster := MoveLibraryAttributes() adjuster.Adjust(traces) resultSpanAttributes := traces.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).Attributes() @@ -85,7 +85,7 @@ func TestResourceAttributesAdjuster_SpanWithConflictingLibraryAttributes(t *test span.Attributes().PutStr("random_key", "random_value") span.Attributes().PutStr(string(otelsemconv.TelemetrySDKLanguageKey), "Java") - adjuster := ResourceAttributes() + adjuster := MoveLibraryAttributes() adjuster.Adjust(traces) resultSpan := traces.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0) @@ -119,7 +119,7 @@ func TestResourceAttributesAdjuster_SpanWithNonConflictingLibraryAttributes(t *t span.Attributes().PutStr("random_key", "random_value") span.Attributes().PutStr(string(otelsemconv.TelemetrySDKLanguageKey), "Go") - adjuster := ResourceAttributes() + adjuster := MoveLibraryAttributes() adjuster.Adjust(traces) resultSpanAttributes := traces.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).Attributes() diff --git a/cmd/query/app/querysvc/adjuster/sort.go b/cmd/query/app/querysvc/adjuster/sort.go index 0ed38af3286..96f50de4a49 100644 --- a/cmd/query/app/querysvc/adjuster/sort.go +++ b/cmd/query/app/querysvc/adjuster/sort.go @@ -12,14 +12,14 @@ import ( var _ Adjuster = (*SortAttributesAndEventsAdjuster)(nil) -// SortAttributesAndEvents creates an adjuster that standardizes trace data by sorting elements: +// SortCollections creates an adjuster that standardizes trace data by sorting elements: // - Resource attributes are sorted lexicographically by their keys. // - Scope attributes are sorted lexicographically by their keys. // - Span attributes are sorted lexicographically by their keys. // - Span events are sorted lexicographically by their names. // - Attributes within each span event are sorted lexicographically by their keys. // - Attributes within each span link are sorted lexicographically by their keys. -func SortAttributesAndEvents() SortAttributesAndEventsAdjuster { +func SortCollections() SortAttributesAndEventsAdjuster { return SortAttributesAndEventsAdjuster{} } diff --git a/cmd/query/app/querysvc/adjuster/sort_test.go b/cmd/query/app/querysvc/adjuster/sort_test.go index d9bef8baa37..ba9d849cff8 100644 --- a/cmd/query/app/querysvc/adjuster/sort_test.go +++ b/cmd/query/app/querysvc/adjuster/sort_test.go @@ -11,7 +11,7 @@ import ( ) func TestSortAttributesAndEventsAdjuster(t *testing.T) { - adjuster := SortAttributesAndEvents() + adjuster := SortCollections() input := func() ptrace.Traces { traces := ptrace.NewTraces() rs := traces.ResourceSpans().AppendEmpty() diff --git a/cmd/query/app/querysvc/adjuster/spaniduniquifier.go b/cmd/query/app/querysvc/adjuster/spaniduniquifier.go index b21ba7eeb6d..81720896e18 100644 --- a/cmd/query/app/querysvc/adjuster/spaniduniquifier.go +++ b/cmd/query/app/querysvc/adjuster/spaniduniquifier.go @@ -15,7 +15,7 @@ import ( var errTooManySpans = errors.New("cannot assign unique span ID, too many spans in the trace") -// SpanIDUniquifier returns an adjuster that changes span ids for server +// DeduplicateClientServerSpanIDs returns an adjuster that changes span ids for server // spans (i.e. spans with tag: span.kind == server) if there is another // client span that shares the same span ID. This is needed to deal with // Zipkin-style clients that reuse the same span ID for both client and server @@ -23,7 +23,7 @@ var errTooManySpans = errors.New("cannot assign unique span ID, too many spans i // // Any issues encountered during adjustment are recorded as warnings in the // span. -func SpanIDUniquifier() Adjuster { +func DeduplicateClientServerSpanIDs() Adjuster { return Func(func(traces ptrace.Traces) { adjuster := spanIDDeduper{ spansByID: make(map[pcommon.SpanID][]ptrace.Span), diff --git a/cmd/query/app/querysvc/adjuster/spaniduniquifier_test.go b/cmd/query/app/querysvc/adjuster/spaniduniquifier_test.go index 5ad6868f443..282e0dcd50e 100644 --- a/cmd/query/app/querysvc/adjuster/spaniduniquifier_test.go +++ b/cmd/query/app/querysvc/adjuster/spaniduniquifier_test.go @@ -46,7 +46,7 @@ func makeTraces() ptrace.Traces { func TestSpanIDUniquifierTriggered(t *testing.T) { traces := makeTraces() - deduper := SpanIDUniquifier() + deduper := DeduplicateClientServerSpanIDs() deduper.Adjust(traces) spans := traces.ResourceSpans().At(0).ScopeSpans().At(0).Spans() @@ -73,7 +73,7 @@ func TestSpanIDUniquifierNotTriggered(t *testing.T) { spans.At(2).CopyTo(newSpans.AppendEmpty()) newSpans.CopyTo(spans) - deduper := SpanIDUniquifier() + deduper := DeduplicateClientServerSpanIDs() deduper.Adjust(traces) gotSpans := traces.ResourceSpans().At(0).ScopeSpans().At(0).Spans() diff --git a/cmd/query/app/querysvc/adjuster/spanlinks.go b/cmd/query/app/querysvc/adjuster/spanlinks.go index 60730fa5ecf..a075be452b8 100644 --- a/cmd/query/app/querysvc/adjuster/spanlinks.go +++ b/cmd/query/app/querysvc/adjuster/spanlinks.go @@ -15,8 +15,8 @@ const ( var _ Adjuster = (*LinksAdjuster)(nil) -// SpanLinks creates an adjuster that removes span links with empty trace IDs. -func SpanLinks() LinksAdjuster { +// RemoveEmptySpanLinks creates an adjuster that removes span links with empty trace IDs. +func RemoveEmptySpanLinks() LinksAdjuster { return LinksAdjuster{} } diff --git a/cmd/query/app/querysvc/adjuster/spanlinks_test.go b/cmd/query/app/querysvc/adjuster/spanlinks_test.go index 33a8a5a78a6..ed2f69439cc 100644 --- a/cmd/query/app/querysvc/adjuster/spanlinks_test.go +++ b/cmd/query/app/querysvc/adjuster/spanlinks_test.go @@ -31,7 +31,7 @@ func TestLinksAdjuster(t *testing.T) { spanC.Links().AppendEmpty().SetTraceID(pcommon.TraceID([]byte{0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0})) spanC.Links().AppendEmpty().SetTraceID(pcommon.TraceID([]byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0})) - SpanLinks().Adjust(traces) + RemoveEmptySpanLinks().Adjust(traces) spans := traces.ResourceSpans().At(0).ScopeSpans().At(0).Spans() gotSpansA := spans.At(0) diff --git a/cmd/query/app/querysvc/adjuster/standard.go b/cmd/query/app/querysvc/adjuster/standard.go new file mode 100644 index 00000000000..cfbb8915192 --- /dev/null +++ b/cmd/query/app/querysvc/adjuster/standard.go @@ -0,0 +1,23 @@ +// Copyright (c) 2024 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package adjuster + +import ( + "time" +) + +// StandardAdjusters returns a list of adjusters applied by the query service +// before returning the data to the API clients. +func StandardAdjusters(maxClockSkewAdjust time.Duration) []Adjuster { + return []Adjuster{ + DeduplicateClientServerSpanIDs(), + SortCollections(), + // DeduplicateSpans depends on SortCollections running first + DeduplicateSpans(), + CorrectClockSkew(maxClockSkewAdjust), + NormalizeIPAttributes(), + MoveLibraryAttributes(), + RemoveEmptySpanLinks(), + } +} diff --git a/cmd/query/app/querysvc/adjuster/standard_test.go b/cmd/query/app/querysvc/adjuster/standard_test.go new file mode 100644 index 00000000000..d2fadfe2ec0 --- /dev/null +++ b/cmd/query/app/querysvc/adjuster/standard_test.go @@ -0,0 +1,25 @@ +// Copyright (c) 2024 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package adjuster + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestStandardAdjusters(t *testing.T) { + maxClockSkewAdjust := 10 * time.Second + adjusters := StandardAdjusters(maxClockSkewAdjust) + + assert.Len(t, adjusters, 7, "Expected 7 adjusters") + assert.IsType(t, DeduplicateClientServerSpanIDs(), adjusters[0]) + assert.IsType(t, SortCollections(), adjusters[1]) + assert.IsType(t, DeduplicateSpans(), adjusters[2]) + assert.IsType(t, CorrectClockSkew(maxClockSkewAdjust), adjusters[3]) + assert.IsType(t, NormalizeIPAttributes(), adjusters[4]) + assert.IsType(t, MoveLibraryAttributes(), adjusters[5]) + assert.IsType(t, RemoveEmptySpanLinks(), adjusters[6]) +} From 5b1c76b7885e047ad3924455f27a31f35fd32b50 Mon Sep 17 00:00:00 2001 From: Jonah Kowall Date: Mon, 23 Dec 2024 16:06:11 -0500 Subject: [PATCH 5/6] Compatibility updates for v2 in README (#6316) ## Which problem is this PR solving? Addresses part of https://github.com/jaegertracing/documentation/issues/117 ## Description of the changes Update text to match Jaeger v2 ## How was this change tested? Not tested ## Checklist - [X] I have read https://github.com/jaegertracing/jaeger/blob/master/CONTRIBUTING_GUIDELINES.md - [X] I have signed all commits --------- Signed-off-by: Jonah Kowall --- README.md | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index 783edaeb599..b843db4fbf3 100644 --- a/README.md +++ b/README.md @@ -47,19 +47,19 @@ Jaeger is an open source project with open governance. We welcome contributions ## Version Compatibility Guarantees -Occasionally, CLI flags can be deprecated due to, for example, usability improvements or new functionality. +Since Jaeger uses many components from the [OpenTelemetry Collector](https://github.com/open-telemetry/opentelemetry-collector/) we try to maintain configuration compatibility between Jaeger releases. Occasionally, configuration options in Jaeger (or in Jaeger v1 CLI flags) can be deprecated due to usability improvements, new functionality, or changes in our dependencies. In such situations, developers introducing the deprecation are required to follow [these guidelines](./CONTRIBUTING.md#deprecating-cli-flags). -In short, for a deprecated CLI flag, you should expect to see the following message in the `--help` documentation: +In short, for a deprecated configuration option, you should expect to see the following message in the documentation or release notes: ``` (deprecated, will be removed after yyyy-mm-dd or in release vX.Y.Z, whichever is later) ``` A grace period of at least **3 months** or **two minor version bumps** (whichever is later) from the first release -containing the deprecation notice will be provided before the deprecated CLI flag _can_ be deleted. +containing the deprecation notice will be provided before the deprecated configuration option _can_ be deleted. -For example, consider a scenario where v1.28.0 is released on 01-Jun-2021 containing a deprecation notice for a CLI flag. -This flag will remain in a deprecated state until the later of 01-Sep-2021 or v1.30.0 where it _can_ be removed on or after either of those events. +For example, consider a scenario where v2.0.0 is released on 01-Sep-2024 containing a deprecation notice for a configuration option. +This configuration option will remain in a deprecated state until the later of 01-Dec-2024 or v2.2.0 where it _can_ be removed on or after either of those events. It may remain deprecated for longer than the aforementioned grace period. ## Go Version Compatibility Guarantees From 53468aad5bcb2ee1e89386d38dfac556ddaf4095 Mon Sep 17 00:00:00 2001 From: Mahad Zaryab <43658574+mahadzaryab1@users.noreply.github.com> Date: Mon, 23 Dec 2024 23:57:01 -0500 Subject: [PATCH 6/6] [v2][adjuster] Implement model to otlp translator with post processing (#6397) ## Which problem is this PR solving? - Towards #6344 ## Description of the changes - Implemented a translator in `jptrace` with a function `ProtoToTraces` that is a wrapper of the upstream `ProtoToTraces` in opentelemetry-collector-contrib jaeger translator. This function appends the warnings in model.Span to the `jaeger.internal.warnings` attribute in the corresponding span in ptrace.Traces. ## How was this change tested? - Added unit tests ## Checklist - [x] I have read https://github.com/jaegertracing/jaeger/blob/master/CONTRIBUTING_GUIDELINES.md - [x] I have signed all commits - [x] I have added unit tests for the new functionality - [x] I have run lint and test steps successfully - for `jaeger`: `make lint test` - for `jaeger-ui`: `npm run lint` and `npm run test` --------- Signed-off-by: Mahad Zaryab --- .golangci.yml | 13 ++-- .../internal/integration/span_writer.go | 7 +- cmd/query/app/apiv3/otlp_translator.go | 5 +- cmd/query/app/querysvc/adjuster/clockskew.go | 10 +-- cmd/query/app/querysvc/adjuster/hash.go | 2 +- .../querysvc/adjuster/libraryattributes.go | 2 +- .../app/querysvc/adjuster/spaniduniquifier.go | 2 +- cmd/query/app/querysvc/adjuster/spanlinks.go | 2 +- internal/jptrace/translator.go | 55 ++++++++++++++-- internal/jptrace/translator_test.go | 66 ++++++++++++++++++- internal/jptrace/warning.go | 12 ++-- internal/jptrace/warning_test.go | 12 +++- storage_v2/v1adapter/reader.go | 8 +-- 13 files changed, 153 insertions(+), 43 deletions(-) diff --git a/.golangci.yml b/.golangci.yml index 2cca15f1d14..bf9ff03b876 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -139,13 +139,12 @@ linters-settings: files: - "**_test.go" - # TODO: enable after import is not used anywhere - # disallow-otel-contrib-translator: - # deny: - # - pkg: github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger - # desc: "Use jptrace package instead of opentelemetry-collector-contrib/pkg/translator/jaeger" - # files: - # - "!**/jptrace/**" + disallow-otel-contrib-translator: + deny: + - pkg: github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger + desc: "Use jptrace package instead of opentelemetry-collector-contrib/pkg/translator/jaeger" + files: + - "!**/jptrace/**" goimports: local-prefixes: github.com/jaegertracing/jaeger diff --git a/cmd/jaeger/internal/integration/span_writer.go b/cmd/jaeger/internal/integration/span_writer.go index da722b1681f..2f718fb5022 100644 --- a/cmd/jaeger/internal/integration/span_writer.go +++ b/cmd/jaeger/internal/integration/span_writer.go @@ -9,7 +9,6 @@ import ( "io" "time" - jaeger2otlp "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger" "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/config/configtls" "go.opentelemetry.io/collector/exporter" @@ -17,6 +16,7 @@ import ( "go.opentelemetry.io/collector/exporter/otlpexporter" "go.uber.org/zap" + "github.com/jaegertracing/jaeger/internal/jptrace" "github.com/jaegertracing/jaeger/model" "github.com/jaegertracing/jaeger/storage/spanstore" ) @@ -68,15 +68,12 @@ func (w *spanWriter) Close() error { } func (w *spanWriter) WriteSpan(ctx context.Context, span *model.Span) error { - td, err := jaeger2otlp.ProtoToTraces([]*model.Batch{ + td := jptrace.ProtoToTraces([]*model.Batch{ { Spans: []*model.Span{span}, Process: span.Process, }, }) - if err != nil { - return err - } return w.exporter.ConsumeTraces(ctx, td) } diff --git a/cmd/query/app/apiv3/otlp_translator.go b/cmd/query/app/apiv3/otlp_translator.go index 1a39ca79845..e9767720903 100644 --- a/cmd/query/app/apiv3/otlp_translator.go +++ b/cmd/query/app/apiv3/otlp_translator.go @@ -4,15 +4,14 @@ package apiv3 import ( - model2otel "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger" "go.opentelemetry.io/collector/pdata/ptrace" + "github.com/jaegertracing/jaeger/internal/jptrace" "github.com/jaegertracing/jaeger/model" ) func modelToOTLP(spans []*model.Span) ptrace.Traces { batch := &model.Batch{Spans: spans} - // there is never an error returned from ProtoToTraces - tr, _ := model2otel.ProtoToTraces([]*model.Batch{batch}) + tr := jptrace.ProtoToTraces([]*model.Batch{batch}) return tr } diff --git a/cmd/query/app/querysvc/adjuster/clockskew.go b/cmd/query/app/querysvc/adjuster/clockskew.go index 70e00fd1238..c3ea335aa2f 100644 --- a/cmd/query/app/querysvc/adjuster/clockskew.go +++ b/cmd/query/app/querysvc/adjuster/clockskew.go @@ -104,7 +104,7 @@ func (a *clockSkewAdjuster) buildNodesMap() { for k := 0; k < spans.Len(); k++ { span := spans.At(k) if _, exists := a.spans[span.SpanID()]; exists { - jptrace.AddWarning(span, warningDuplicateSpanID) + jptrace.AddWarnings(span, warningDuplicateSpanID) } else { a.spans[span.SpanID()] = &node{ span: span, @@ -129,7 +129,7 @@ func (a *clockSkewAdjuster) buildSubGraphs() { p.children = append(p.children, n) } else { warning := fmt.Sprintf(warningMissingParentSpanID, n.span.ParentSpanID()) - jptrace.AddWarning(n.span, warning) + jptrace.AddWarnings(n.span, warning) // treat spans with invalid parent ID as root spans a.roots[n.span.SpanID()] = n } @@ -185,14 +185,14 @@ func (a *clockSkewAdjuster) adjustTimestamps(n *node, skew clockSkew) { } if absDuration(skew.delta) > a.maxDelta { if a.maxDelta == 0 { - jptrace.AddWarning(n.span, fmt.Sprintf(warningSkewAdjustDisabled, skew.delta)) + jptrace.AddWarnings(n.span, fmt.Sprintf(warningSkewAdjustDisabled, skew.delta)) return } - jptrace.AddWarning(n.span, fmt.Sprintf(warningMaxDeltaExceeded, a.maxDelta, skew.delta)) + jptrace.AddWarnings(n.span, fmt.Sprintf(warningMaxDeltaExceeded, a.maxDelta, skew.delta)) return } n.span.SetStartTimestamp(pcommon.NewTimestampFromTime(n.span.StartTimestamp().AsTime().Add(skew.delta))) - jptrace.AddWarning(n.span, fmt.Sprintf("This span's timestamps were adjusted by %v", skew.delta)) + jptrace.AddWarnings(n.span, fmt.Sprintf("This span's timestamps were adjusted by %v", skew.delta)) for i := 0; i < n.span.Events().Len(); i++ { event := n.span.Events().At(i) event.SetTimestamp(pcommon.NewTimestampFromTime(event.Timestamp().AsTime().Add(skew.delta))) diff --git a/cmd/query/app/querysvc/adjuster/hash.go b/cmd/query/app/querysvc/adjuster/hash.go index 55d6347ed9b..458258c7c3c 100644 --- a/cmd/query/app/querysvc/adjuster/hash.go +++ b/cmd/query/app/querysvc/adjuster/hash.go @@ -56,7 +56,7 @@ func (s *SpanHashDeduper) Adjust(traces ptrace.Traces) { hashTrace, ) if err != nil { - jptrace.AddWarning(span, fmt.Sprintf("failed to compute hash code: %v", err)) + jptrace.AddWarnings(span, fmt.Sprintf("failed to compute hash code: %v", err)) span.CopyTo(dedupedSpans.AppendEmpty()) continue } diff --git a/cmd/query/app/querysvc/adjuster/libraryattributes.go b/cmd/query/app/querysvc/adjuster/libraryattributes.go index d2d0bf64dfd..418a1206c29 100644 --- a/cmd/query/app/querysvc/adjuster/libraryattributes.go +++ b/cmd/query/app/querysvc/adjuster/libraryattributes.go @@ -59,7 +59,7 @@ func (ResourceAttributesAdjuster) moveAttributes(span ptrace.Span, resource pcom for k, v := range replace { existing, ok := resource.Attributes().Get(k) if ok && existing.AsRaw() != v.AsRaw() { - jptrace.AddWarning(span, "conflicting values between Span and Resource for attribute "+k) + jptrace.AddWarnings(span, "conflicting values between Span and Resource for attribute "+k) continue } v.CopyTo(resource.Attributes().PutEmpty(k)) diff --git a/cmd/query/app/querysvc/adjuster/spaniduniquifier.go b/cmd/query/app/querysvc/adjuster/spaniduniquifier.go index 81720896e18..687fea06162 100644 --- a/cmd/query/app/querysvc/adjuster/spaniduniquifier.go +++ b/cmd/query/app/querysvc/adjuster/spaniduniquifier.go @@ -89,7 +89,7 @@ func (d *spanIDDeduper) uniquifyServerSpanIDs(traces ptrace.Traces) { if span.Kind() == ptrace.SpanKindServer && d.isSharedWithClientSpan(span.SpanID()) { newID, err := d.makeUniqueSpanID() if err != nil { - jptrace.AddWarning(span, err.Error()) + jptrace.AddWarnings(span, err.Error()) continue } oldToNewSpanIDs[span.SpanID()] = newID diff --git a/cmd/query/app/querysvc/adjuster/spanlinks.go b/cmd/query/app/querysvc/adjuster/spanlinks.go index a075be452b8..3fe3cb7ec3a 100644 --- a/cmd/query/app/querysvc/adjuster/spanlinks.go +++ b/cmd/query/app/querysvc/adjuster/spanlinks.go @@ -48,7 +48,7 @@ func (la LinksAdjuster) adjust(span ptrace.Span) { newLink := validLinks.AppendEmpty() link.CopyTo(newLink) } else { - jptrace.AddWarning(span, invalidSpanLinkWarning) + jptrace.AddWarnings(span, invalidSpanLinkWarning) } } validLinks.CopyTo(span.Links()) diff --git a/internal/jptrace/translator.go b/internal/jptrace/translator.go index 7480b2b5075..82524696665 100644 --- a/internal/jptrace/translator.go +++ b/internal/jptrace/translator.go @@ -5,6 +5,7 @@ package jptrace import ( jaegerTranslator "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger" + "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/ptrace" "github.com/jaegertracing/jaeger/model" @@ -15,10 +16,19 @@ import ( func ProtoFromTraces(traces ptrace.Traces) []*model.Batch { batches := jaegerTranslator.ProtoFromTraces(traces) spanMap := createSpanMapFromBatches(batches) - transferWarnings(traces, spanMap) + transferWarningsToModelSpans(traces, spanMap) return batches } +// ProtoToTraces converts Jaeger model batches ([]*model.Batch) +// to OpenTelemetry traces (ptrace.Traces). +func ProtoToTraces(batches []*model.Batch) ptrace.Traces { + traces, _ := jaegerTranslator.ProtoToTraces(batches) // never returns an error + spanMap := createSpanMapFromTraces(traces) + transferWarningsToOTLPSpans(batches, spanMap) + return traces +} + func createSpanMapFromBatches(batches []*model.Batch) map[model.SpanID]*model.Span { spanMap := make(map[model.SpanID]*model.Span) for _, batch := range batches { @@ -29,7 +39,23 @@ func createSpanMapFromBatches(batches []*model.Batch) map[model.SpanID]*model.Sp return spanMap } -func transferWarnings(traces ptrace.Traces, spanMap map[model.SpanID]*model.Span) { +func createSpanMapFromTraces(traces ptrace.Traces) map[pcommon.SpanID]ptrace.Span { + spanMap := make(map[pcommon.SpanID]ptrace.Span) + resources := traces.ResourceSpans() + for i := 0; i < resources.Len(); i++ { + scopes := resources.At(i).ScopeSpans() + for j := 0; j < scopes.Len(); j++ { + spans := scopes.At(j).Spans() + for k := 0; k < spans.Len(); k++ { + span := spans.At(k) + spanMap[span.SpanID()] = span + } + } + } + return spanMap +} + +func transferWarningsToModelSpans(traces ptrace.Traces, spanMap map[model.SpanID]*model.Span) { resources := traces.ResourceSpans() for i := 0; i < resources.Len(); i++ { scopes := resources.At(i).ScopeSpans() @@ -38,10 +64,27 @@ func transferWarnings(traces ptrace.Traces, spanMap map[model.SpanID]*model.Span for k := 0; k < spans.Len(); k++ { otelSpan := spans.At(k) warnings := GetWarnings(otelSpan) - span := spanMap[model.SpanIDFromOTEL(otelSpan.SpanID())] - span.Warnings = append(span.Warnings, warnings...) - // filter out the warning tag - span.Tags = filterTags(span.Tags, warningsAttribute) + if len(warnings) == 0 { + continue + } + if span, ok := spanMap[model.SpanIDFromOTEL(otelSpan.SpanID())]; ok { + span.Warnings = append(span.Warnings, warnings...) + // filter out the warning tag + span.Tags = filterTags(span.Tags, warningsAttribute) + } + } + } + } +} + +func transferWarningsToOTLPSpans(batches []*model.Batch, spanMap map[pcommon.SpanID]ptrace.Span) { + for _, batch := range batches { + for _, span := range batch.Spans { + if len(span.Warnings) == 0 { + continue + } + if otelSpan, ok := spanMap[span.SpanID.ToOTELSpanID()]; ok { + AddWarnings(otelSpan, span.Warnings...) } } } diff --git a/internal/jptrace/translator_test.go b/internal/jptrace/translator_test.go index 5ff3129a00f..9e021a35abb 100644 --- a/internal/jptrace/translator_test.go +++ b/internal/jptrace/translator_test.go @@ -20,8 +20,8 @@ func TestProtoFromTraces_AddsWarnings(t *testing.T) { span1 := ss1.Spans().AppendEmpty() span1.SetName("test-span-1") span1.SetSpanID(pcommon.SpanID([8]byte{1, 2, 3, 4, 5, 6, 7, 8})) - AddWarning(span1, "test-warning-1") - AddWarning(span1, "test-warning-2") + AddWarnings(span1, "test-warning-1") + AddWarnings(span1, "test-warning-2") span1.Attributes().PutStr("key", "value") ss2 := rs1.ScopeSpans().AppendEmpty() @@ -34,7 +34,7 @@ func TestProtoFromTraces_AddsWarnings(t *testing.T) { span3 := ss3.Spans().AppendEmpty() span3.SetName("test-span-3") span3.SetSpanID(pcommon.SpanID([8]byte{17, 18, 19, 20, 21, 22, 23, 24})) - AddWarning(span3, "test-warning-3") + AddWarnings(span3, "test-warning-3") batches := ProtoFromTraces(traces) @@ -53,3 +53,63 @@ func TestProtoFromTraces_AddsWarnings(t *testing.T) { assert.Equal(t, []string{"test-warning-3"}, batches[1].Spans[0].Warnings) assert.Empty(t, batches[1].Spans[0].Tags) } + +func TestProtoToTraces_AddsWarnings(t *testing.T) { + batch1 := &model.Batch{ + Process: &model.Process{ + ServiceName: "batch-1", + }, + Spans: []*model.Span{ + { + OperationName: "test-span-1", + SpanID: model.NewSpanID(1), + Warnings: []string{"test-warning-1", "test-warning-2"}, + }, + { + OperationName: "test-span-2", + SpanID: model.NewSpanID(2), + }, + }, + } + batch2 := &model.Batch{ + Process: &model.Process{ + ServiceName: "batch-2", + }, + Spans: []*model.Span{ + { + OperationName: "test-span-3", + SpanID: model.NewSpanID(3), + Warnings: []string{"test-warning-3"}, + }, + }, + } + batches := []*model.Batch{batch1, batch2} + traces := ProtoToTraces(batches) + + assert.Equal(t, 2, traces.ResourceSpans().Len()) + + rs1 := traces.ResourceSpans().At(0) + assert.Equal(t, 1, rs1.ScopeSpans().Len()) + ss1 := rs1.ScopeSpans().At(0) + assert.Equal(t, 2, ss1.Spans().Len()) + + span1 := ss1.Spans().At(0) + assert.Equal(t, "test-span-1", span1.Name()) + assert.Equal(t, pcommon.SpanID([8]byte{0, 0, 0, 0, 0, 0, 0, 1}), span1.SpanID()) + assert.Equal(t, []string{"test-warning-1", "test-warning-2"}, GetWarnings(span1)) + + span2 := ss1.Spans().At(1) + assert.Equal(t, "test-span-2", span2.Name()) + assert.Equal(t, pcommon.SpanID([8]byte{0, 0, 0, 0, 0, 0, 0, 2}), span2.SpanID()) + assert.Empty(t, GetWarnings(span2)) + + rs2 := traces.ResourceSpans().At(1) + assert.Equal(t, 1, rs2.ScopeSpans().Len()) + ss3 := rs2.ScopeSpans().At(0) + assert.Equal(t, 1, ss3.Spans().Len()) + + span3 := ss3.Spans().At(0) + assert.Equal(t, "test-span-3", span3.Name()) + assert.Equal(t, pcommon.SpanID([8]byte{0, 0, 0, 0, 0, 0, 0, 3}), span3.SpanID()) + assert.Equal(t, []string{"test-warning-3"}, GetWarnings(span3)) +} diff --git a/internal/jptrace/warning.go b/internal/jptrace/warning.go index 0d96296c214..5247930fc25 100644 --- a/internal/jptrace/warning.go +++ b/internal/jptrace/warning.go @@ -16,14 +16,16 @@ const ( warningsAttribute = "jaeger.internal.warnings" ) -func AddWarning(span ptrace.Span, warning string) { - var warnings pcommon.Slice +func AddWarnings(span ptrace.Span, warnings ...string) { + var w pcommon.Slice if currWarnings, ok := span.Attributes().Get(warningsAttribute); ok { - warnings = currWarnings.Slice() + w = currWarnings.Slice() } else { - warnings = span.Attributes().PutEmptySlice(warningsAttribute) + w = span.Attributes().PutEmptySlice(warningsAttribute) + } + for _, warning := range warnings { + w.AppendEmpty().SetStr(warning) } - warnings.AppendEmpty().SetStr(warning) } func GetWarnings(span ptrace.Span) []string { diff --git a/internal/jptrace/warning_test.go b/internal/jptrace/warning_test.go index b8e3c38032c..9bc62437dd4 100644 --- a/internal/jptrace/warning_test.go +++ b/internal/jptrace/warning_test.go @@ -7,6 +7,7 @@ import ( "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/pdata/ptrace" ) @@ -46,7 +47,7 @@ func TestAddWarning(t *testing.T) { warnings.AppendEmpty().SetStr(warn) } } - AddWarning(span, test.newWarn) + AddWarnings(span, test.newWarn) warnings, ok := attrs.Get("jaeger.internal.warnings") assert.True(t, ok) assert.Equal(t, len(test.expected), warnings.Slice().Len()) @@ -57,6 +58,15 @@ func TestAddWarning(t *testing.T) { } } +func TestAddWarning_MultipleWarnings(t *testing.T) { + span := ptrace.NewSpan() + AddWarnings(span, "warning-1", "warning-2") + warnings, ok := span.Attributes().Get("jaeger.internal.warnings") + require.True(t, ok) + require.Equal(t, "warning-1", warnings.Slice().At(0).Str()) + require.Equal(t, "warning-2", warnings.Slice().At(1).Str()) +} + func TestGetWarnings(t *testing.T) { tests := []struct { name string diff --git a/storage_v2/v1adapter/reader.go b/storage_v2/v1adapter/reader.go index a3d2293dd25..04508a1247c 100644 --- a/storage_v2/v1adapter/reader.go +++ b/storage_v2/v1adapter/reader.go @@ -7,10 +7,10 @@ import ( "context" "errors" - model2otel "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/ptrace" + "github.com/jaegertracing/jaeger/internal/jptrace" "github.com/jaegertracing/jaeger/model" "github.com/jaegertracing/jaeger/pkg/iter" "github.com/jaegertracing/jaeger/storage/dependencystore" @@ -60,8 +60,8 @@ func (tr *TraceReader) GetTraces( return } batch := &model.Batch{Spans: t.GetSpans()} - tr, err := model2otel.ProtoToTraces([]*model.Batch{batch}) - if !yield([]ptrace.Traces{tr}, err) || err != nil { + tr := jptrace.ProtoToTraces([]*model.Batch{batch}) + if !yield([]ptrace.Traces{tr}, nil) { return } } @@ -105,7 +105,7 @@ func (tr *TraceReader) FindTraces( } for _, trace := range traces { batch := &model.Batch{Spans: trace.GetSpans()} - otelTrace, _ := model2otel.ProtoToTraces([]*model.Batch{batch}) + otelTrace := jptrace.ProtoToTraces([]*model.Batch{batch}) if !yield([]ptrace.Traces{otelTrace}, nil) { return }