Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[v2][adjuster] Implement model to otlp translator with post processing #6397

Merged
merged 8 commits into from
Dec 24, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 6 additions & 7 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -139,13 +139,12 @@ linters-settings:
files:
- "**_test.go"

# TODO: enable after import is not used anywhere
# disallow-otel-contrib-translator:
# deny:
# - pkg: github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger
# desc: "Use jptrace package instead of opentelemetry-collector-contrib/pkg/translator/jaeger"
# files:
# - "!**/jptrace/**"
disallow-otel-contrib-translator:
deny:
- pkg: github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger
desc: "Use jptrace package instead of opentelemetry-collector-contrib/pkg/translator/jaeger"
files:
- "!**/jptrace/**"

goimports:
local-prefixes: github.com/jaegertracing/jaeger
Expand Down
7 changes: 2 additions & 5 deletions cmd/jaeger/internal/integration/span_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,14 @@ import (
"io"
"time"

jaeger2otlp "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config/configtls"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exportertest"
"go.opentelemetry.io/collector/exporter/otlpexporter"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/internal/jptrace"
"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/storage/spanstore"
)
Expand Down Expand Up @@ -68,15 +68,12 @@ func (w *spanWriter) Close() error {
}

func (w *spanWriter) WriteSpan(ctx context.Context, span *model.Span) error {
td, err := jaeger2otlp.ProtoToTraces([]*model.Batch{
td := jptrace.ProtoToTraces([]*model.Batch{
{
Spans: []*model.Span{span},
Process: span.Process,
},
})
if err != nil {
return err
}

return w.exporter.ConsumeTraces(ctx, td)
}
5 changes: 2 additions & 3 deletions cmd/query/app/apiv3/otlp_translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,14 @@
package apiv3

import (
model2otel "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger"
"go.opentelemetry.io/collector/pdata/ptrace"

"github.com/jaegertracing/jaeger/internal/jptrace"
"github.com/jaegertracing/jaeger/model"
)

func modelToOTLP(spans []*model.Span) ptrace.Traces {
batch := &model.Batch{Spans: spans}
// there is never an error returned from ProtoToTraces
tr, _ := model2otel.ProtoToTraces([]*model.Batch{batch})
tr := jptrace.ProtoToTraces([]*model.Batch{batch})
return tr
}
49 changes: 43 additions & 6 deletions internal/jptrace/translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package jptrace

import (
jaegerTranslator "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/ptrace"

"github.com/jaegertracing/jaeger/model"
Expand All @@ -15,10 +16,19 @@ import (
func ProtoFromTraces(traces ptrace.Traces) []*model.Batch {
batches := jaegerTranslator.ProtoFromTraces(traces)
spanMap := createSpanMapFromBatches(batches)
transferWarnings(traces, spanMap)
transferWarningsToModelSpans(traces, spanMap)
return batches
}

// ProtoToTraces converts Jaeger model batches ([]*model.Batch)
// to OpenTelemetry traces (ptrace.Traces).
func ProtoToTraces(batches []*model.Batch) ptrace.Traces {
traces, _ := jaegerTranslator.ProtoToTraces(batches) // never returns an error
spanMap := createSpanMapFromTraces(traces)
transferWarningsToOTLPSpans(batches, spanMap)
return traces
}

func createSpanMapFromBatches(batches []*model.Batch) map[model.SpanID]*model.Span {
spanMap := make(map[model.SpanID]*model.Span)
for _, batch := range batches {
Expand All @@ -29,7 +39,23 @@ func createSpanMapFromBatches(batches []*model.Batch) map[model.SpanID]*model.Sp
return spanMap
}

func transferWarnings(traces ptrace.Traces, spanMap map[model.SpanID]*model.Span) {
func createSpanMapFromTraces(traces ptrace.Traces) map[pcommon.SpanID]ptrace.Span {
spanMap := make(map[pcommon.SpanID]ptrace.Span)
resources := traces.ResourceSpans()
for i := 0; i < resources.Len(); i++ {
scopes := resources.At(i).ScopeSpans()
for j := 0; j < scopes.Len(); j++ {
spans := scopes.At(j).Spans()
for k := 0; k < spans.Len(); k++ {
span := spans.At(k)
spanMap[span.SpanID()] = span
}
}
}
return spanMap
}

func transferWarningsToModelSpans(traces ptrace.Traces, spanMap map[model.SpanID]*model.Span) {
resources := traces.ResourceSpans()
for i := 0; i < resources.Len(); i++ {
scopes := resources.At(i).ScopeSpans()
Expand All @@ -38,10 +64,21 @@ func transferWarnings(traces ptrace.Traces, spanMap map[model.SpanID]*model.Span
for k := 0; k < spans.Len(); k++ {
otelSpan := spans.At(k)
warnings := GetWarnings(otelSpan)
span := spanMap[model.SpanIDFromOTEL(otelSpan.SpanID())]
span.Warnings = append(span.Warnings, warnings...)
// filter out the warning tag
span.Tags = filterTags(span.Tags, warningsAttribute)
if span, ok := spanMap[model.SpanIDFromOTEL(otelSpan.SpanID())]; ok && len(warnings) > 0 {
mahadzaryab1 marked this conversation as resolved.
Show resolved Hide resolved
span.Warnings = append(span.Warnings, warnings...)
// filter out the warning tag
span.Tags = filterTags(span.Tags, warningsAttribute)
}
}
}
}
}

func transferWarningsToOTLPSpans(batches []*model.Batch, spanMap map[pcommon.SpanID]ptrace.Span) {
for _, batch := range batches {
for _, span := range batch.Spans {
if otelSpan, ok := spanMap[span.SpanID.ToOTELSpanID()]; ok && len(span.Warnings) > 0 {
mahadzaryab1 marked this conversation as resolved.
Show resolved Hide resolved
AddWarning(otelSpan, span.Warnings...)
}
}
}
Expand Down
60 changes: 60 additions & 0 deletions internal/jptrace/translator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,3 +53,63 @@ func TestProtoFromTraces_AddsWarnings(t *testing.T) {
assert.Equal(t, []string{"test-warning-3"}, batches[1].Spans[0].Warnings)
assert.Empty(t, batches[1].Spans[0].Tags)
}

func TestProtoToTraces_AddsWarnings(t *testing.T) {
batch1 := &model.Batch{
Process: &model.Process{
ServiceName: "batch-1",
},
Spans: []*model.Span{
{
OperationName: "test-span-1",
SpanID: model.NewSpanID(1),
Warnings: []string{"test-warning-1", "test-warning-2"},
},
{
OperationName: "test-span-2",
SpanID: model.NewSpanID(2),
},
},
}
batch2 := &model.Batch{
Process: &model.Process{
ServiceName: "batch-2",
},
Spans: []*model.Span{
{
OperationName: "test-span-3",
SpanID: model.NewSpanID(3),
Warnings: []string{"test-warning-3"},
},
},
}
batches := []*model.Batch{batch1, batch2}
traces := ProtoToTraces(batches)

assert.Equal(t, 2, traces.ResourceSpans().Len())

rs1 := traces.ResourceSpans().At(0)
assert.Equal(t, 1, rs1.ScopeSpans().Len())
ss1 := rs1.ScopeSpans().At(0)
assert.Equal(t, 2, ss1.Spans().Len())

span1 := ss1.Spans().At(0)
assert.Equal(t, "test-span-1", span1.Name())
assert.Equal(t, pcommon.SpanID([8]byte{0, 0, 0, 0, 0, 0, 0, 1}), span1.SpanID())
assert.Equal(t, []string{"test-warning-1", "test-warning-2"}, GetWarnings(span1))

span2 := ss1.Spans().At(1)
assert.Equal(t, "test-span-2", span2.Name())
assert.Equal(t, pcommon.SpanID([8]byte{0, 0, 0, 0, 0, 0, 0, 2}), span2.SpanID())
assert.Empty(t, GetWarnings(span2))

rs2 := traces.ResourceSpans().At(1)
assert.Equal(t, 1, rs2.ScopeSpans().Len())
ss3 := rs2.ScopeSpans().At(0)
assert.Equal(t, 1, ss3.Spans().Len())

span3 := ss3.Spans().At(0)
assert.Equal(t, "test-span-3", span3.Name())
assert.Equal(t, pcommon.SpanID([8]byte{0, 0, 0, 0, 0, 0, 0, 3}), span3.SpanID())
assert.Equal(t, []string{"test-warning-3"}, GetWarnings(span3))
}
12 changes: 7 additions & 5 deletions internal/jptrace/warning.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,16 @@ const (
warningsAttribute = "jaeger.internal.warnings"
)

func AddWarning(span ptrace.Span, warning string) {
var warnings pcommon.Slice
func AddWarning(span ptrace.Span, warnings ...string) {
mahadzaryab1 marked this conversation as resolved.
Show resolved Hide resolved
var w pcommon.Slice
if currWarnings, ok := span.Attributes().Get(warningsAttribute); ok {
warnings = currWarnings.Slice()
w = currWarnings.Slice()
} else {
warnings = span.Attributes().PutEmptySlice(warningsAttribute)
w = span.Attributes().PutEmptySlice(warningsAttribute)
}
for _, warning := range warnings {
w.AppendEmpty().SetStr(warning)
}
warnings.AppendEmpty().SetStr(warning)
}

func GetWarnings(span ptrace.Span) []string {
Expand Down
10 changes: 10 additions & 0 deletions internal/jptrace/warning_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/pdata/ptrace"
)

Expand Down Expand Up @@ -57,6 +58,15 @@ func TestAddWarning(t *testing.T) {
}
}

func TestAddWarning_MultipleWarnings(t *testing.T) {
span := ptrace.NewSpan()
AddWarning(span, "warning-1", "warning-2")
warnings, ok := span.Attributes().Get("jaeger.internal.warnings")
require.True(t, ok)
require.Equal(t, "warning-1", warnings.Slice().At(0).Str())
require.Equal(t, "warning-2", warnings.Slice().At(1).Str())
}

func TestGetWarnings(t *testing.T) {
tests := []struct {
name string
Expand Down
8 changes: 4 additions & 4 deletions storage_v2/v1adapter/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@ import (
"context"
"errors"

model2otel "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/ptrace"

"github.com/jaegertracing/jaeger/internal/jptrace"
"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/pkg/iter"
"github.com/jaegertracing/jaeger/storage/dependencystore"
Expand Down Expand Up @@ -60,8 +60,8 @@ func (tr *TraceReader) GetTraces(
return
}
batch := &model.Batch{Spans: t.GetSpans()}
tr, err := model2otel.ProtoToTraces([]*model.Batch{batch})
if !yield([]ptrace.Traces{tr}, err) || err != nil {
tr := jptrace.ProtoToTraces([]*model.Batch{batch})
if !yield([]ptrace.Traces{tr}, nil) {
return
}
}
Expand Down Expand Up @@ -105,7 +105,7 @@ func (tr *TraceReader) FindTraces(
}
for _, trace := range traces {
batch := &model.Batch{Spans: trace.GetSpans()}
otelTrace, _ := model2otel.ProtoToTraces([]*model.Batch{batch})
otelTrace := jptrace.ProtoToTraces([]*model.Batch{batch})
if !yield([]ptrace.Traces{otelTrace}, nil) {
return
}
Expand Down
Loading