Skip to content

Commit

Permalink
[v2][adjuster] Implement otlp to model translator with post processing (
Browse files Browse the repository at this point in the history
#6394)

## Which problem is this PR solving?
- Towards #6344

## Description of the changes
- Implemented a translator in `jptrace` with a function
`ProtoFromTraces` that is a wrapper of the upstream `ProtoFromTraces` in
opentelemetry-collector-contrib jaeger translator. This function appends
the warnings in `jaeger.internal.warnings` to the corresponding warnings
field in the proto model.

## How was this change tested?
- Added unit tests

## Checklist
- [x] I have read
https://github.com/jaegertracing/jaeger/blob/master/CONTRIBUTING_GUIDELINES.md
- [x] I have signed all commits
- [x] I have added unit tests for the new functionality
- [x] I have run lint and test steps successfully
  - for `jaeger`: `make lint test`
  - for `jaeger-ui`: `npm run lint` and `npm run test`

---------

Signed-off-by: Mahad Zaryab <[email protected]>
  • Loading branch information
mahadzaryab1 authored Dec 23, 2024
1 parent 6bec1ad commit 4ecb086
Show file tree
Hide file tree
Showing 9 changed files with 197 additions and 8 deletions.
8 changes: 8 additions & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,14 @@ linters-settings:
files:
- "**_test.go"

# TODO: enable after import is not used anywhere
# disallow-otel-contrib-translator:
# deny:
# - pkg: github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger
# desc: "Use jptrace package instead of opentelemetry-collector-contrib/pkg/translator/jaeger"
# files:
# - "!**/jptrace/**"

goimports:
local-prefixes: github.com/jaegertracing/jaeger
gosec:
Expand Down
4 changes: 2 additions & 2 deletions cmd/collector/app/handler/otlp_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"context"
"fmt"

otlp2jaeger "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componentstatus"
"go.opentelemetry.io/collector/config/confignet"
Expand All @@ -23,6 +22,7 @@ import (

"github.com/jaegertracing/jaeger/cmd/collector/app/flags"
"github.com/jaegertracing/jaeger/cmd/collector/app/processor"
"github.com/jaegertracing/jaeger/internal/jptrace"
"github.com/jaegertracing/jaeger/pkg/tenancy"
)

Expand Down Expand Up @@ -108,7 +108,7 @@ type consumerDelegate struct {
}

func (c *consumerDelegate) consume(ctx context.Context, td ptrace.Traces) error {
batches := otlp2jaeger.ProtoFromTraces(td)
batches := jptrace.ProtoFromTraces(td)
for _, batch := range batches {
err := c.batchConsumer.consume(ctx, batch)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions cmd/jaeger/internal/processors/adaptivesampling/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@ import (
"context"
"fmt"

otlp2jaeger "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/pdata/ptrace"

"github.com/jaegertracing/jaeger/cmd/collector/app/sampling/samplingstrategy"
"github.com/jaegertracing/jaeger/cmd/jaeger/internal/extension/remotesampling"
"github.com/jaegertracing/jaeger/internal/jptrace"
"github.com/jaegertracing/jaeger/internal/metrics/otelmetrics"
"github.com/jaegertracing/jaeger/plugin/sampling/strategyprovider/adaptive"
)
Expand Down Expand Up @@ -65,7 +65,7 @@ func (tp *traceProcessor) close(context.Context) error {
}

func (tp *traceProcessor) processTraces(_ context.Context, td ptrace.Traces) (ptrace.Traces, error) {
batches := otlp2jaeger.ProtoFromTraces(td)
batches := jptrace.ProtoFromTraces(td)
for _, batch := range batches {
for _, span := range batch.Spans {
if span.Process == nil {
Expand Down
4 changes: 2 additions & 2 deletions cmd/query/app/otlp_translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ package app
import (
"fmt"

model2otel "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger"
"go.opentelemetry.io/collector/pdata/ptrace"

"github.com/jaegertracing/jaeger/internal/jptrace"
"github.com/jaegertracing/jaeger/model"
)

Expand All @@ -18,7 +18,7 @@ func otlp2traces(otlpSpans []byte) ([]*model.Trace, error) {
if err != nil {
return nil, fmt.Errorf("cannot unmarshal OTLP : %w", err)
}
jaegerBatches := model2otel.ProtoFromTraces(otlpTraces)
jaegerBatches := jptrace.ProtoFromTraces(otlpTraces)
var traces []*model.Trace
traceMap := make(map[model.TraceID]*model.Trace)
for _, batch := range jaegerBatches {
Expand Down
58 changes: 58 additions & 0 deletions internal/jptrace/translator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// Copyright (c) 2024 The Jaeger Authors.
// SPDX-License-Identifier: Apache-2.0

package jptrace

import (
jaegerTranslator "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger"
"go.opentelemetry.io/collector/pdata/ptrace"

"github.com/jaegertracing/jaeger/model"
)

// ProtoFromTraces converts OpenTelemetry traces (ptrace.Traces)
// to Jaeger model batches ([]*model.Batch).
func ProtoFromTraces(traces ptrace.Traces) []*model.Batch {
batches := jaegerTranslator.ProtoFromTraces(traces)
spanMap := createSpanMapFromBatches(batches)
transferWarnings(traces, spanMap)
return batches
}

func createSpanMapFromBatches(batches []*model.Batch) map[model.SpanID]*model.Span {
spanMap := make(map[model.SpanID]*model.Span)
for _, batch := range batches {
for _, span := range batch.Spans {
spanMap[span.SpanID] = span
}
}
return spanMap
}

func transferWarnings(traces ptrace.Traces, spanMap map[model.SpanID]*model.Span) {
resources := traces.ResourceSpans()
for i := 0; i < resources.Len(); i++ {
scopes := resources.At(i).ScopeSpans()
for j := 0; j < scopes.Len(); j++ {
spans := scopes.At(j).Spans()
for k := 0; k < spans.Len(); k++ {
otelSpan := spans.At(k)
warnings := GetWarnings(otelSpan)
span := spanMap[model.SpanIDFromOTEL(otelSpan.SpanID())]
span.Warnings = append(span.Warnings, warnings...)
// filter out the warning tag
span.Tags = filterTags(span.Tags, warningsAttribute)
}
}
}
}

func filterTags(tags []model.KeyValue, keyToRemove string) []model.KeyValue {
var filteredTags []model.KeyValue
for _, tag := range tags {
if tag.Key != keyToRemove {
filteredTags = append(filteredTags, tag)
}
}
return filteredTags
}
55 changes: 55 additions & 0 deletions internal/jptrace/translator_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// Copyright (c) 2024 The Jaeger Authors.
// SPDX-License-Identifier: Apache-2.0

package jptrace

import (
"testing"

"github.com/stretchr/testify/assert"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/ptrace"

"github.com/jaegertracing/jaeger/model"
)

func TestProtoFromTraces_AddsWarnings(t *testing.T) {
traces := ptrace.NewTraces()
rs1 := traces.ResourceSpans().AppendEmpty()
ss1 := rs1.ScopeSpans().AppendEmpty()
span1 := ss1.Spans().AppendEmpty()
span1.SetName("test-span-1")
span1.SetSpanID(pcommon.SpanID([8]byte{1, 2, 3, 4, 5, 6, 7, 8}))
AddWarning(span1, "test-warning-1")
AddWarning(span1, "test-warning-2")
span1.Attributes().PutStr("key", "value")

ss2 := rs1.ScopeSpans().AppendEmpty()
span2 := ss2.Spans().AppendEmpty()
span2.SetName("test-span-2")
span2.SetSpanID(pcommon.SpanID([8]byte{9, 10, 11, 12, 13, 14, 15, 16}))

rs2 := traces.ResourceSpans().AppendEmpty()
ss3 := rs2.ScopeSpans().AppendEmpty()
span3 := ss3.Spans().AppendEmpty()
span3.SetName("test-span-3")
span3.SetSpanID(pcommon.SpanID([8]byte{17, 18, 19, 20, 21, 22, 23, 24}))
AddWarning(span3, "test-warning-3")

batches := ProtoFromTraces(traces)

assert.Len(t, batches, 2)

assert.Len(t, batches[0].Spans, 2)
assert.Equal(t, "test-span-1", batches[0].Spans[0].OperationName)
assert.Equal(t, []string{"test-warning-1", "test-warning-2"}, batches[0].Spans[0].Warnings)
assert.Equal(t, []model.KeyValue{{Key: "key", VStr: "value"}}, batches[0].Spans[0].Tags)
assert.Equal(t, "test-span-2", batches[0].Spans[1].OperationName)
assert.Empty(t, batches[0].Spans[1].Warnings)
assert.Empty(t, batches[0].Spans[1].Tags)

assert.Len(t, batches[1].Spans, 1)
assert.Equal(t, "test-span-3", batches[1].Spans[0].OperationName)
assert.Equal(t, []string{"test-warning-3"}, batches[1].Spans[0].Warnings)
assert.Empty(t, batches[1].Spans[0].Tags)
}
16 changes: 16 additions & 0 deletions model/ids.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,3 +261,19 @@ func (s *SpanID) UnmarshalJSON(data []byte) error {
func (s *SpanID) UnmarshalJSONPB(_ *jsonpb.Unmarshaler, b []byte) error {
return s.UnmarshalJSON(b)
}

// ToOTELSpanID converts the SpanID to OTEL's representation of a span identitfier.
// This was taken from
// https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/internal/coreinternal/idutils/big_endian_converter.go.
func (s SpanID) ToOTELSpanID() pcommon.SpanID {
spanID := [8]byte{}
binary.BigEndian.PutUint64(spanID[:], uint64(s))
return pcommon.SpanID(spanID)
}

// ToOTELSpanID converts OTEL's SpanID to the model representation of a span identitfier.
// This was taken from
// https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/internal/coreinternal/idutils/big_endian_converter.go.
func SpanIDFromOTEL(spanID pcommon.SpanID) SpanID {
return SpanID(binary.BigEndian.Uint64(spanID[:]))
}
52 changes: 52 additions & 0 deletions model/ids_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,3 +137,55 @@ func TestTraceIDFromOTEL(t *testing.T) {
}
require.Equal(t, expected, model.TraceIDFromOTEL(otelTraceID))
}

func TestToOTELSpanID(t *testing.T) {
tests := []struct {
name string
spanID model.SpanID
expected pcommon.SpanID
}{
{
name: "zero span ID",
spanID: model.NewSpanID(0),
expected: pcommon.NewSpanIDEmpty(),
},
{
name: "non-zero span ID",
spanID: model.NewSpanID(1),
expected: pcommon.SpanID([8]byte{0, 0, 0, 0, 0, 0, 0, 1}),
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
actual := test.spanID.ToOTELSpanID()
assert.Equal(t, test.expected, actual)
})
}
}

func TestSpanIDFromOTEL(t *testing.T) {
tests := []struct {
name string
otelSpanID pcommon.SpanID
expected model.SpanID
}{
{
name: "zero span ID",
otelSpanID: pcommon.NewSpanIDEmpty(),
expected: model.NewSpanID(0),
},
{
name: "non-zero span ID",
otelSpanID: pcommon.SpanID([8]byte{0, 0, 0, 0, 0, 0, 0, 1}),
expected: model.NewSpanID(1),
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
actual := model.SpanIDFromOTEL(test.otelSpanID)
assert.Equal(t, test.expected, actual)
})
}
}
4 changes: 2 additions & 2 deletions storage_v2/v1adapter/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ import (
"context"
"errors"

otlp2jaeger "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger"
"go.opentelemetry.io/collector/pdata/ptrace"

"github.com/jaegertracing/jaeger/internal/jptrace"
"github.com/jaegertracing/jaeger/storage/spanstore"
"github.com/jaegertracing/jaeger/storage_v2/tracestore"
)
Expand All @@ -26,7 +26,7 @@ func NewTraceWriter(spanWriter spanstore.Writer) tracestore.Writer {

// WriteTraces implements tracestore.Writer.
func (t *TraceWriter) WriteTraces(ctx context.Context, td ptrace.Traces) error {
batches := otlp2jaeger.ProtoFromTraces(td)
batches := jptrace.ProtoFromTraces(td)
var errs []error
for _, batch := range batches {
for _, span := range batch.Spans {
Expand Down

0 comments on commit 4ecb086

Please sign in to comment.