diff --git a/.golangci.yml b/.golangci.yml index 5427041391e..bf9ff03b876 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -139,6 +139,13 @@ linters-settings: files: - "**_test.go" + disallow-otel-contrib-translator: + deny: + - pkg: github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger + desc: "Use jptrace package instead of opentelemetry-collector-contrib/pkg/translator/jaeger" + files: + - "!**/jptrace/**" + goimports: local-prefixes: github.com/jaegertracing/jaeger gosec: diff --git a/README.md b/README.md index 783edaeb599..b843db4fbf3 100644 --- a/README.md +++ b/README.md @@ -47,19 +47,19 @@ Jaeger is an open source project with open governance. We welcome contributions ## Version Compatibility Guarantees -Occasionally, CLI flags can be deprecated due to, for example, usability improvements or new functionality. +Since Jaeger uses many components from the [OpenTelemetry Collector](https://github.com/open-telemetry/opentelemetry-collector/) we try to maintain configuration compatibility between Jaeger releases. Occasionally, configuration options in Jaeger (or in Jaeger v1 CLI flags) can be deprecated due to usability improvements, new functionality, or changes in our dependencies. In such situations, developers introducing the deprecation are required to follow [these guidelines](./CONTRIBUTING.md#deprecating-cli-flags). -In short, for a deprecated CLI flag, you should expect to see the following message in the `--help` documentation: +In short, for a deprecated configuration option, you should expect to see the following message in the documentation or release notes: ``` (deprecated, will be removed after yyyy-mm-dd or in release vX.Y.Z, whichever is later) ``` A grace period of at least **3 months** or **two minor version bumps** (whichever is later) from the first release -containing the deprecation notice will be provided before the deprecated CLI flag _can_ be deleted. +containing the deprecation notice will be provided before the deprecated configuration option _can_ be deleted. -For example, consider a scenario where v1.28.0 is released on 01-Jun-2021 containing a deprecation notice for a CLI flag. -This flag will remain in a deprecated state until the later of 01-Sep-2021 or v1.30.0 where it _can_ be removed on or after either of those events. +For example, consider a scenario where v2.0.0 is released on 01-Sep-2024 containing a deprecation notice for a configuration option. +This configuration option will remain in a deprecated state until the later of 01-Dec-2024 or v2.2.0 where it _can_ be removed on or after either of those events. It may remain deprecated for longer than the aforementioned grace period. ## Go Version Compatibility Guarantees diff --git a/cmd/collector/app/handler/otlp_receiver.go b/cmd/collector/app/handler/otlp_receiver.go index ba450e4a298..ebde8d2ba38 100644 --- a/cmd/collector/app/handler/otlp_receiver.go +++ b/cmd/collector/app/handler/otlp_receiver.go @@ -7,7 +7,6 @@ import ( "context" "fmt" - otlp2jaeger "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componentstatus" "go.opentelemetry.io/collector/config/confignet" @@ -23,6 +22,7 @@ import ( "github.com/jaegertracing/jaeger/cmd/collector/app/flags" "github.com/jaegertracing/jaeger/cmd/collector/app/processor" + "github.com/jaegertracing/jaeger/internal/jptrace" "github.com/jaegertracing/jaeger/pkg/tenancy" ) @@ -108,7 +108,7 @@ type consumerDelegate struct { } func (c *consumerDelegate) consume(ctx context.Context, td ptrace.Traces) error { - batches := otlp2jaeger.ProtoFromTraces(td) + batches := jptrace.ProtoFromTraces(td) for _, batch := range batches { err := c.batchConsumer.consume(ctx, batch) if err != nil { diff --git a/cmd/jaeger/internal/integration/span_writer.go b/cmd/jaeger/internal/integration/span_writer.go index da722b1681f..2f718fb5022 100644 --- a/cmd/jaeger/internal/integration/span_writer.go +++ b/cmd/jaeger/internal/integration/span_writer.go @@ -9,7 +9,6 @@ import ( "io" "time" - jaeger2otlp "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger" "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/config/configtls" "go.opentelemetry.io/collector/exporter" @@ -17,6 +16,7 @@ import ( "go.opentelemetry.io/collector/exporter/otlpexporter" "go.uber.org/zap" + "github.com/jaegertracing/jaeger/internal/jptrace" "github.com/jaegertracing/jaeger/model" "github.com/jaegertracing/jaeger/storage/spanstore" ) @@ -68,15 +68,12 @@ func (w *spanWriter) Close() error { } func (w *spanWriter) WriteSpan(ctx context.Context, span *model.Span) error { - td, err := jaeger2otlp.ProtoToTraces([]*model.Batch{ + td := jptrace.ProtoToTraces([]*model.Batch{ { Spans: []*model.Span{span}, Process: span.Process, }, }) - if err != nil { - return err - } return w.exporter.ConsumeTraces(ctx, td) } diff --git a/cmd/jaeger/internal/processors/adaptivesampling/processor.go b/cmd/jaeger/internal/processors/adaptivesampling/processor.go index 4d14f7f99c4..7a0ca2a0641 100644 --- a/cmd/jaeger/internal/processors/adaptivesampling/processor.go +++ b/cmd/jaeger/internal/processors/adaptivesampling/processor.go @@ -7,12 +7,12 @@ import ( "context" "fmt" - otlp2jaeger "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/pdata/ptrace" "github.com/jaegertracing/jaeger/cmd/collector/app/sampling/samplingstrategy" "github.com/jaegertracing/jaeger/cmd/jaeger/internal/extension/remotesampling" + "github.com/jaegertracing/jaeger/internal/jptrace" "github.com/jaegertracing/jaeger/internal/metrics/otelmetrics" "github.com/jaegertracing/jaeger/plugin/sampling/strategyprovider/adaptive" ) @@ -65,7 +65,7 @@ func (tp *traceProcessor) close(context.Context) error { } func (tp *traceProcessor) processTraces(_ context.Context, td ptrace.Traces) (ptrace.Traces, error) { - batches := otlp2jaeger.ProtoFromTraces(td) + batches := jptrace.ProtoFromTraces(td) for _, batch := range batches { for _, span := range batch.Spans { if span.Process == nil { diff --git a/cmd/query/app/apiv3/otlp_translator.go b/cmd/query/app/apiv3/otlp_translator.go index 1a39ca79845..e9767720903 100644 --- a/cmd/query/app/apiv3/otlp_translator.go +++ b/cmd/query/app/apiv3/otlp_translator.go @@ -4,15 +4,14 @@ package apiv3 import ( - model2otel "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger" "go.opentelemetry.io/collector/pdata/ptrace" + "github.com/jaegertracing/jaeger/internal/jptrace" "github.com/jaegertracing/jaeger/model" ) func modelToOTLP(spans []*model.Span) ptrace.Traces { batch := &model.Batch{Spans: spans} - // there is never an error returned from ProtoToTraces - tr, _ := model2otel.ProtoToTraces([]*model.Batch{batch}) + tr := jptrace.ProtoToTraces([]*model.Batch{batch}) return tr } diff --git a/cmd/query/app/otlp_translator.go b/cmd/query/app/otlp_translator.go index 06f7c63e440..daaaabf2aeb 100644 --- a/cmd/query/app/otlp_translator.go +++ b/cmd/query/app/otlp_translator.go @@ -6,9 +6,9 @@ package app import ( "fmt" - model2otel "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger" "go.opentelemetry.io/collector/pdata/ptrace" + "github.com/jaegertracing/jaeger/internal/jptrace" "github.com/jaegertracing/jaeger/model" ) @@ -18,7 +18,7 @@ func otlp2traces(otlpSpans []byte) ([]*model.Trace, error) { if err != nil { return nil, fmt.Errorf("cannot unmarshal OTLP : %w", err) } - jaegerBatches := model2otel.ProtoFromTraces(otlpTraces) + jaegerBatches := jptrace.ProtoFromTraces(otlpTraces) var traces []*model.Trace traceMap := make(map[model.TraceID]*model.Trace) for _, batch := range jaegerBatches { diff --git a/cmd/query/app/querysvc/adjuster/clockskew.go b/cmd/query/app/querysvc/adjuster/clockskew.go new file mode 100644 index 00000000000..c3ea335aa2f --- /dev/null +++ b/cmd/query/app/querysvc/adjuster/clockskew.go @@ -0,0 +1,207 @@ +// Copyright (c) 2024 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package adjuster + +import ( + "fmt" + "time" + + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/ptrace" + + "github.com/jaegertracing/jaeger/internal/jptrace" + "github.com/jaegertracing/jaeger/pkg/otelsemconv" +) + +const ( + warningDuplicateSpanID = "duplicate span IDs; skipping clock skew adjustment" + warningMissingParentSpanID = "parent span ID=%s is not in the trace; skipping clock skew adjustment" + warningMaxDeltaExceeded = "max clock skew adjustment delta of %v exceeded; not applying calculated delta of %v" + warningSkewAdjustDisabled = "clock skew adjustment disabled; not applying calculated delta of %v" +) + +// CorrectClockSkew returns an Adjuster that corrects span timestamps for clock skew. +// +// This adjuster modifies the start and log timestamps of child spans that are +// inconsistent with their parent spans due to clock differences between hosts. +// It assumes all spans have unique IDs and should be used after SpanIDUniquifier. +// +// The adjuster determines if two spans belong to the same source by deriving a +// unique string representation of a host based on resource attributes, +// such as `host.id`, `host.ip`, or `host.name`. +// If two spans have the same host key, they are considered to be from +// the same source, and no clock skew adjustment is expected between them. +// +// Parameters: +// - maxDelta: The maximum allowable time adjustment. Adjustments exceeding +// this value will be ignored. +func CorrectClockSkew(maxDelta time.Duration) Adjuster { + return Func(func(traces ptrace.Traces) { + adjuster := &clockSkewAdjuster{ + traces: traces, + maxDelta: maxDelta, + } + adjuster.buildNodesMap() + adjuster.buildSubGraphs() + for _, root := range adjuster.roots { + skew := clockSkew{hostKey: root.hostKey} + adjuster.adjustNode(root, nil, skew) + } + }) +} + +type clockSkewAdjuster struct { + traces ptrace.Traces + maxDelta time.Duration + spans map[pcommon.SpanID]*node + roots map[pcommon.SpanID]*node +} + +type clockSkew struct { + delta time.Duration + hostKey string +} + +type node struct { + span ptrace.Span + children []*node + hostKey string +} + +// hostKey derives a unique string representation of a host based on resource attributes. +// This is used to determine if two spans are from the same host. +func hostKey(resource ptrace.ResourceSpans) string { + if attr, ok := resource.Resource().Attributes().Get(string(otelsemconv.HostIDKey)); ok { + return attr.Str() + } + if attr, ok := resource.Resource().Attributes().Get(string(otelsemconv.HostIPKey)); ok { + if attr.Type() == pcommon.ValueTypeStr { + return attr.Str() + } else if attr.Type() == pcommon.ValueTypeSlice { + ips := attr.Slice() + if ips.Len() > 0 { + return ips.At(0).AsString() + } + } + } + if attr, ok := resource.Resource().Attributes().Get(string(otelsemconv.HostNameKey)); ok { + return attr.Str() + } + return "" +} + +// buildNodesMap creates a mapping of span IDs to their corresponding nodes. +func (a *clockSkewAdjuster) buildNodesMap() { + a.spans = make(map[pcommon.SpanID]*node) + resources := a.traces.ResourceSpans() + for i := 0; i < resources.Len(); i++ { + resource := resources.At(i) + hk := hostKey(resource) + scopes := resource.ScopeSpans() + for j := 0; j < scopes.Len(); j++ { + spans := scopes.At(j).Spans() + for k := 0; k < spans.Len(); k++ { + span := spans.At(k) + if _, exists := a.spans[span.SpanID()]; exists { + jptrace.AddWarnings(span, warningDuplicateSpanID) + } else { + a.spans[span.SpanID()] = &node{ + span: span, + hostKey: hk, + } + } + } + } + } +} + +// finds all spans that have no parent, i.e. where parentID is either 0 +// or points to an ID for which there is no span. +func (a *clockSkewAdjuster) buildSubGraphs() { + a.roots = make(map[pcommon.SpanID]*node) + for _, n := range a.spans { + if n.span.ParentSpanID() == pcommon.NewSpanIDEmpty() { + a.roots[n.span.SpanID()] = n + continue + } + if p, ok := a.spans[n.span.ParentSpanID()]; ok { + p.children = append(p.children, n) + } else { + warning := fmt.Sprintf(warningMissingParentSpanID, n.span.ParentSpanID()) + jptrace.AddWarnings(n.span, warning) + // treat spans with invalid parent ID as root spans + a.roots[n.span.SpanID()] = n + } + } +} + +func (a *clockSkewAdjuster) adjustNode(n *node, parent *node, skew clockSkew) { + if (n.hostKey != skew.hostKey || n.hostKey == "") && parent != nil { + // Node n is from a different host. The parent has already been adjusted, + // so we can compare this node's timestamps against the parent. + skew = clockSkew{ + hostKey: n.hostKey, + delta: a.calculateSkew(n, parent), + } + } + a.adjustTimestamps(n, skew) + for _, child := range n.children { + a.adjustNode(child, n, skew) + } +} + +func (*clockSkewAdjuster) calculateSkew(child *node, parent *node) time.Duration { + parentStartTime := parent.span.StartTimestamp().AsTime() + childStartTime := child.span.StartTimestamp().AsTime() + parentEndTime := parent.span.EndTimestamp().AsTime() + childEndTime := child.span.EndTimestamp().AsTime() + parentDuration := parentEndTime.Sub(parentStartTime) + childDuration := childEndTime.Sub(childStartTime) + + if childDuration > parentDuration { + // When the child lasted longer than the parent, it was either + // async or the parent may have timed out before child responded. + // The only reasonable adjustment we can do in this case is to make + // sure the child does not start before parent. + if childStartTime.Before(parentStartTime) { + return parentStartTime.Sub(childStartTime) + } + return 0 + } + if !childStartTime.Before(parentStartTime) && !childEndTime.After(parentEndTime) { + // child already fits within the parent span, do not adjust + return 0 + } + // Assume that network latency is equally split between req and res. + latency := (parentDuration - childDuration) / 2 + // Goal: parentStartTime + latency = childStartTime + adjustment + return parentStartTime.Add(latency).Sub(childStartTime) +} + +func (a *clockSkewAdjuster) adjustTimestamps(n *node, skew clockSkew) { + if skew.delta == 0 { + return + } + if absDuration(skew.delta) > a.maxDelta { + if a.maxDelta == 0 { + jptrace.AddWarnings(n.span, fmt.Sprintf(warningSkewAdjustDisabled, skew.delta)) + return + } + jptrace.AddWarnings(n.span, fmt.Sprintf(warningMaxDeltaExceeded, a.maxDelta, skew.delta)) + return + } + n.span.SetStartTimestamp(pcommon.NewTimestampFromTime(n.span.StartTimestamp().AsTime().Add(skew.delta))) + jptrace.AddWarnings(n.span, fmt.Sprintf("This span's timestamps were adjusted by %v", skew.delta)) + for i := 0; i < n.span.Events().Len(); i++ { + event := n.span.Events().At(i) + event.SetTimestamp(pcommon.NewTimestampFromTime(event.Timestamp().AsTime().Add(skew.delta))) + } +} + +func absDuration(d time.Duration) time.Duration { + if d < 0 { + return -1 * d + } + return d +} diff --git a/cmd/query/app/querysvc/adjuster/clockskew_test.go b/cmd/query/app/querysvc/adjuster/clockskew_test.go new file mode 100644 index 00000000000..cfa94e0c292 --- /dev/null +++ b/cmd/query/app/querysvc/adjuster/clockskew_test.go @@ -0,0 +1,292 @@ +// Copyright (c) 2024 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package adjuster + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/ptrace" + + "github.com/jaegertracing/jaeger/internal/jptrace" +) + +func TestClockSkewAdjuster(t *testing.T) { + type testSpan struct { + id, parent [8]byte + startTime, duration int + events []int // timestamps for logs + host string + adjusted int // start time after adjustment + adjustedEvents []int // adjusted log timestamps + } + + toTime := func(t int) time.Time { + return time.Unix(0, (time.Duration(t) * time.Millisecond).Nanoseconds()) + } + + // helper function that constructs a trace from a list of span prototypes + makeTrace := func(spanPrototypes []testSpan) ptrace.Traces { + trace := ptrace.NewTraces() + for _, spanProto := range spanPrototypes { + traceID := pcommon.TraceID([16]byte{0, 0, 0, 0, 0, 0, 0, 1}) + span := ptrace.NewSpan() + span.SetTraceID(traceID) + span.SetSpanID(spanProto.id) + span.SetParentSpanID(spanProto.parent) + span.SetStartTimestamp(pcommon.NewTimestampFromTime(toTime(spanProto.startTime))) + span.SetEndTimestamp(pcommon.NewTimestampFromTime(toTime(spanProto.startTime + spanProto.duration))) + + events := ptrace.NewSpanEventSlice() + for _, log := range spanProto.events { + event := events.AppendEmpty() + event.SetTimestamp(pcommon.NewTimestampFromTime(toTime(log))) + event.Attributes().PutStr("event", "some event") + } + events.CopyTo(span.Events()) + + resource := ptrace.NewResourceSpans() + resource.Resource().Attributes().PutEmptySlice("host.ip").AppendEmpty().SetStr(spanProto.host) + + span.CopyTo(resource.ScopeSpans().AppendEmpty().Spans().AppendEmpty()) + resource.CopyTo(trace.ResourceSpans().AppendEmpty()) + } + return trace + } + + testCases := []struct { + description string + trace []testSpan + err string + maxAdjust time.Duration + }{ + { + description: "single span with bad parent", + trace: []testSpan{ + {id: [8]byte{1}, parent: [8]byte{0, 0, 0, 0, 0, 0, 0, 99}, startTime: 0, duration: 100, host: "a", adjusted: 0}, + }, + err: "parent span ID=0000000000000063 is not in the trace; skipping clock skew adjustment", // 99 == 0x63 + }, + { + description: "single span with empty host key", + trace: []testSpan{ + {id: [8]byte{1}, parent: [8]byte{0}, startTime: 0, duration: 100, adjusted: 0}, + }, + }, + { + description: "two spans with the same ID", + trace: []testSpan{ + {id: [8]byte{1}, parent: [8]byte{0}, startTime: 0, duration: 100, host: "a", adjusted: 0}, + {id: [8]byte{1}, parent: [8]byte{0}, startTime: 0, duration: 100, host: "a", adjusted: 0}, + }, + err: "duplicate span IDs; skipping clock skew adjustment", + }, + { + description: "parent-child on the same host", + trace: []testSpan{ + {id: [8]byte{1}, parent: [8]byte{0}, startTime: 0, duration: 100, host: "a", adjusted: 0}, + {id: [8]byte{2}, parent: [8]byte{1}, startTime: 10, duration: 50, host: "a", adjusted: 10}, + }, + }, + { + description: "do not adjust parent-child on the same host", + trace: []testSpan{ + {id: [8]byte{1}, parent: [8]byte{0}, startTime: 10, duration: 100, host: "a", adjusted: 10}, + {id: [8]byte{2}, parent: [8]byte{1}, startTime: 0, duration: 50, host: "a", adjusted: 0}, + }, + }, + { + description: "do not adjust child that fits inside parent", + trace: []testSpan{ + {id: [8]byte{1}, parent: [8]byte{0}, startTime: 10, duration: 100, host: "a", adjusted: 10}, + {id: [8]byte{2}, parent: [8]byte{1}, startTime: 20, duration: 50, host: "b", adjusted: 20}, + }, + }, + { + description: "do not adjust child that is longer than parent", + trace: []testSpan{ + {id: [8]byte{1}, parent: [8]byte{0}, startTime: 10, duration: 100, host: "a", adjusted: 10}, + {id: [8]byte{2}, parent: [8]byte{1}, startTime: 20, duration: 150, host: "b", adjusted: 20}, + }, + }, + { + description: "do not apply positive adjustment due to max skew adjustment", + trace: []testSpan{ + {id: [8]byte{1}, parent: [8]byte{0}, startTime: 10, duration: 100, host: "a", adjusted: 10}, + {id: [8]byte{2}, parent: [8]byte{1}, startTime: 0, duration: 50, host: "b", adjusted: 0}, + }, + maxAdjust: 10 * time.Millisecond, + err: "max clock skew adjustment delta of 10ms exceeded; not applying calculated delta of 35ms", + }, + { + description: "do not apply negative adjustment due to max skew adjustment", + trace: []testSpan{ + {id: [8]byte{1}, parent: [8]byte{0}, startTime: 10, duration: 100, host: "a", adjusted: 10}, + {id: [8]byte{2}, parent: [8]byte{1}, startTime: 80, duration: 50, host: "b", adjusted: 80}, + }, + maxAdjust: 10 * time.Millisecond, + err: "max clock skew adjustment delta of 10ms exceeded; not applying calculated delta of -45ms", + }, + { + description: "do not apply adjustment due to disabled adjustment", + trace: []testSpan{ + {id: [8]byte{1}, parent: [8]byte{0}, startTime: 10, duration: 100, host: "a", adjusted: 10}, + {id: [8]byte{2}, parent: [8]byte{1}, startTime: 0, duration: 50, host: "b", adjusted: 0}, + }, + err: "clock skew adjustment disabled; not applying calculated delta of 35ms", + }, + { + description: "adjust child starting before parent", + trace: []testSpan{ + {id: [8]byte{1}, parent: [8]byte{0}, startTime: 10, duration: 100, host: "a", adjusted: 10}, + // latency = (100-50) / 2 = 25 + // delta = (10 - 0) + latency = 35 + { + id: [8]byte{2}, parent: [8]byte{1}, startTime: 0, duration: 50, host: "b", adjusted: 35, + events: []int{5, 10}, adjustedEvents: []int{40, 45}, + }, + }, + maxAdjust: time.Second, + }, + { + description: "adjust child starting before parent even if it is longer", + trace: []testSpan{ + {id: [8]byte{1}, parent: [8]byte{0}, startTime: 10, duration: 100, host: "a", adjusted: 10}, + {id: [8]byte{2}, parent: [8]byte{1}, startTime: 0, duration: 150, host: "b", adjusted: 10}, + }, + maxAdjust: time.Second, + }, + { + description: "adjust child ending after parent but being shorter", + trace: []testSpan{ + {id: [8]byte{1}, parent: [8]byte{0}, startTime: 10, duration: 100, host: "a", adjusted: 10}, + // latency: (100 - 70) / 2 = 15 + // new child start time: 10 + latency = 25, delta = -25 + {id: [8]byte{2}, parent: [8]byte{1}, startTime: 50, duration: 70, host: "b", adjusted: 25}, + // same host 'b', so same delta = -25 + // new start time: 60 + delta = 35 + { + id: [8]byte{3}, parent: [8]byte{2}, startTime: 60, duration: 20, host: "b", adjusted: 35, + events: []int{65, 70}, adjustedEvents: []int{40, 45}, + }, + }, + maxAdjust: time.Second, + }, + } + + for _, tt := range testCases { + testCase := tt // capture loop var + t.Run(testCase.description, func(t *testing.T) { + trace := makeTrace(testCase.trace) + adjuster := CorrectClockSkew(tt.maxAdjust) + adjuster.Adjust(trace) + + var gotErr string + for i, proto := range testCase.trace { + id := proto.id + span := trace.ResourceSpans().At(i).ScopeSpans().At(0).Spans().At(0) + require.EqualValues(t, proto.id, span.SpanID(), "expecting span with span ID = %d", id) + + warnings := jptrace.GetWarnings(span) + if testCase.err == "" { + if proto.adjusted == proto.startTime { + assert.Empty(t, warnings, "no warnings in span %s", span.SpanID) + } else { + assert.Len(t, warnings, 1, "warning about adjutment added to span %s", span.SpanID) + } + } else { + if len(warnings) > 0 { + gotErr = warnings[0] + } + } + + // compare values as int because assert.Equal prints uint64 as hex + assert.Equal( + t, toTime(proto.adjusted).UTC(), span.StartTimestamp().AsTime(), + "adjusted start time of span[ID = %d]", id) + for i, logTs := range proto.adjustedEvents { + assert.Equal( + t, toTime(logTs).UTC(), span.Events().At(i).Timestamp().AsTime(), + "adjusted log timestamp of span[ID = %d], log[%d]", id, i) + } + } + assert.Equal(t, testCase.err, gotErr) + }) + } +} + +func TestHostKey(t *testing.T) { + tests := []struct { + name string + resource ptrace.ResourceSpans + expected string + }{ + { + name: "host.id attribute", + resource: func() ptrace.ResourceSpans { + rs := ptrace.NewResourceSpans() + rs.Resource().Attributes().PutStr("host.id", "host-123") + return rs + }(), + expected: "host-123", + }, + { + name: "string host.ip attribute", + resource: func() ptrace.ResourceSpans { + rs := ptrace.NewResourceSpans() + rs.Resource().Attributes().PutStr("host.ip", "192.168.1.1") + return rs + }(), + expected: "192.168.1.1", + }, + { + name: "slice host.ip attribute", + resource: func() ptrace.ResourceSpans { + rs := ptrace.NewResourceSpans() + addresses := rs.Resource().Attributes().PutEmptySlice("host.ip") + addresses.AppendEmpty().SetStr("192.168.1.1") + addresses.AppendEmpty().SetStr("192.168.1.2") + return rs + }(), + expected: "192.168.1.1", + }, + { + name: "empty host.ip attribute slice", + resource: func() ptrace.ResourceSpans { + rs := ptrace.NewResourceSpans() + rs.Resource().Attributes().PutEmptySlice("host.ip") + return rs + }(), + expected: "", + }, + { + name: "host.name attribute", + resource: func() ptrace.ResourceSpans { + rs := ptrace.NewResourceSpans() + rs.Resource().Attributes().PutStr("host.name", "hostname") + return rs + }(), + expected: "hostname", + }, + { + name: "no relevant attributes", + resource: func() ptrace.ResourceSpans { + rs := ptrace.NewResourceSpans() + rs.Resource().Attributes().PutStr("service.name", "service-123") + return rs + }(), + expected: "", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := hostKey(tt.resource) + assert.Equal(t, tt.expected, result) + }) + } +} diff --git a/cmd/query/app/querysvc/adjuster/hash.go b/cmd/query/app/querysvc/adjuster/hash.go index 25afbbc1621..458258c7c3c 100644 --- a/cmd/query/app/querysvc/adjuster/hash.go +++ b/cmd/query/app/querysvc/adjuster/hash.go @@ -14,7 +14,7 @@ import ( var _ Adjuster = (*SpanHashDeduper)(nil) -// SpanHash creates an adjuster that deduplicates spans by removing all but one span +// DeduplicateSpans creates an adjuster that deduplicates spans by removing all but one span // with the same hash code. This is particularly useful for scenarios where spans // may be duplicated during archival, such as with ElasticSearch archival. // @@ -23,8 +23,8 @@ var _ Adjuster = (*SpanHashDeduper)(nil) // // To ensure consistent hash codes, this adjuster should be executed after // SortAttributesAndEvents, which normalizes the order of collections within the span. -func SpanHash() SpanHashDeduper { - return SpanHashDeduper{ +func DeduplicateSpans() *SpanHashDeduper { + return &SpanHashDeduper{ marshaler: &ptrace.ProtoMarshaler{}, } } @@ -56,7 +56,7 @@ func (s *SpanHashDeduper) Adjust(traces ptrace.Traces) { hashTrace, ) if err != nil { - jptrace.AddWarning(span, fmt.Sprintf("failed to compute hash code: %v", err)) + jptrace.AddWarnings(span, fmt.Sprintf("failed to compute hash code: %v", err)) span.CopyTo(dedupedSpans.AppendEmpty()) continue } diff --git a/cmd/query/app/querysvc/adjuster/hash_test.go b/cmd/query/app/querysvc/adjuster/hash_test.go index dd90d7ab1a3..d5f97795418 100644 --- a/cmd/query/app/querysvc/adjuster/hash_test.go +++ b/cmd/query/app/querysvc/adjuster/hash_test.go @@ -13,7 +13,7 @@ import ( ) func TestSpanHash_EmptySpans(t *testing.T) { - adjuster := SpanHash() + adjuster := DeduplicateSpans() input := ptrace.NewTraces() expected := ptrace.NewTraces() adjuster.Adjust(input) @@ -21,7 +21,7 @@ func TestSpanHash_EmptySpans(t *testing.T) { } func TestSpanHash_RemovesDuplicateSpans(t *testing.T) { - adjuster := SpanHash() + adjuster := DeduplicateSpans() input := func() ptrace.Traces { traces := ptrace.NewTraces() @@ -126,7 +126,7 @@ func TestSpanHash_RemovesDuplicateSpans(t *testing.T) { } func TestSpanHash_NoDuplicateSpans(t *testing.T) { - adjuster := SpanHash() + adjuster := DeduplicateSpans() input := func() ptrace.Traces { traces := ptrace.NewTraces() rs := traces.ResourceSpans().AppendEmpty() @@ -180,7 +180,7 @@ func TestSpanHash_NoDuplicateSpans(t *testing.T) { } func TestSpanHash_DuplicateSpansDifferentScopeAttributes(t *testing.T) { - adjuster := SpanHash() + adjuster := DeduplicateSpans() input := func() ptrace.Traces { traces := ptrace.NewTraces() rs := traces.ResourceSpans().AppendEmpty() @@ -234,7 +234,7 @@ func TestSpanHash_DuplicateSpansDifferentScopeAttributes(t *testing.T) { } func TestSpanHash_DuplicateSpansDifferentResourceAttributes(t *testing.T) { - adjuster := SpanHash() + adjuster := DeduplicateSpans() input := func() ptrace.Traces { traces := ptrace.NewTraces() rs := traces.ResourceSpans().AppendEmpty() diff --git a/cmd/query/app/querysvc/adjuster/ipattribute.go b/cmd/query/app/querysvc/adjuster/ipattribute.go index a00f93b3254..89c361a0cc3 100644 --- a/cmd/query/app/querysvc/adjuster/ipattribute.go +++ b/cmd/query/app/querysvc/adjuster/ipattribute.go @@ -19,10 +19,10 @@ var ipAttributesToCorrect = map[string]struct{}{ "peer.ipv4": {}, } -// IPAttribute returns an adjuster that replaces numeric "ip" attributes, +// NormalizeIPAttributes returns an adjuster that replaces numeric "ip" attributes, // which usually contain IPv4 packed into uint32, with their string // representation (e.g. "8.8.8.8""). -func IPAttribute() IPAttributeAdjuster { +func NormalizeIPAttributes() IPAttributeAdjuster { return IPAttributeAdjuster{} } diff --git a/cmd/query/app/querysvc/adjuster/ipattribute_test.go b/cmd/query/app/querysvc/adjuster/ipattribute_test.go index d97f1bf35ed..4e5d0917230 100644 --- a/cmd/query/app/querysvc/adjuster/ipattribute_test.go +++ b/cmd/query/app/querysvc/adjuster/ipattribute_test.go @@ -58,7 +58,7 @@ func TestIPAttributeAdjuster(t *testing.T) { } } - IPAttribute().Adjust(traces) + NormalizeIPAttributes().Adjust(traces) resourceSpan := traces.ResourceSpans().At(0) assert.Equal(t, 3, resourceSpan.Resource().Attributes().Len()) diff --git a/cmd/query/app/querysvc/adjuster/resourceattributes.go b/cmd/query/app/querysvc/adjuster/libraryattributes.go similarity index 88% rename from cmd/query/app/querysvc/adjuster/resourceattributes.go rename to cmd/query/app/querysvc/adjuster/libraryattributes.go index 6bedd4ff4e9..418a1206c29 100644 --- a/cmd/query/app/querysvc/adjuster/resourceattributes.go +++ b/cmd/query/app/querysvc/adjuster/libraryattributes.go @@ -21,11 +21,11 @@ var libraryKeys = map[string]struct{}{ string(otelsemconv.TelemetryDistroVersionKey): {}, } -// ResourceAttributes creates an adjuster that moves the OpenTelemetry library +// MoveLibraryAttributes creates an adjuster that moves the OpenTelemetry library // attributes from spans to the parent resource so that the UI can // display them separately under Process. // https://github.com/jaegertracing/jaeger/issues/4534 -func ResourceAttributes() ResourceAttributesAdjuster { +func MoveLibraryAttributes() ResourceAttributesAdjuster { return ResourceAttributesAdjuster{} } @@ -59,7 +59,7 @@ func (ResourceAttributesAdjuster) moveAttributes(span ptrace.Span, resource pcom for k, v := range replace { existing, ok := resource.Attributes().Get(k) if ok && existing.AsRaw() != v.AsRaw() { - jptrace.AddWarning(span, "conflicting values between Span and Resource for attribute "+k) + jptrace.AddWarnings(span, "conflicting values between Span and Resource for attribute "+k) continue } v.CopyTo(resource.Attributes().PutEmpty(k)) diff --git a/cmd/query/app/querysvc/adjuster/resourceattributes_test.go b/cmd/query/app/querysvc/adjuster/libraryattributes_test.go similarity index 97% rename from cmd/query/app/querysvc/adjuster/resourceattributes_test.go rename to cmd/query/app/querysvc/adjuster/libraryattributes_test.go index 591820db4a2..ebcf8801080 100644 --- a/cmd/query/app/querysvc/adjuster/resourceattributes_test.go +++ b/cmd/query/app/querysvc/adjuster/libraryattributes_test.go @@ -25,7 +25,7 @@ func TestResourceAttributesAdjuster_SpanWithLibraryAttributes(t *testing.T) { span.Attributes().PutStr(string(otelsemconv.TelemetryDistroVersionKey), "blah") span.Attributes().PutStr("another_key", "another_value") - adjuster := ResourceAttributes() + adjuster := MoveLibraryAttributes() adjuster.Adjust(traces) resultSpanAttributes := traces.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).Attributes() @@ -67,7 +67,7 @@ func TestResourceAttributesAdjuster_SpanWithoutLibraryAttributes(t *testing.T) { span := rs.ScopeSpans().AppendEmpty().Spans().AppendEmpty() span.Attributes().PutStr("random_key", "random_value") - adjuster := ResourceAttributes() + adjuster := MoveLibraryAttributes() adjuster.Adjust(traces) resultSpanAttributes := traces.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).Attributes() @@ -85,7 +85,7 @@ func TestResourceAttributesAdjuster_SpanWithConflictingLibraryAttributes(t *test span.Attributes().PutStr("random_key", "random_value") span.Attributes().PutStr(string(otelsemconv.TelemetrySDKLanguageKey), "Java") - adjuster := ResourceAttributes() + adjuster := MoveLibraryAttributes() adjuster.Adjust(traces) resultSpan := traces.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0) @@ -119,7 +119,7 @@ func TestResourceAttributesAdjuster_SpanWithNonConflictingLibraryAttributes(t *t span.Attributes().PutStr("random_key", "random_value") span.Attributes().PutStr(string(otelsemconv.TelemetrySDKLanguageKey), "Go") - adjuster := ResourceAttributes() + adjuster := MoveLibraryAttributes() adjuster.Adjust(traces) resultSpanAttributes := traces.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).Attributes() diff --git a/cmd/query/app/querysvc/adjuster/sort.go b/cmd/query/app/querysvc/adjuster/sort.go index 0ed38af3286..96f50de4a49 100644 --- a/cmd/query/app/querysvc/adjuster/sort.go +++ b/cmd/query/app/querysvc/adjuster/sort.go @@ -12,14 +12,14 @@ import ( var _ Adjuster = (*SortAttributesAndEventsAdjuster)(nil) -// SortAttributesAndEvents creates an adjuster that standardizes trace data by sorting elements: +// SortCollections creates an adjuster that standardizes trace data by sorting elements: // - Resource attributes are sorted lexicographically by their keys. // - Scope attributes are sorted lexicographically by their keys. // - Span attributes are sorted lexicographically by their keys. // - Span events are sorted lexicographically by their names. // - Attributes within each span event are sorted lexicographically by their keys. // - Attributes within each span link are sorted lexicographically by their keys. -func SortAttributesAndEvents() SortAttributesAndEventsAdjuster { +func SortCollections() SortAttributesAndEventsAdjuster { return SortAttributesAndEventsAdjuster{} } diff --git a/cmd/query/app/querysvc/adjuster/sort_test.go b/cmd/query/app/querysvc/adjuster/sort_test.go index d9bef8baa37..ba9d849cff8 100644 --- a/cmd/query/app/querysvc/adjuster/sort_test.go +++ b/cmd/query/app/querysvc/adjuster/sort_test.go @@ -11,7 +11,7 @@ import ( ) func TestSortAttributesAndEventsAdjuster(t *testing.T) { - adjuster := SortAttributesAndEvents() + adjuster := SortCollections() input := func() ptrace.Traces { traces := ptrace.NewTraces() rs := traces.ResourceSpans().AppendEmpty() diff --git a/cmd/query/app/querysvc/adjuster/spaniduniquifier.go b/cmd/query/app/querysvc/adjuster/spaniduniquifier.go index b21ba7eeb6d..687fea06162 100644 --- a/cmd/query/app/querysvc/adjuster/spaniduniquifier.go +++ b/cmd/query/app/querysvc/adjuster/spaniduniquifier.go @@ -15,7 +15,7 @@ import ( var errTooManySpans = errors.New("cannot assign unique span ID, too many spans in the trace") -// SpanIDUniquifier returns an adjuster that changes span ids for server +// DeduplicateClientServerSpanIDs returns an adjuster that changes span ids for server // spans (i.e. spans with tag: span.kind == server) if there is another // client span that shares the same span ID. This is needed to deal with // Zipkin-style clients that reuse the same span ID for both client and server @@ -23,7 +23,7 @@ var errTooManySpans = errors.New("cannot assign unique span ID, too many spans i // // Any issues encountered during adjustment are recorded as warnings in the // span. -func SpanIDUniquifier() Adjuster { +func DeduplicateClientServerSpanIDs() Adjuster { return Func(func(traces ptrace.Traces) { adjuster := spanIDDeduper{ spansByID: make(map[pcommon.SpanID][]ptrace.Span), @@ -89,7 +89,7 @@ func (d *spanIDDeduper) uniquifyServerSpanIDs(traces ptrace.Traces) { if span.Kind() == ptrace.SpanKindServer && d.isSharedWithClientSpan(span.SpanID()) { newID, err := d.makeUniqueSpanID() if err != nil { - jptrace.AddWarning(span, err.Error()) + jptrace.AddWarnings(span, err.Error()) continue } oldToNewSpanIDs[span.SpanID()] = newID diff --git a/cmd/query/app/querysvc/adjuster/spaniduniquifier_test.go b/cmd/query/app/querysvc/adjuster/spaniduniquifier_test.go index 5ad6868f443..282e0dcd50e 100644 --- a/cmd/query/app/querysvc/adjuster/spaniduniquifier_test.go +++ b/cmd/query/app/querysvc/adjuster/spaniduniquifier_test.go @@ -46,7 +46,7 @@ func makeTraces() ptrace.Traces { func TestSpanIDUniquifierTriggered(t *testing.T) { traces := makeTraces() - deduper := SpanIDUniquifier() + deduper := DeduplicateClientServerSpanIDs() deduper.Adjust(traces) spans := traces.ResourceSpans().At(0).ScopeSpans().At(0).Spans() @@ -73,7 +73,7 @@ func TestSpanIDUniquifierNotTriggered(t *testing.T) { spans.At(2).CopyTo(newSpans.AppendEmpty()) newSpans.CopyTo(spans) - deduper := SpanIDUniquifier() + deduper := DeduplicateClientServerSpanIDs() deduper.Adjust(traces) gotSpans := traces.ResourceSpans().At(0).ScopeSpans().At(0).Spans() diff --git a/cmd/query/app/querysvc/adjuster/spanlinks.go b/cmd/query/app/querysvc/adjuster/spanlinks.go index 60730fa5ecf..3fe3cb7ec3a 100644 --- a/cmd/query/app/querysvc/adjuster/spanlinks.go +++ b/cmd/query/app/querysvc/adjuster/spanlinks.go @@ -15,8 +15,8 @@ const ( var _ Adjuster = (*LinksAdjuster)(nil) -// SpanLinks creates an adjuster that removes span links with empty trace IDs. -func SpanLinks() LinksAdjuster { +// RemoveEmptySpanLinks creates an adjuster that removes span links with empty trace IDs. +func RemoveEmptySpanLinks() LinksAdjuster { return LinksAdjuster{} } @@ -48,7 +48,7 @@ func (la LinksAdjuster) adjust(span ptrace.Span) { newLink := validLinks.AppendEmpty() link.CopyTo(newLink) } else { - jptrace.AddWarning(span, invalidSpanLinkWarning) + jptrace.AddWarnings(span, invalidSpanLinkWarning) } } validLinks.CopyTo(span.Links()) diff --git a/cmd/query/app/querysvc/adjuster/spanlinks_test.go b/cmd/query/app/querysvc/adjuster/spanlinks_test.go index 33a8a5a78a6..ed2f69439cc 100644 --- a/cmd/query/app/querysvc/adjuster/spanlinks_test.go +++ b/cmd/query/app/querysvc/adjuster/spanlinks_test.go @@ -31,7 +31,7 @@ func TestLinksAdjuster(t *testing.T) { spanC.Links().AppendEmpty().SetTraceID(pcommon.TraceID([]byte{0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0})) spanC.Links().AppendEmpty().SetTraceID(pcommon.TraceID([]byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0})) - SpanLinks().Adjust(traces) + RemoveEmptySpanLinks().Adjust(traces) spans := traces.ResourceSpans().At(0).ScopeSpans().At(0).Spans() gotSpansA := spans.At(0) diff --git a/cmd/query/app/querysvc/adjuster/standard.go b/cmd/query/app/querysvc/adjuster/standard.go new file mode 100644 index 00000000000..cfbb8915192 --- /dev/null +++ b/cmd/query/app/querysvc/adjuster/standard.go @@ -0,0 +1,23 @@ +// Copyright (c) 2024 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package adjuster + +import ( + "time" +) + +// StandardAdjusters returns a list of adjusters applied by the query service +// before returning the data to the API clients. +func StandardAdjusters(maxClockSkewAdjust time.Duration) []Adjuster { + return []Adjuster{ + DeduplicateClientServerSpanIDs(), + SortCollections(), + // DeduplicateSpans depends on SortCollections running first + DeduplicateSpans(), + CorrectClockSkew(maxClockSkewAdjust), + NormalizeIPAttributes(), + MoveLibraryAttributes(), + RemoveEmptySpanLinks(), + } +} diff --git a/cmd/query/app/querysvc/adjuster/standard_test.go b/cmd/query/app/querysvc/adjuster/standard_test.go new file mode 100644 index 00000000000..d2fadfe2ec0 --- /dev/null +++ b/cmd/query/app/querysvc/adjuster/standard_test.go @@ -0,0 +1,25 @@ +// Copyright (c) 2024 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package adjuster + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestStandardAdjusters(t *testing.T) { + maxClockSkewAdjust := 10 * time.Second + adjusters := StandardAdjusters(maxClockSkewAdjust) + + assert.Len(t, adjusters, 7, "Expected 7 adjusters") + assert.IsType(t, DeduplicateClientServerSpanIDs(), adjusters[0]) + assert.IsType(t, SortCollections(), adjusters[1]) + assert.IsType(t, DeduplicateSpans(), adjusters[2]) + assert.IsType(t, CorrectClockSkew(maxClockSkewAdjust), adjusters[3]) + assert.IsType(t, NormalizeIPAttributes(), adjusters[4]) + assert.IsType(t, MoveLibraryAttributes(), adjusters[5]) + assert.IsType(t, RemoveEmptySpanLinks(), adjusters[6]) +} diff --git a/go.mod b/go.mod index 762d0f365f4..a7eafab345e 100644 --- a/go.mod +++ b/go.mod @@ -98,7 +98,7 @@ require ( golang.org/x/net v0.33.0 golang.org/x/sys v0.28.0 google.golang.org/grpc v1.69.2 - google.golang.org/protobuf v1.36.0 + google.golang.org/protobuf v1.36.1 gopkg.in/yaml.v3 v3.0.1 ) diff --git a/go.sum b/go.sum index 5d8e67a5e7a..d694e4003f1 100644 --- a/go.sum +++ b/go.sum @@ -964,8 +964,8 @@ google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2 google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c= -google.golang.org/protobuf v1.36.0 h1:mjIs9gYtt56AzC4ZaffQuh88TZurBGhIJMBZGSxNerQ= -google.golang.org/protobuf v1.36.0/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= +google.golang.org/protobuf v1.36.1 h1:yBPeRvTftaleIgM3PZ/WBIZ7XM/eEYAaEyCwvyjq/gk= +google.golang.org/protobuf v1.36.1/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/internal/jptrace/translator.go b/internal/jptrace/translator.go new file mode 100644 index 00000000000..82524696665 --- /dev/null +++ b/internal/jptrace/translator.go @@ -0,0 +1,101 @@ +// Copyright (c) 2024 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package jptrace + +import ( + jaegerTranslator "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/ptrace" + + "github.com/jaegertracing/jaeger/model" +) + +// ProtoFromTraces converts OpenTelemetry traces (ptrace.Traces) +// to Jaeger model batches ([]*model.Batch). +func ProtoFromTraces(traces ptrace.Traces) []*model.Batch { + batches := jaegerTranslator.ProtoFromTraces(traces) + spanMap := createSpanMapFromBatches(batches) + transferWarningsToModelSpans(traces, spanMap) + return batches +} + +// ProtoToTraces converts Jaeger model batches ([]*model.Batch) +// to OpenTelemetry traces (ptrace.Traces). +func ProtoToTraces(batches []*model.Batch) ptrace.Traces { + traces, _ := jaegerTranslator.ProtoToTraces(batches) // never returns an error + spanMap := createSpanMapFromTraces(traces) + transferWarningsToOTLPSpans(batches, spanMap) + return traces +} + +func createSpanMapFromBatches(batches []*model.Batch) map[model.SpanID]*model.Span { + spanMap := make(map[model.SpanID]*model.Span) + for _, batch := range batches { + for _, span := range batch.Spans { + spanMap[span.SpanID] = span + } + } + return spanMap +} + +func createSpanMapFromTraces(traces ptrace.Traces) map[pcommon.SpanID]ptrace.Span { + spanMap := make(map[pcommon.SpanID]ptrace.Span) + resources := traces.ResourceSpans() + for i := 0; i < resources.Len(); i++ { + scopes := resources.At(i).ScopeSpans() + for j := 0; j < scopes.Len(); j++ { + spans := scopes.At(j).Spans() + for k := 0; k < spans.Len(); k++ { + span := spans.At(k) + spanMap[span.SpanID()] = span + } + } + } + return spanMap +} + +func transferWarningsToModelSpans(traces ptrace.Traces, spanMap map[model.SpanID]*model.Span) { + resources := traces.ResourceSpans() + for i := 0; i < resources.Len(); i++ { + scopes := resources.At(i).ScopeSpans() + for j := 0; j < scopes.Len(); j++ { + spans := scopes.At(j).Spans() + for k := 0; k < spans.Len(); k++ { + otelSpan := spans.At(k) + warnings := GetWarnings(otelSpan) + if len(warnings) == 0 { + continue + } + if span, ok := spanMap[model.SpanIDFromOTEL(otelSpan.SpanID())]; ok { + span.Warnings = append(span.Warnings, warnings...) + // filter out the warning tag + span.Tags = filterTags(span.Tags, warningsAttribute) + } + } + } + } +} + +func transferWarningsToOTLPSpans(batches []*model.Batch, spanMap map[pcommon.SpanID]ptrace.Span) { + for _, batch := range batches { + for _, span := range batch.Spans { + if len(span.Warnings) == 0 { + continue + } + if otelSpan, ok := spanMap[span.SpanID.ToOTELSpanID()]; ok { + AddWarnings(otelSpan, span.Warnings...) + } + } + } +} + +func filterTags(tags []model.KeyValue, keyToRemove string) []model.KeyValue { + var filteredTags []model.KeyValue + for _, tag := range tags { + if tag.Key != keyToRemove { + filteredTags = append(filteredTags, tag) + } + } + return filteredTags +} diff --git a/internal/jptrace/translator_test.go b/internal/jptrace/translator_test.go new file mode 100644 index 00000000000..9e021a35abb --- /dev/null +++ b/internal/jptrace/translator_test.go @@ -0,0 +1,115 @@ +// Copyright (c) 2024 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package jptrace + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/ptrace" + + "github.com/jaegertracing/jaeger/model" +) + +func TestProtoFromTraces_AddsWarnings(t *testing.T) { + traces := ptrace.NewTraces() + rs1 := traces.ResourceSpans().AppendEmpty() + ss1 := rs1.ScopeSpans().AppendEmpty() + span1 := ss1.Spans().AppendEmpty() + span1.SetName("test-span-1") + span1.SetSpanID(pcommon.SpanID([8]byte{1, 2, 3, 4, 5, 6, 7, 8})) + AddWarnings(span1, "test-warning-1") + AddWarnings(span1, "test-warning-2") + span1.Attributes().PutStr("key", "value") + + ss2 := rs1.ScopeSpans().AppendEmpty() + span2 := ss2.Spans().AppendEmpty() + span2.SetName("test-span-2") + span2.SetSpanID(pcommon.SpanID([8]byte{9, 10, 11, 12, 13, 14, 15, 16})) + + rs2 := traces.ResourceSpans().AppendEmpty() + ss3 := rs2.ScopeSpans().AppendEmpty() + span3 := ss3.Spans().AppendEmpty() + span3.SetName("test-span-3") + span3.SetSpanID(pcommon.SpanID([8]byte{17, 18, 19, 20, 21, 22, 23, 24})) + AddWarnings(span3, "test-warning-3") + + batches := ProtoFromTraces(traces) + + assert.Len(t, batches, 2) + + assert.Len(t, batches[0].Spans, 2) + assert.Equal(t, "test-span-1", batches[0].Spans[0].OperationName) + assert.Equal(t, []string{"test-warning-1", "test-warning-2"}, batches[0].Spans[0].Warnings) + assert.Equal(t, []model.KeyValue{{Key: "key", VStr: "value"}}, batches[0].Spans[0].Tags) + assert.Equal(t, "test-span-2", batches[0].Spans[1].OperationName) + assert.Empty(t, batches[0].Spans[1].Warnings) + assert.Empty(t, batches[0].Spans[1].Tags) + + assert.Len(t, batches[1].Spans, 1) + assert.Equal(t, "test-span-3", batches[1].Spans[0].OperationName) + assert.Equal(t, []string{"test-warning-3"}, batches[1].Spans[0].Warnings) + assert.Empty(t, batches[1].Spans[0].Tags) +} + +func TestProtoToTraces_AddsWarnings(t *testing.T) { + batch1 := &model.Batch{ + Process: &model.Process{ + ServiceName: "batch-1", + }, + Spans: []*model.Span{ + { + OperationName: "test-span-1", + SpanID: model.NewSpanID(1), + Warnings: []string{"test-warning-1", "test-warning-2"}, + }, + { + OperationName: "test-span-2", + SpanID: model.NewSpanID(2), + }, + }, + } + batch2 := &model.Batch{ + Process: &model.Process{ + ServiceName: "batch-2", + }, + Spans: []*model.Span{ + { + OperationName: "test-span-3", + SpanID: model.NewSpanID(3), + Warnings: []string{"test-warning-3"}, + }, + }, + } + batches := []*model.Batch{batch1, batch2} + traces := ProtoToTraces(batches) + + assert.Equal(t, 2, traces.ResourceSpans().Len()) + + rs1 := traces.ResourceSpans().At(0) + assert.Equal(t, 1, rs1.ScopeSpans().Len()) + ss1 := rs1.ScopeSpans().At(0) + assert.Equal(t, 2, ss1.Spans().Len()) + + span1 := ss1.Spans().At(0) + assert.Equal(t, "test-span-1", span1.Name()) + assert.Equal(t, pcommon.SpanID([8]byte{0, 0, 0, 0, 0, 0, 0, 1}), span1.SpanID()) + assert.Equal(t, []string{"test-warning-1", "test-warning-2"}, GetWarnings(span1)) + + span2 := ss1.Spans().At(1) + assert.Equal(t, "test-span-2", span2.Name()) + assert.Equal(t, pcommon.SpanID([8]byte{0, 0, 0, 0, 0, 0, 0, 2}), span2.SpanID()) + assert.Empty(t, GetWarnings(span2)) + + rs2 := traces.ResourceSpans().At(1) + assert.Equal(t, 1, rs2.ScopeSpans().Len()) + ss3 := rs2.ScopeSpans().At(0) + assert.Equal(t, 1, ss3.Spans().Len()) + + span3 := ss3.Spans().At(0) + assert.Equal(t, "test-span-3", span3.Name()) + assert.Equal(t, pcommon.SpanID([8]byte{0, 0, 0, 0, 0, 0, 0, 3}), span3.SpanID()) + assert.Equal(t, []string{"test-warning-3"}, GetWarnings(span3)) +} diff --git a/internal/jptrace/warning.go b/internal/jptrace/warning.go index 0d96296c214..5247930fc25 100644 --- a/internal/jptrace/warning.go +++ b/internal/jptrace/warning.go @@ -16,14 +16,16 @@ const ( warningsAttribute = "jaeger.internal.warnings" ) -func AddWarning(span ptrace.Span, warning string) { - var warnings pcommon.Slice +func AddWarnings(span ptrace.Span, warnings ...string) { + var w pcommon.Slice if currWarnings, ok := span.Attributes().Get(warningsAttribute); ok { - warnings = currWarnings.Slice() + w = currWarnings.Slice() } else { - warnings = span.Attributes().PutEmptySlice(warningsAttribute) + w = span.Attributes().PutEmptySlice(warningsAttribute) + } + for _, warning := range warnings { + w.AppendEmpty().SetStr(warning) } - warnings.AppendEmpty().SetStr(warning) } func GetWarnings(span ptrace.Span) []string { diff --git a/internal/jptrace/warning_test.go b/internal/jptrace/warning_test.go index b8e3c38032c..9bc62437dd4 100644 --- a/internal/jptrace/warning_test.go +++ b/internal/jptrace/warning_test.go @@ -7,6 +7,7 @@ import ( "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/pdata/ptrace" ) @@ -46,7 +47,7 @@ func TestAddWarning(t *testing.T) { warnings.AppendEmpty().SetStr(warn) } } - AddWarning(span, test.newWarn) + AddWarnings(span, test.newWarn) warnings, ok := attrs.Get("jaeger.internal.warnings") assert.True(t, ok) assert.Equal(t, len(test.expected), warnings.Slice().Len()) @@ -57,6 +58,15 @@ func TestAddWarning(t *testing.T) { } } +func TestAddWarning_MultipleWarnings(t *testing.T) { + span := ptrace.NewSpan() + AddWarnings(span, "warning-1", "warning-2") + warnings, ok := span.Attributes().Get("jaeger.internal.warnings") + require.True(t, ok) + require.Equal(t, "warning-1", warnings.Slice().At(0).Str()) + require.Equal(t, "warning-2", warnings.Slice().At(1).Str()) +} + func TestGetWarnings(t *testing.T) { tests := []struct { name string diff --git a/model/ids.go b/model/ids.go index 5bf93dc43be..4c7b77c6aa9 100644 --- a/model/ids.go +++ b/model/ids.go @@ -261,3 +261,19 @@ func (s *SpanID) UnmarshalJSON(data []byte) error { func (s *SpanID) UnmarshalJSONPB(_ *jsonpb.Unmarshaler, b []byte) error { return s.UnmarshalJSON(b) } + +// ToOTELSpanID converts the SpanID to OTEL's representation of a span identitfier. +// This was taken from +// https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/internal/coreinternal/idutils/big_endian_converter.go. +func (s SpanID) ToOTELSpanID() pcommon.SpanID { + spanID := [8]byte{} + binary.BigEndian.PutUint64(spanID[:], uint64(s)) + return pcommon.SpanID(spanID) +} + +// ToOTELSpanID converts OTEL's SpanID to the model representation of a span identitfier. +// This was taken from +// https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/internal/coreinternal/idutils/big_endian_converter.go. +func SpanIDFromOTEL(spanID pcommon.SpanID) SpanID { + return SpanID(binary.BigEndian.Uint64(spanID[:])) +} diff --git a/model/ids_test.go b/model/ids_test.go index 40c6f801a5c..ef2a9a65beb 100644 --- a/model/ids_test.go +++ b/model/ids_test.go @@ -137,3 +137,55 @@ func TestTraceIDFromOTEL(t *testing.T) { } require.Equal(t, expected, model.TraceIDFromOTEL(otelTraceID)) } + +func TestToOTELSpanID(t *testing.T) { + tests := []struct { + name string + spanID model.SpanID + expected pcommon.SpanID + }{ + { + name: "zero span ID", + spanID: model.NewSpanID(0), + expected: pcommon.NewSpanIDEmpty(), + }, + { + name: "non-zero span ID", + spanID: model.NewSpanID(1), + expected: pcommon.SpanID([8]byte{0, 0, 0, 0, 0, 0, 0, 1}), + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + actual := test.spanID.ToOTELSpanID() + assert.Equal(t, test.expected, actual) + }) + } +} + +func TestSpanIDFromOTEL(t *testing.T) { + tests := []struct { + name string + otelSpanID pcommon.SpanID + expected model.SpanID + }{ + { + name: "zero span ID", + otelSpanID: pcommon.NewSpanIDEmpty(), + expected: model.NewSpanID(0), + }, + { + name: "non-zero span ID", + otelSpanID: pcommon.SpanID([8]byte{0, 0, 0, 0, 0, 0, 0, 1}), + expected: model.NewSpanID(1), + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + actual := model.SpanIDFromOTEL(test.otelSpanID) + assert.Equal(t, test.expected, actual) + }) + } +} diff --git a/pkg/otelsemconv/semconv.go b/pkg/otelsemconv/semconv.go index 9acb3b88e2c..5427dbfcc26 100644 --- a/pkg/otelsemconv/semconv.go +++ b/pkg/otelsemconv/semconv.go @@ -24,6 +24,10 @@ const ( DBSystemKey = semconv.DBSystemKey PeerServiceKey = semconv.PeerServiceKey HTTPResponseStatusCodeKey = semconv.HTTPResponseStatusCodeKey + + HostIDKey = semconv.HostIDKey + HostIPKey = semconv.HostIPKey + HostNameKey = semconv.HostNameKey ) var HTTPResponseStatusCode = semconv.HTTPResponseStatusCode diff --git a/storage_v2/v1adapter/reader.go b/storage_v2/v1adapter/reader.go index a3d2293dd25..04508a1247c 100644 --- a/storage_v2/v1adapter/reader.go +++ b/storage_v2/v1adapter/reader.go @@ -7,10 +7,10 @@ import ( "context" "errors" - model2otel "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/ptrace" + "github.com/jaegertracing/jaeger/internal/jptrace" "github.com/jaegertracing/jaeger/model" "github.com/jaegertracing/jaeger/pkg/iter" "github.com/jaegertracing/jaeger/storage/dependencystore" @@ -60,8 +60,8 @@ func (tr *TraceReader) GetTraces( return } batch := &model.Batch{Spans: t.GetSpans()} - tr, err := model2otel.ProtoToTraces([]*model.Batch{batch}) - if !yield([]ptrace.Traces{tr}, err) || err != nil { + tr := jptrace.ProtoToTraces([]*model.Batch{batch}) + if !yield([]ptrace.Traces{tr}, nil) { return } } @@ -105,7 +105,7 @@ func (tr *TraceReader) FindTraces( } for _, trace := range traces { batch := &model.Batch{Spans: trace.GetSpans()} - otelTrace, _ := model2otel.ProtoToTraces([]*model.Batch{batch}) + otelTrace := jptrace.ProtoToTraces([]*model.Batch{batch}) if !yield([]ptrace.Traces{otelTrace}, nil) { return } diff --git a/storage_v2/v1adapter/writer.go b/storage_v2/v1adapter/writer.go index c0a9d4ced99..b16b54ed43e 100644 --- a/storage_v2/v1adapter/writer.go +++ b/storage_v2/v1adapter/writer.go @@ -7,9 +7,9 @@ import ( "context" "errors" - otlp2jaeger "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger" "go.opentelemetry.io/collector/pdata/ptrace" + "github.com/jaegertracing/jaeger/internal/jptrace" "github.com/jaegertracing/jaeger/storage/spanstore" "github.com/jaegertracing/jaeger/storage_v2/tracestore" ) @@ -26,7 +26,7 @@ func NewTraceWriter(spanWriter spanstore.Writer) tracestore.Writer { // WriteTraces implements tracestore.Writer. func (t *TraceWriter) WriteTraces(ctx context.Context, td ptrace.Traces) error { - batches := otlp2jaeger.ProtoFromTraces(td) + batches := jptrace.ProtoFromTraces(td) var errs []error for _, batch := range batches { for _, span := range batch.Spans {