Skip to content

Commit

Permalink
[v2][adjuster] Implement model to otlp translator with post processing (
Browse files Browse the repository at this point in the history
#6397)

## Which problem is this PR solving?
- Towards #6344

## Description of the changes
- Implemented a translator in `jptrace` with a function `ProtoToTraces`
that is a wrapper of the upstream `ProtoToTraces` in
opentelemetry-collector-contrib jaeger translator. This function appends
the warnings in model.Span to the `jaeger.internal.warnings` attribute
in the corresponding span in ptrace.Traces.

## How was this change tested?
- Added unit tests

## Checklist
- [x] I have read
https://github.com/jaegertracing/jaeger/blob/master/CONTRIBUTING_GUIDELINES.md
- [x] I have signed all commits
- [x] I have added unit tests for the new functionality
- [x] I have run lint and test steps successfully
  - for `jaeger`: `make lint test`
  - for `jaeger-ui`: `npm run lint` and `npm run test`

---------

Signed-off-by: Mahad Zaryab <[email protected]>
  • Loading branch information
mahadzaryab1 authored Dec 24, 2024
1 parent 5b1c76b commit 53468aa
Show file tree
Hide file tree
Showing 13 changed files with 153 additions and 43 deletions.
13 changes: 6 additions & 7 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -139,13 +139,12 @@ linters-settings:
files:
- "**_test.go"

# TODO: enable after import is not used anywhere
# disallow-otel-contrib-translator:
# deny:
# - pkg: github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger
# desc: "Use jptrace package instead of opentelemetry-collector-contrib/pkg/translator/jaeger"
# files:
# - "!**/jptrace/**"
disallow-otel-contrib-translator:
deny:
- pkg: github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger
desc: "Use jptrace package instead of opentelemetry-collector-contrib/pkg/translator/jaeger"
files:
- "!**/jptrace/**"

goimports:
local-prefixes: github.com/jaegertracing/jaeger
Expand Down
7 changes: 2 additions & 5 deletions cmd/jaeger/internal/integration/span_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,14 @@ import (
"io"
"time"

jaeger2otlp "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config/configtls"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exportertest"
"go.opentelemetry.io/collector/exporter/otlpexporter"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/internal/jptrace"
"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/storage/spanstore"
)
Expand Down Expand Up @@ -68,15 +68,12 @@ func (w *spanWriter) Close() error {
}

func (w *spanWriter) WriteSpan(ctx context.Context, span *model.Span) error {
td, err := jaeger2otlp.ProtoToTraces([]*model.Batch{
td := jptrace.ProtoToTraces([]*model.Batch{
{
Spans: []*model.Span{span},
Process: span.Process,
},
})
if err != nil {
return err
}

return w.exporter.ConsumeTraces(ctx, td)
}
5 changes: 2 additions & 3 deletions cmd/query/app/apiv3/otlp_translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,14 @@
package apiv3

import (
model2otel "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger"
"go.opentelemetry.io/collector/pdata/ptrace"

"github.com/jaegertracing/jaeger/internal/jptrace"
"github.com/jaegertracing/jaeger/model"
)

func modelToOTLP(spans []*model.Span) ptrace.Traces {
batch := &model.Batch{Spans: spans}
// there is never an error returned from ProtoToTraces
tr, _ := model2otel.ProtoToTraces([]*model.Batch{batch})
tr := jptrace.ProtoToTraces([]*model.Batch{batch})
return tr
}
10 changes: 5 additions & 5 deletions cmd/query/app/querysvc/adjuster/clockskew.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func (a *clockSkewAdjuster) buildNodesMap() {
for k := 0; k < spans.Len(); k++ {
span := spans.At(k)
if _, exists := a.spans[span.SpanID()]; exists {
jptrace.AddWarning(span, warningDuplicateSpanID)
jptrace.AddWarnings(span, warningDuplicateSpanID)
} else {
a.spans[span.SpanID()] = &node{
span: span,
Expand All @@ -129,7 +129,7 @@ func (a *clockSkewAdjuster) buildSubGraphs() {
p.children = append(p.children, n)
} else {
warning := fmt.Sprintf(warningMissingParentSpanID, n.span.ParentSpanID())
jptrace.AddWarning(n.span, warning)
jptrace.AddWarnings(n.span, warning)
// treat spans with invalid parent ID as root spans
a.roots[n.span.SpanID()] = n
}
Expand Down Expand Up @@ -185,14 +185,14 @@ func (a *clockSkewAdjuster) adjustTimestamps(n *node, skew clockSkew) {
}
if absDuration(skew.delta) > a.maxDelta {
if a.maxDelta == 0 {
jptrace.AddWarning(n.span, fmt.Sprintf(warningSkewAdjustDisabled, skew.delta))
jptrace.AddWarnings(n.span, fmt.Sprintf(warningSkewAdjustDisabled, skew.delta))
return
}
jptrace.AddWarning(n.span, fmt.Sprintf(warningMaxDeltaExceeded, a.maxDelta, skew.delta))
jptrace.AddWarnings(n.span, fmt.Sprintf(warningMaxDeltaExceeded, a.maxDelta, skew.delta))
return
}
n.span.SetStartTimestamp(pcommon.NewTimestampFromTime(n.span.StartTimestamp().AsTime().Add(skew.delta)))
jptrace.AddWarning(n.span, fmt.Sprintf("This span's timestamps were adjusted by %v", skew.delta))
jptrace.AddWarnings(n.span, fmt.Sprintf("This span's timestamps were adjusted by %v", skew.delta))
for i := 0; i < n.span.Events().Len(); i++ {
event := n.span.Events().At(i)
event.SetTimestamp(pcommon.NewTimestampFromTime(event.Timestamp().AsTime().Add(skew.delta)))
Expand Down
2 changes: 1 addition & 1 deletion cmd/query/app/querysvc/adjuster/hash.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func (s *SpanHashDeduper) Adjust(traces ptrace.Traces) {
hashTrace,
)
if err != nil {
jptrace.AddWarning(span, fmt.Sprintf("failed to compute hash code: %v", err))
jptrace.AddWarnings(span, fmt.Sprintf("failed to compute hash code: %v", err))
span.CopyTo(dedupedSpans.AppendEmpty())
continue
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/query/app/querysvc/adjuster/libraryattributes.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (ResourceAttributesAdjuster) moveAttributes(span ptrace.Span, resource pcom
for k, v := range replace {
existing, ok := resource.Attributes().Get(k)
if ok && existing.AsRaw() != v.AsRaw() {
jptrace.AddWarning(span, "conflicting values between Span and Resource for attribute "+k)
jptrace.AddWarnings(span, "conflicting values between Span and Resource for attribute "+k)
continue
}
v.CopyTo(resource.Attributes().PutEmpty(k))
Expand Down
2 changes: 1 addition & 1 deletion cmd/query/app/querysvc/adjuster/spaniduniquifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func (d *spanIDDeduper) uniquifyServerSpanIDs(traces ptrace.Traces) {
if span.Kind() == ptrace.SpanKindServer && d.isSharedWithClientSpan(span.SpanID()) {
newID, err := d.makeUniqueSpanID()
if err != nil {
jptrace.AddWarning(span, err.Error())
jptrace.AddWarnings(span, err.Error())
continue
}
oldToNewSpanIDs[span.SpanID()] = newID
Expand Down
2 changes: 1 addition & 1 deletion cmd/query/app/querysvc/adjuster/spanlinks.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func (la LinksAdjuster) adjust(span ptrace.Span) {
newLink := validLinks.AppendEmpty()
link.CopyTo(newLink)
} else {
jptrace.AddWarning(span, invalidSpanLinkWarning)
jptrace.AddWarnings(span, invalidSpanLinkWarning)
}
}
validLinks.CopyTo(span.Links())
Expand Down
55 changes: 49 additions & 6 deletions internal/jptrace/translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package jptrace

import (
jaegerTranslator "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/ptrace"

"github.com/jaegertracing/jaeger/model"
Expand All @@ -15,10 +16,19 @@ import (
func ProtoFromTraces(traces ptrace.Traces) []*model.Batch {
batches := jaegerTranslator.ProtoFromTraces(traces)
spanMap := createSpanMapFromBatches(batches)
transferWarnings(traces, spanMap)
transferWarningsToModelSpans(traces, spanMap)
return batches
}

// ProtoToTraces converts Jaeger model batches ([]*model.Batch)
// to OpenTelemetry traces (ptrace.Traces).
func ProtoToTraces(batches []*model.Batch) ptrace.Traces {
traces, _ := jaegerTranslator.ProtoToTraces(batches) // never returns an error
spanMap := createSpanMapFromTraces(traces)
transferWarningsToOTLPSpans(batches, spanMap)
return traces
}

func createSpanMapFromBatches(batches []*model.Batch) map[model.SpanID]*model.Span {
spanMap := make(map[model.SpanID]*model.Span)
for _, batch := range batches {
Expand All @@ -29,7 +39,23 @@ func createSpanMapFromBatches(batches []*model.Batch) map[model.SpanID]*model.Sp
return spanMap
}

func transferWarnings(traces ptrace.Traces, spanMap map[model.SpanID]*model.Span) {
func createSpanMapFromTraces(traces ptrace.Traces) map[pcommon.SpanID]ptrace.Span {
spanMap := make(map[pcommon.SpanID]ptrace.Span)
resources := traces.ResourceSpans()
for i := 0; i < resources.Len(); i++ {
scopes := resources.At(i).ScopeSpans()
for j := 0; j < scopes.Len(); j++ {
spans := scopes.At(j).Spans()
for k := 0; k < spans.Len(); k++ {
span := spans.At(k)
spanMap[span.SpanID()] = span
}
}
}
return spanMap
}

func transferWarningsToModelSpans(traces ptrace.Traces, spanMap map[model.SpanID]*model.Span) {
resources := traces.ResourceSpans()
for i := 0; i < resources.Len(); i++ {
scopes := resources.At(i).ScopeSpans()
Expand All @@ -38,10 +64,27 @@ func transferWarnings(traces ptrace.Traces, spanMap map[model.SpanID]*model.Span
for k := 0; k < spans.Len(); k++ {
otelSpan := spans.At(k)
warnings := GetWarnings(otelSpan)
span := spanMap[model.SpanIDFromOTEL(otelSpan.SpanID())]
span.Warnings = append(span.Warnings, warnings...)
// filter out the warning tag
span.Tags = filterTags(span.Tags, warningsAttribute)
if len(warnings) == 0 {
continue
}
if span, ok := spanMap[model.SpanIDFromOTEL(otelSpan.SpanID())]; ok {
span.Warnings = append(span.Warnings, warnings...)
// filter out the warning tag
span.Tags = filterTags(span.Tags, warningsAttribute)
}
}
}
}
}

func transferWarningsToOTLPSpans(batches []*model.Batch, spanMap map[pcommon.SpanID]ptrace.Span) {
for _, batch := range batches {
for _, span := range batch.Spans {
if len(span.Warnings) == 0 {
continue
}
if otelSpan, ok := spanMap[span.SpanID.ToOTELSpanID()]; ok {
AddWarnings(otelSpan, span.Warnings...)
}
}
}
Expand Down
66 changes: 63 additions & 3 deletions internal/jptrace/translator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ func TestProtoFromTraces_AddsWarnings(t *testing.T) {
span1 := ss1.Spans().AppendEmpty()
span1.SetName("test-span-1")
span1.SetSpanID(pcommon.SpanID([8]byte{1, 2, 3, 4, 5, 6, 7, 8}))
AddWarning(span1, "test-warning-1")
AddWarning(span1, "test-warning-2")
AddWarnings(span1, "test-warning-1")
AddWarnings(span1, "test-warning-2")
span1.Attributes().PutStr("key", "value")

ss2 := rs1.ScopeSpans().AppendEmpty()
Expand All @@ -34,7 +34,7 @@ func TestProtoFromTraces_AddsWarnings(t *testing.T) {
span3 := ss3.Spans().AppendEmpty()
span3.SetName("test-span-3")
span3.SetSpanID(pcommon.SpanID([8]byte{17, 18, 19, 20, 21, 22, 23, 24}))
AddWarning(span3, "test-warning-3")
AddWarnings(span3, "test-warning-3")

batches := ProtoFromTraces(traces)

Expand All @@ -53,3 +53,63 @@ func TestProtoFromTraces_AddsWarnings(t *testing.T) {
assert.Equal(t, []string{"test-warning-3"}, batches[1].Spans[0].Warnings)
assert.Empty(t, batches[1].Spans[0].Tags)
}

func TestProtoToTraces_AddsWarnings(t *testing.T) {
batch1 := &model.Batch{
Process: &model.Process{
ServiceName: "batch-1",
},
Spans: []*model.Span{
{
OperationName: "test-span-1",
SpanID: model.NewSpanID(1),
Warnings: []string{"test-warning-1", "test-warning-2"},
},
{
OperationName: "test-span-2",
SpanID: model.NewSpanID(2),
},
},
}
batch2 := &model.Batch{
Process: &model.Process{
ServiceName: "batch-2",
},
Spans: []*model.Span{
{
OperationName: "test-span-3",
SpanID: model.NewSpanID(3),
Warnings: []string{"test-warning-3"},
},
},
}
batches := []*model.Batch{batch1, batch2}
traces := ProtoToTraces(batches)

assert.Equal(t, 2, traces.ResourceSpans().Len())

rs1 := traces.ResourceSpans().At(0)
assert.Equal(t, 1, rs1.ScopeSpans().Len())
ss1 := rs1.ScopeSpans().At(0)
assert.Equal(t, 2, ss1.Spans().Len())

span1 := ss1.Spans().At(0)
assert.Equal(t, "test-span-1", span1.Name())
assert.Equal(t, pcommon.SpanID([8]byte{0, 0, 0, 0, 0, 0, 0, 1}), span1.SpanID())
assert.Equal(t, []string{"test-warning-1", "test-warning-2"}, GetWarnings(span1))

span2 := ss1.Spans().At(1)
assert.Equal(t, "test-span-2", span2.Name())
assert.Equal(t, pcommon.SpanID([8]byte{0, 0, 0, 0, 0, 0, 0, 2}), span2.SpanID())
assert.Empty(t, GetWarnings(span2))

rs2 := traces.ResourceSpans().At(1)
assert.Equal(t, 1, rs2.ScopeSpans().Len())
ss3 := rs2.ScopeSpans().At(0)
assert.Equal(t, 1, ss3.Spans().Len())

span3 := ss3.Spans().At(0)
assert.Equal(t, "test-span-3", span3.Name())
assert.Equal(t, pcommon.SpanID([8]byte{0, 0, 0, 0, 0, 0, 0, 3}), span3.SpanID())
assert.Equal(t, []string{"test-warning-3"}, GetWarnings(span3))
}
12 changes: 7 additions & 5 deletions internal/jptrace/warning.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,16 @@ const (
warningsAttribute = "jaeger.internal.warnings"
)

func AddWarning(span ptrace.Span, warning string) {
var warnings pcommon.Slice
func AddWarnings(span ptrace.Span, warnings ...string) {
var w pcommon.Slice
if currWarnings, ok := span.Attributes().Get(warningsAttribute); ok {
warnings = currWarnings.Slice()
w = currWarnings.Slice()
} else {
warnings = span.Attributes().PutEmptySlice(warningsAttribute)
w = span.Attributes().PutEmptySlice(warningsAttribute)
}
for _, warning := range warnings {
w.AppendEmpty().SetStr(warning)
}
warnings.AppendEmpty().SetStr(warning)
}

func GetWarnings(span ptrace.Span) []string {
Expand Down
12 changes: 11 additions & 1 deletion internal/jptrace/warning_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/pdata/ptrace"
)

Expand Down Expand Up @@ -46,7 +47,7 @@ func TestAddWarning(t *testing.T) {
warnings.AppendEmpty().SetStr(warn)
}
}
AddWarning(span, test.newWarn)
AddWarnings(span, test.newWarn)
warnings, ok := attrs.Get("jaeger.internal.warnings")
assert.True(t, ok)
assert.Equal(t, len(test.expected), warnings.Slice().Len())
Expand All @@ -57,6 +58,15 @@ func TestAddWarning(t *testing.T) {
}
}

func TestAddWarning_MultipleWarnings(t *testing.T) {
span := ptrace.NewSpan()
AddWarnings(span, "warning-1", "warning-2")
warnings, ok := span.Attributes().Get("jaeger.internal.warnings")
require.True(t, ok)
require.Equal(t, "warning-1", warnings.Slice().At(0).Str())
require.Equal(t, "warning-2", warnings.Slice().At(1).Str())
}

func TestGetWarnings(t *testing.T) {
tests := []struct {
name string
Expand Down
8 changes: 4 additions & 4 deletions storage_v2/v1adapter/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@ import (
"context"
"errors"

model2otel "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/ptrace"

"github.com/jaegertracing/jaeger/internal/jptrace"
"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/pkg/iter"
"github.com/jaegertracing/jaeger/storage/dependencystore"
Expand Down Expand Up @@ -60,8 +60,8 @@ func (tr *TraceReader) GetTraces(
return
}
batch := &model.Batch{Spans: t.GetSpans()}
tr, err := model2otel.ProtoToTraces([]*model.Batch{batch})
if !yield([]ptrace.Traces{tr}, err) || err != nil {
tr := jptrace.ProtoToTraces([]*model.Batch{batch})
if !yield([]ptrace.Traces{tr}, nil) {
return
}
}
Expand Down Expand Up @@ -105,7 +105,7 @@ func (tr *TraceReader) FindTraces(
}
for _, trace := range traces {
batch := &model.Batch{Spans: trace.GetSpans()}
otelTrace, _ := model2otel.ProtoToTraces([]*model.Batch{batch})
otelTrace := jptrace.ProtoToTraces([]*model.Batch{batch})
if !yield([]ptrace.Traces{otelTrace}, nil) {
return
}
Expand Down

0 comments on commit 53468aa

Please sign in to comment.