diff --git a/.golangci.yml b/.golangci.yml index 2cca15f1d14..bf9ff03b876 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -139,13 +139,12 @@ linters-settings: files: - "**_test.go" - # TODO: enable after import is not used anywhere - # disallow-otel-contrib-translator: - # deny: - # - pkg: github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger - # desc: "Use jptrace package instead of opentelemetry-collector-contrib/pkg/translator/jaeger" - # files: - # - "!**/jptrace/**" + disallow-otel-contrib-translator: + deny: + - pkg: github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger + desc: "Use jptrace package instead of opentelemetry-collector-contrib/pkg/translator/jaeger" + files: + - "!**/jptrace/**" goimports: local-prefixes: github.com/jaegertracing/jaeger diff --git a/cmd/jaeger/internal/integration/span_writer.go b/cmd/jaeger/internal/integration/span_writer.go index da722b1681f..2f718fb5022 100644 --- a/cmd/jaeger/internal/integration/span_writer.go +++ b/cmd/jaeger/internal/integration/span_writer.go @@ -9,7 +9,6 @@ import ( "io" "time" - jaeger2otlp "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger" "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/config/configtls" "go.opentelemetry.io/collector/exporter" @@ -17,6 +16,7 @@ import ( "go.opentelemetry.io/collector/exporter/otlpexporter" "go.uber.org/zap" + "github.com/jaegertracing/jaeger/internal/jptrace" "github.com/jaegertracing/jaeger/model" "github.com/jaegertracing/jaeger/storage/spanstore" ) @@ -68,15 +68,12 @@ func (w *spanWriter) Close() error { } func (w *spanWriter) WriteSpan(ctx context.Context, span *model.Span) error { - td, err := jaeger2otlp.ProtoToTraces([]*model.Batch{ + td := jptrace.ProtoToTraces([]*model.Batch{ { Spans: []*model.Span{span}, Process: span.Process, }, }) - if err != nil { - return err - } return w.exporter.ConsumeTraces(ctx, td) } diff --git a/cmd/query/app/apiv3/otlp_translator.go b/cmd/query/app/apiv3/otlp_translator.go index 1a39ca79845..e9767720903 100644 --- a/cmd/query/app/apiv3/otlp_translator.go +++ b/cmd/query/app/apiv3/otlp_translator.go @@ -4,15 +4,14 @@ package apiv3 import ( - model2otel "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger" "go.opentelemetry.io/collector/pdata/ptrace" + "github.com/jaegertracing/jaeger/internal/jptrace" "github.com/jaegertracing/jaeger/model" ) func modelToOTLP(spans []*model.Span) ptrace.Traces { batch := &model.Batch{Spans: spans} - // there is never an error returned from ProtoToTraces - tr, _ := model2otel.ProtoToTraces([]*model.Batch{batch}) + tr := jptrace.ProtoToTraces([]*model.Batch{batch}) return tr } diff --git a/cmd/query/app/querysvc/adjuster/clockskew.go b/cmd/query/app/querysvc/adjuster/clockskew.go index 70e00fd1238..c3ea335aa2f 100644 --- a/cmd/query/app/querysvc/adjuster/clockskew.go +++ b/cmd/query/app/querysvc/adjuster/clockskew.go @@ -104,7 +104,7 @@ func (a *clockSkewAdjuster) buildNodesMap() { for k := 0; k < spans.Len(); k++ { span := spans.At(k) if _, exists := a.spans[span.SpanID()]; exists { - jptrace.AddWarning(span, warningDuplicateSpanID) + jptrace.AddWarnings(span, warningDuplicateSpanID) } else { a.spans[span.SpanID()] = &node{ span: span, @@ -129,7 +129,7 @@ func (a *clockSkewAdjuster) buildSubGraphs() { p.children = append(p.children, n) } else { warning := fmt.Sprintf(warningMissingParentSpanID, n.span.ParentSpanID()) - jptrace.AddWarning(n.span, warning) + jptrace.AddWarnings(n.span, warning) // treat spans with invalid parent ID as root spans a.roots[n.span.SpanID()] = n } @@ -185,14 +185,14 @@ func (a *clockSkewAdjuster) adjustTimestamps(n *node, skew clockSkew) { } if absDuration(skew.delta) > a.maxDelta { if a.maxDelta == 0 { - jptrace.AddWarning(n.span, fmt.Sprintf(warningSkewAdjustDisabled, skew.delta)) + jptrace.AddWarnings(n.span, fmt.Sprintf(warningSkewAdjustDisabled, skew.delta)) return } - jptrace.AddWarning(n.span, fmt.Sprintf(warningMaxDeltaExceeded, a.maxDelta, skew.delta)) + jptrace.AddWarnings(n.span, fmt.Sprintf(warningMaxDeltaExceeded, a.maxDelta, skew.delta)) return } n.span.SetStartTimestamp(pcommon.NewTimestampFromTime(n.span.StartTimestamp().AsTime().Add(skew.delta))) - jptrace.AddWarning(n.span, fmt.Sprintf("This span's timestamps were adjusted by %v", skew.delta)) + jptrace.AddWarnings(n.span, fmt.Sprintf("This span's timestamps were adjusted by %v", skew.delta)) for i := 0; i < n.span.Events().Len(); i++ { event := n.span.Events().At(i) event.SetTimestamp(pcommon.NewTimestampFromTime(event.Timestamp().AsTime().Add(skew.delta))) diff --git a/cmd/query/app/querysvc/adjuster/hash.go b/cmd/query/app/querysvc/adjuster/hash.go index 55d6347ed9b..458258c7c3c 100644 --- a/cmd/query/app/querysvc/adjuster/hash.go +++ b/cmd/query/app/querysvc/adjuster/hash.go @@ -56,7 +56,7 @@ func (s *SpanHashDeduper) Adjust(traces ptrace.Traces) { hashTrace, ) if err != nil { - jptrace.AddWarning(span, fmt.Sprintf("failed to compute hash code: %v", err)) + jptrace.AddWarnings(span, fmt.Sprintf("failed to compute hash code: %v", err)) span.CopyTo(dedupedSpans.AppendEmpty()) continue } diff --git a/cmd/query/app/querysvc/adjuster/libraryattributes.go b/cmd/query/app/querysvc/adjuster/libraryattributes.go index d2d0bf64dfd..418a1206c29 100644 --- a/cmd/query/app/querysvc/adjuster/libraryattributes.go +++ b/cmd/query/app/querysvc/adjuster/libraryattributes.go @@ -59,7 +59,7 @@ func (ResourceAttributesAdjuster) moveAttributes(span ptrace.Span, resource pcom for k, v := range replace { existing, ok := resource.Attributes().Get(k) if ok && existing.AsRaw() != v.AsRaw() { - jptrace.AddWarning(span, "conflicting values between Span and Resource for attribute "+k) + jptrace.AddWarnings(span, "conflicting values between Span and Resource for attribute "+k) continue } v.CopyTo(resource.Attributes().PutEmpty(k)) diff --git a/cmd/query/app/querysvc/adjuster/spaniduniquifier.go b/cmd/query/app/querysvc/adjuster/spaniduniquifier.go index 81720896e18..687fea06162 100644 --- a/cmd/query/app/querysvc/adjuster/spaniduniquifier.go +++ b/cmd/query/app/querysvc/adjuster/spaniduniquifier.go @@ -89,7 +89,7 @@ func (d *spanIDDeduper) uniquifyServerSpanIDs(traces ptrace.Traces) { if span.Kind() == ptrace.SpanKindServer && d.isSharedWithClientSpan(span.SpanID()) { newID, err := d.makeUniqueSpanID() if err != nil { - jptrace.AddWarning(span, err.Error()) + jptrace.AddWarnings(span, err.Error()) continue } oldToNewSpanIDs[span.SpanID()] = newID diff --git a/cmd/query/app/querysvc/adjuster/spanlinks.go b/cmd/query/app/querysvc/adjuster/spanlinks.go index a075be452b8..3fe3cb7ec3a 100644 --- a/cmd/query/app/querysvc/adjuster/spanlinks.go +++ b/cmd/query/app/querysvc/adjuster/spanlinks.go @@ -48,7 +48,7 @@ func (la LinksAdjuster) adjust(span ptrace.Span) { newLink := validLinks.AppendEmpty() link.CopyTo(newLink) } else { - jptrace.AddWarning(span, invalidSpanLinkWarning) + jptrace.AddWarnings(span, invalidSpanLinkWarning) } } validLinks.CopyTo(span.Links()) diff --git a/internal/jptrace/translator.go b/internal/jptrace/translator.go index 7480b2b5075..82524696665 100644 --- a/internal/jptrace/translator.go +++ b/internal/jptrace/translator.go @@ -5,6 +5,7 @@ package jptrace import ( jaegerTranslator "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger" + "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/ptrace" "github.com/jaegertracing/jaeger/model" @@ -15,10 +16,19 @@ import ( func ProtoFromTraces(traces ptrace.Traces) []*model.Batch { batches := jaegerTranslator.ProtoFromTraces(traces) spanMap := createSpanMapFromBatches(batches) - transferWarnings(traces, spanMap) + transferWarningsToModelSpans(traces, spanMap) return batches } +// ProtoToTraces converts Jaeger model batches ([]*model.Batch) +// to OpenTelemetry traces (ptrace.Traces). +func ProtoToTraces(batches []*model.Batch) ptrace.Traces { + traces, _ := jaegerTranslator.ProtoToTraces(batches) // never returns an error + spanMap := createSpanMapFromTraces(traces) + transferWarningsToOTLPSpans(batches, spanMap) + return traces +} + func createSpanMapFromBatches(batches []*model.Batch) map[model.SpanID]*model.Span { spanMap := make(map[model.SpanID]*model.Span) for _, batch := range batches { @@ -29,7 +39,23 @@ func createSpanMapFromBatches(batches []*model.Batch) map[model.SpanID]*model.Sp return spanMap } -func transferWarnings(traces ptrace.Traces, spanMap map[model.SpanID]*model.Span) { +func createSpanMapFromTraces(traces ptrace.Traces) map[pcommon.SpanID]ptrace.Span { + spanMap := make(map[pcommon.SpanID]ptrace.Span) + resources := traces.ResourceSpans() + for i := 0; i < resources.Len(); i++ { + scopes := resources.At(i).ScopeSpans() + for j := 0; j < scopes.Len(); j++ { + spans := scopes.At(j).Spans() + for k := 0; k < spans.Len(); k++ { + span := spans.At(k) + spanMap[span.SpanID()] = span + } + } + } + return spanMap +} + +func transferWarningsToModelSpans(traces ptrace.Traces, spanMap map[model.SpanID]*model.Span) { resources := traces.ResourceSpans() for i := 0; i < resources.Len(); i++ { scopes := resources.At(i).ScopeSpans() @@ -38,10 +64,27 @@ func transferWarnings(traces ptrace.Traces, spanMap map[model.SpanID]*model.Span for k := 0; k < spans.Len(); k++ { otelSpan := spans.At(k) warnings := GetWarnings(otelSpan) - span := spanMap[model.SpanIDFromOTEL(otelSpan.SpanID())] - span.Warnings = append(span.Warnings, warnings...) - // filter out the warning tag - span.Tags = filterTags(span.Tags, warningsAttribute) + if len(warnings) == 0 { + continue + } + if span, ok := spanMap[model.SpanIDFromOTEL(otelSpan.SpanID())]; ok { + span.Warnings = append(span.Warnings, warnings...) + // filter out the warning tag + span.Tags = filterTags(span.Tags, warningsAttribute) + } + } + } + } +} + +func transferWarningsToOTLPSpans(batches []*model.Batch, spanMap map[pcommon.SpanID]ptrace.Span) { + for _, batch := range batches { + for _, span := range batch.Spans { + if len(span.Warnings) == 0 { + continue + } + if otelSpan, ok := spanMap[span.SpanID.ToOTELSpanID()]; ok { + AddWarnings(otelSpan, span.Warnings...) } } } diff --git a/internal/jptrace/translator_test.go b/internal/jptrace/translator_test.go index 5ff3129a00f..9e021a35abb 100644 --- a/internal/jptrace/translator_test.go +++ b/internal/jptrace/translator_test.go @@ -20,8 +20,8 @@ func TestProtoFromTraces_AddsWarnings(t *testing.T) { span1 := ss1.Spans().AppendEmpty() span1.SetName("test-span-1") span1.SetSpanID(pcommon.SpanID([8]byte{1, 2, 3, 4, 5, 6, 7, 8})) - AddWarning(span1, "test-warning-1") - AddWarning(span1, "test-warning-2") + AddWarnings(span1, "test-warning-1") + AddWarnings(span1, "test-warning-2") span1.Attributes().PutStr("key", "value") ss2 := rs1.ScopeSpans().AppendEmpty() @@ -34,7 +34,7 @@ func TestProtoFromTraces_AddsWarnings(t *testing.T) { span3 := ss3.Spans().AppendEmpty() span3.SetName("test-span-3") span3.SetSpanID(pcommon.SpanID([8]byte{17, 18, 19, 20, 21, 22, 23, 24})) - AddWarning(span3, "test-warning-3") + AddWarnings(span3, "test-warning-3") batches := ProtoFromTraces(traces) @@ -53,3 +53,63 @@ func TestProtoFromTraces_AddsWarnings(t *testing.T) { assert.Equal(t, []string{"test-warning-3"}, batches[1].Spans[0].Warnings) assert.Empty(t, batches[1].Spans[0].Tags) } + +func TestProtoToTraces_AddsWarnings(t *testing.T) { + batch1 := &model.Batch{ + Process: &model.Process{ + ServiceName: "batch-1", + }, + Spans: []*model.Span{ + { + OperationName: "test-span-1", + SpanID: model.NewSpanID(1), + Warnings: []string{"test-warning-1", "test-warning-2"}, + }, + { + OperationName: "test-span-2", + SpanID: model.NewSpanID(2), + }, + }, + } + batch2 := &model.Batch{ + Process: &model.Process{ + ServiceName: "batch-2", + }, + Spans: []*model.Span{ + { + OperationName: "test-span-3", + SpanID: model.NewSpanID(3), + Warnings: []string{"test-warning-3"}, + }, + }, + } + batches := []*model.Batch{batch1, batch2} + traces := ProtoToTraces(batches) + + assert.Equal(t, 2, traces.ResourceSpans().Len()) + + rs1 := traces.ResourceSpans().At(0) + assert.Equal(t, 1, rs1.ScopeSpans().Len()) + ss1 := rs1.ScopeSpans().At(0) + assert.Equal(t, 2, ss1.Spans().Len()) + + span1 := ss1.Spans().At(0) + assert.Equal(t, "test-span-1", span1.Name()) + assert.Equal(t, pcommon.SpanID([8]byte{0, 0, 0, 0, 0, 0, 0, 1}), span1.SpanID()) + assert.Equal(t, []string{"test-warning-1", "test-warning-2"}, GetWarnings(span1)) + + span2 := ss1.Spans().At(1) + assert.Equal(t, "test-span-2", span2.Name()) + assert.Equal(t, pcommon.SpanID([8]byte{0, 0, 0, 0, 0, 0, 0, 2}), span2.SpanID()) + assert.Empty(t, GetWarnings(span2)) + + rs2 := traces.ResourceSpans().At(1) + assert.Equal(t, 1, rs2.ScopeSpans().Len()) + ss3 := rs2.ScopeSpans().At(0) + assert.Equal(t, 1, ss3.Spans().Len()) + + span3 := ss3.Spans().At(0) + assert.Equal(t, "test-span-3", span3.Name()) + assert.Equal(t, pcommon.SpanID([8]byte{0, 0, 0, 0, 0, 0, 0, 3}), span3.SpanID()) + assert.Equal(t, []string{"test-warning-3"}, GetWarnings(span3)) +} diff --git a/internal/jptrace/warning.go b/internal/jptrace/warning.go index 0d96296c214..5247930fc25 100644 --- a/internal/jptrace/warning.go +++ b/internal/jptrace/warning.go @@ -16,14 +16,16 @@ const ( warningsAttribute = "jaeger.internal.warnings" ) -func AddWarning(span ptrace.Span, warning string) { - var warnings pcommon.Slice +func AddWarnings(span ptrace.Span, warnings ...string) { + var w pcommon.Slice if currWarnings, ok := span.Attributes().Get(warningsAttribute); ok { - warnings = currWarnings.Slice() + w = currWarnings.Slice() } else { - warnings = span.Attributes().PutEmptySlice(warningsAttribute) + w = span.Attributes().PutEmptySlice(warningsAttribute) + } + for _, warning := range warnings { + w.AppendEmpty().SetStr(warning) } - warnings.AppendEmpty().SetStr(warning) } func GetWarnings(span ptrace.Span) []string { diff --git a/internal/jptrace/warning_test.go b/internal/jptrace/warning_test.go index b8e3c38032c..9bc62437dd4 100644 --- a/internal/jptrace/warning_test.go +++ b/internal/jptrace/warning_test.go @@ -7,6 +7,7 @@ import ( "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/pdata/ptrace" ) @@ -46,7 +47,7 @@ func TestAddWarning(t *testing.T) { warnings.AppendEmpty().SetStr(warn) } } - AddWarning(span, test.newWarn) + AddWarnings(span, test.newWarn) warnings, ok := attrs.Get("jaeger.internal.warnings") assert.True(t, ok) assert.Equal(t, len(test.expected), warnings.Slice().Len()) @@ -57,6 +58,15 @@ func TestAddWarning(t *testing.T) { } } +func TestAddWarning_MultipleWarnings(t *testing.T) { + span := ptrace.NewSpan() + AddWarnings(span, "warning-1", "warning-2") + warnings, ok := span.Attributes().Get("jaeger.internal.warnings") + require.True(t, ok) + require.Equal(t, "warning-1", warnings.Slice().At(0).Str()) + require.Equal(t, "warning-2", warnings.Slice().At(1).Str()) +} + func TestGetWarnings(t *testing.T) { tests := []struct { name string diff --git a/storage_v2/v1adapter/reader.go b/storage_v2/v1adapter/reader.go index a3d2293dd25..04508a1247c 100644 --- a/storage_v2/v1adapter/reader.go +++ b/storage_v2/v1adapter/reader.go @@ -7,10 +7,10 @@ import ( "context" "errors" - model2otel "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/ptrace" + "github.com/jaegertracing/jaeger/internal/jptrace" "github.com/jaegertracing/jaeger/model" "github.com/jaegertracing/jaeger/pkg/iter" "github.com/jaegertracing/jaeger/storage/dependencystore" @@ -60,8 +60,8 @@ func (tr *TraceReader) GetTraces( return } batch := &model.Batch{Spans: t.GetSpans()} - tr, err := model2otel.ProtoToTraces([]*model.Batch{batch}) - if !yield([]ptrace.Traces{tr}, err) || err != nil { + tr := jptrace.ProtoToTraces([]*model.Batch{batch}) + if !yield([]ptrace.Traces{tr}, nil) { return } } @@ -105,7 +105,7 @@ func (tr *TraceReader) FindTraces( } for _, trace := range traces { batch := &model.Batch{Spans: trace.GetSpans()} - otelTrace, _ := model2otel.ProtoToTraces([]*model.Batch{batch}) + otelTrace := jptrace.ProtoToTraces([]*model.Batch{batch}) if !yield([]ptrace.Traces{otelTrace}, nil) { return }