diff --git a/cmd/query/app/querysvc/adjuster/hash.go b/cmd/query/app/querysvc/adjuster/hash.go
new file mode 100644
index 00000000000..25afbbc1621
--- /dev/null
+++ b/cmd/query/app/querysvc/adjuster/hash.go
@@ -0,0 +1,83 @@
+// Copyright (c) 2024 The Jaeger Authors.
+// SPDX-License-Identifier: Apache-2.0
+
+package adjuster
+
+import (
+	"fmt"
+	"hash/fnv"
+
+	"go.opentelemetry.io/collector/pdata/ptrace"
+
+	"github.com/jaegertracing/jaeger/internal/jptrace"
+)
+
+var _ Adjuster = (*SpanHashDeduper)(nil)
+
+// SpanHash creates an adjuster that deduplicates spans by removing all but one span
+// with the same hash code. This is particularly useful for scenarios where spans
+// may be duplicated during archival, such as with ElasticSearch archival.
+//
+// The hash code is generated by serializing the span into protobuf bytes and applying
+// the FNV hashing algorithm to the serialized data.
+//
+// To ensure consistent hash codes, this adjuster should be executed after
+// SortAttributesAndEvents, which normalizes the order of collections within the span.
+func SpanHash() SpanHashDeduper {
+	return SpanHashDeduper{
+		marshaler: &ptrace.ProtoMarshaler{},
+	}
+}
+
+type SpanHashDeduper struct {
+	marshaler ptrace.Marshaler
+}
+
+func (s *SpanHashDeduper) Adjust(traces ptrace.Traces) {
+	spansByHash := make(map[uint64]ptrace.Span)
+	resourceSpans := traces.ResourceSpans()
+	for i := 0; i < resourceSpans.Len(); i++ {
+		rs := resourceSpans.At(i)
+		scopeSpans := rs.ScopeSpans()
+		hashTrace := ptrace.NewTraces()
+		hashResourceSpan := hashTrace.ResourceSpans().AppendEmpty()
+		hashScopeSpan := hashResourceSpan.ScopeSpans().AppendEmpty()
+		hashSpan := hashScopeSpan.Spans().AppendEmpty()
+		rs.Resource().Attributes().CopyTo(hashResourceSpan.Resource().Attributes())
+		for j := 0; j < scopeSpans.Len(); j++ {
+			ss := scopeSpans.At(j)
+			spans := ss.Spans()
+			ss.Scope().Attributes().CopyTo(hashScopeSpan.Scope().Attributes())
+			dedupedSpans := ptrace.NewSpanSlice()
+			for k := 0; k < spans.Len(); k++ {
+				span := spans.At(k)
+				span.CopyTo(hashSpan)
+				h, err := s.computeHashCode(
+					hashTrace,
+				)
+				if err != nil {
+					jptrace.AddWarning(span, fmt.Sprintf("failed to compute hash code: %v", err))
+					span.CopyTo(dedupedSpans.AppendEmpty())
+					continue
+				}
+				if _, ok := spansByHash[h]; !ok {
+					spansByHash[h] = span
+					span.CopyTo(dedupedSpans.AppendEmpty())
+				}
+			}
+			dedupedSpans.CopyTo(spans)
+		}
+	}
+}
+
+func (s *SpanHashDeduper) computeHashCode(
+	hashTrace ptrace.Traces,
+) (uint64, error) {
+	b, err := s.marshaler.MarshalTraces(hashTrace)
+	if err != nil {
+		return 0, err
+	}
+	hasher := fnv.New64a()
+	hasher.Write(b) // the writer in the Hash interface never returns an error
+	return hasher.Sum64(), nil
+}
diff --git a/cmd/query/app/querysvc/adjuster/hash_test.go b/cmd/query/app/querysvc/adjuster/hash_test.go
new file mode 100644
index 00000000000..dd90d7ab1a3
--- /dev/null
+++ b/cmd/query/app/querysvc/adjuster/hash_test.go
@@ -0,0 +1,320 @@
+// Copyright (c) 2024 The Jaeger Authors.
+// SPDX-License-Identifier: Apache-2.0
+
+package adjuster
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+	"go.opentelemetry.io/collector/pdata/ptrace"
+
+	"github.com/jaegertracing/jaeger/internal/jptrace"
+)
+
+func TestSpanHash_EmptySpans(t *testing.T) {
+	adjuster := SpanHash()
+	input := ptrace.NewTraces()
+	expected := ptrace.NewTraces()
+	adjuster.Adjust(input)
+	assert.Equal(t, expected, input)
+}
+
+func TestSpanHash_RemovesDuplicateSpans(t *testing.T) {
+	adjuster := SpanHash()
+	input := func() ptrace.Traces {
+		traces := ptrace.NewTraces()
+
+		rs := traces.ResourceSpans().AppendEmpty()
+		rs.Resource().Attributes().PutStr("key1", "value1")
+
+		ss := rs.ScopeSpans().AppendEmpty()
+		ss.Scope().Attributes().PutStr("key2", "value2")
+
+		spans := ss.Spans()
+
+		span1 := spans.AppendEmpty()
+		span1.SetName("span1")
+		span1.SetTraceID([16]byte{1})
+		span1.SetSpanID([8]byte{1})
+
+		span2 := spans.AppendEmpty()
+		span2.SetName("span2")
+		span2.SetTraceID([16]byte{1})
+		span2.SetSpanID([8]byte{2})
+
+		span3 := spans.AppendEmpty()
+		span3.SetName("span1")
+		span3.SetTraceID([16]byte{1})
+		span3.SetSpanID([8]byte{1})
+
+		span4 := spans.AppendEmpty()
+		span4.SetName("span2")
+		span4.SetTraceID([16]byte{1})
+		span4.SetSpanID([8]byte{2})
+
+		span5 := spans.AppendEmpty()
+		span5.SetName("span3")
+		span5.SetTraceID([16]byte{3})
+		span5.SetSpanID([8]byte{4})
+
+		rs2 := traces.ResourceSpans().AppendEmpty()
+		rs2.Resource().Attributes().PutStr("key1", "value1")
+
+		ss2 := rs2.ScopeSpans().AppendEmpty()
+		ss2.Scope().Attributes().PutStr("key2", "value2")
+
+		spans2 := ss2.Spans()
+
+		span6 := spans2.AppendEmpty()
+		span6.SetName("span4")
+		span6.SetTraceID([16]byte{5})
+		span6.SetSpanID([8]byte{6})
+
+		span7 := spans2.AppendEmpty()
+		span7.SetName("span3")
+		span7.SetTraceID([16]byte{3})
+		span7.SetSpanID([8]byte{4})
+
+		return traces
+	}
+	expected := func() ptrace.Traces {
+		traces := ptrace.NewTraces()
+
+		rs := traces.ResourceSpans().AppendEmpty()
+		rs.Resource().Attributes().PutStr("key1", "value1")
+
+		ss := rs.ScopeSpans().AppendEmpty()
+		ss.Scope().Attributes().PutStr("key2", "value2")
+
+		spans := ss.Spans()
+
+		span1 := spans.AppendEmpty()
+		span1.SetName("span1")
+		span1.SetTraceID([16]byte{1})
+		span1.SetSpanID([8]byte{1})
+
+		span2 := spans.AppendEmpty()
+		span2.SetName("span2")
+		span2.SetTraceID([16]byte{1})
+		span2.SetSpanID([8]byte{2})
+
+		span3 := spans.AppendEmpty()
+		span3.SetName("span3")
+		span3.SetTraceID([16]byte{3})
+		span3.SetSpanID([8]byte{4})
+
+		rs2 := traces.ResourceSpans().AppendEmpty()
+		rs2.Resource().Attributes().PutStr("key1", "value1")
+
+		ss2 := rs2.ScopeSpans().AppendEmpty()
+		ss2.Scope().Attributes().PutStr("key2", "value2")
+
+		spans2 := ss2.Spans()
+
+		span4 := spans2.AppendEmpty()
+		span4.SetName("span4")
+		span4.SetTraceID([16]byte{5})
+		span4.SetSpanID([8]byte{6})
+
+		return traces
+	}
+
+	i := input()
+	adjuster.Adjust(i)
+	assert.Equal(t, expected(), i)
+}
+
+func TestSpanHash_NoDuplicateSpans(t *testing.T) {
+	adjuster := SpanHash()
+	input := func() ptrace.Traces {
+		traces := ptrace.NewTraces()
+		rs := traces.ResourceSpans().AppendEmpty()
+		ss := rs.ScopeSpans().AppendEmpty()
+		spans := ss.Spans()
+
+		span1 := spans.AppendEmpty()
+		span1.SetName("span1")
+		span1.SetTraceID([16]byte{1})
+		span1.SetSpanID([8]byte{1})
+
+		span2 := spans.AppendEmpty()
+		span2.SetName("span2")
+		span2.SetTraceID([16]byte{1})
+		span2.SetSpanID([8]byte{2})
+
+		span3 := spans.AppendEmpty()
+		span3.SetName("span3")
+		span3.SetTraceID([16]byte{3})
+		span3.SetSpanID([8]byte{4})
+
+		return traces
+	}
+	expected := func() ptrace.Traces {
+		traces := ptrace.NewTraces()
+		rs := traces.ResourceSpans().AppendEmpty()
+		ss := rs.ScopeSpans().AppendEmpty()
+		spans := ss.Spans()
+
+		span1 := spans.AppendEmpty()
+		span1.SetName("span1")
+		span1.SetTraceID([16]byte{1})
+		span1.SetSpanID([8]byte{1})
+
+		span2 := spans.AppendEmpty()
+		span2.SetName("span2")
+		span2.SetTraceID([16]byte{1})
+		span2.SetSpanID([8]byte{2})
+
+		span3 := spans.AppendEmpty()
+		span3.SetName("span3")
+		span3.SetTraceID([16]byte{3})
+		span3.SetSpanID([8]byte{4})
+
+		return traces
+	}
+
+	i := input()
+	adjuster.Adjust(i)
+	assert.Equal(t, expected(), i)
+}
+
+func TestSpanHash_DuplicateSpansDifferentScopeAttributes(t *testing.T) {
+	adjuster := SpanHash()
+	input := func() ptrace.Traces {
+		traces := ptrace.NewTraces()
+		rs := traces.ResourceSpans().AppendEmpty()
+		ss := rs.ScopeSpans().AppendEmpty()
+		ss.Scope().Attributes().PutStr("key", "value1")
+		spans := ss.Spans()
+
+		span1 := spans.AppendEmpty()
+		span1.SetName("span1")
+		span1.SetTraceID([16]byte{1})
+		span1.SetSpanID([8]byte{1})
+
+		ss2 := rs.ScopeSpans().AppendEmpty()
+		ss2.Scope().Attributes().PutStr("key", "value2")
+		spans2 := ss2.Spans()
+
+		span2 := spans2.AppendEmpty()
+		span2.SetName("span1")
+		span2.SetTraceID([16]byte{1})
+		span2.SetSpanID([8]byte{1})
+
+		return traces
+	}
+	expected := func() ptrace.Traces {
+		traces := ptrace.NewTraces()
+		rs := traces.ResourceSpans().AppendEmpty()
+		ss := rs.ScopeSpans().AppendEmpty()
+		ss.Scope().Attributes().PutStr("key", "value1")
+		spans := ss.Spans()
+
+		span1 := spans.AppendEmpty()
+		span1.SetName("span1")
+		span1.SetTraceID([16]byte{1})
+		span1.SetSpanID([8]byte{1})
+
+		ss2 := rs.ScopeSpans().AppendEmpty()
+		ss2.Scope().Attributes().PutStr("key", "value2")
+		spans2 := ss2.Spans()
+
+		span2 := spans2.AppendEmpty()
+		span2.SetName("span1")
+		span2.SetTraceID([16]byte{1})
+		span2.SetSpanID([8]byte{1})
+
+		return traces
+	}
+
+	i := input()
+	adjuster.Adjust(i)
+	assert.Equal(t, expected(), i)
+}
+
+func TestSpanHash_DuplicateSpansDifferentResourceAttributes(t *testing.T) {
+	adjuster := SpanHash()
+	input := func() ptrace.Traces {
+		traces := ptrace.NewTraces()
+		rs := traces.ResourceSpans().AppendEmpty()
+		rs.Resource().Attributes().PutStr("key", "value1")
+		ss := rs.ScopeSpans().AppendEmpty()
+		spans := ss.Spans()
+
+		span1 := spans.AppendEmpty()
+		span1.SetName("span1")
+		span1.SetTraceID([16]byte{1})
+		span1.SetSpanID([8]byte{1})
+
+		rs2 := traces.ResourceSpans().AppendEmpty()
+		rs2.Resource().Attributes().PutStr("key", "value2")
+		ss2 := rs2.ScopeSpans().AppendEmpty()
+		spans2 := ss2.Spans()
+
+		span2 := spans2.AppendEmpty()
+		span2.SetName("span1")
+		span2.SetTraceID([16]byte{1})
+		span2.SetSpanID([8]byte{1})
+
+		return traces
+	}
+	expected := func() ptrace.Traces {
+		traces := ptrace.NewTraces()
+		rs := traces.ResourceSpans().AppendEmpty()
+		rs.Resource().Attributes().PutStr("key", "value1")
+		ss := rs.ScopeSpans().AppendEmpty()
+		spans := ss.Spans()
+
+		span1 := spans.AppendEmpty()
+		span1.SetName("span1")
+		span1.SetTraceID([16]byte{1})
+		span1.SetSpanID([8]byte{1})
+
+		rs2 := traces.ResourceSpans().AppendEmpty()
+		rs2.Resource().Attributes().PutStr("key", "value2")
+		ss2 := rs2.ScopeSpans().AppendEmpty()
+		spans2 := ss2.Spans()
+
+		span2 := spans2.AppendEmpty()
+		span2.SetName("span1")
+		span2.SetTraceID([16]byte{1})
+		span2.SetSpanID([8]byte{1})
+
+		return traces
+	}
+
+	i := input()
+	adjuster.Adjust(i)
+	assert.Equal(t, expected(), i)
+}
+
+type errorMarshaler struct{}
+
+func (*errorMarshaler) MarshalTraces(ptrace.Traces) ([]byte, error) {
+	return nil, assert.AnError
+}
+
+func TestSpanHash_ErrorInMarshaler(t *testing.T) {
+	adjuster := SpanHashDeduper{
+		marshaler: &errorMarshaler{},
+	}
+
+	traces := ptrace.NewTraces()
+	rs := traces.ResourceSpans().AppendEmpty()
+	ss := rs.ScopeSpans().AppendEmpty()
+	spans := ss.Spans()
+
+	span := spans.AppendEmpty()
+	span.SetName("span1")
+
+	adjuster.Adjust(traces)
+
+	gotSpan := traces.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0)
+	assert.Equal(t, "span1", gotSpan.Name())
+
+	warnings := jptrace.GetWarnings(gotSpan)
+	assert.Len(t, warnings, 1)
+	assert.Contains(t, warnings[0], "failed to compute hash code")
+	assert.Contains(t, warnings[0], assert.AnError.Error())
+}
diff --git a/cmd/query/app/querysvc/adjuster/sort.go b/cmd/query/app/querysvc/adjuster/sort.go
index 6ca367710b5..0ed38af3286 100644
--- a/cmd/query/app/querysvc/adjuster/sort.go
+++ b/cmd/query/app/querysvc/adjuster/sort.go
@@ -14,9 +14,11 @@ var _ Adjuster = (*SortAttributesAndEventsAdjuster)(nil)
 
 // SortAttributesAndEvents creates an adjuster that standardizes trace data by sorting elements:
 //   - Resource attributes are sorted lexicographically by their keys.
+//   - Scope attributes are sorted lexicographically by their keys.
 //   - Span attributes are sorted lexicographically by their keys.
 //   - Span events are sorted lexicographically by their names.
 //   - Attributes within each span event are sorted lexicographically by their keys.
+//   - Attributes within each span link are sorted lexicographically by their keys.
 func SortAttributesAndEvents() SortAttributesAndEventsAdjuster {
 	return SortAttributesAndEventsAdjuster{}
 }
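
Note for reviewers: the doc comment on SpanHash states that it must run after SortAttributesAndEvents so that duplicate spans serialize to identical protobuf bytes. Below is a minimal, hypothetical usage sketch of that ordering. Only the package path, the two constructors, and the Adjust(ptrace.Traces) method are taken from this diff; the wrapper function and its name are illustrative, not part of the change.

package main

import (
	"go.opentelemetry.io/collector/pdata/ptrace"

	"github.com/jaegertracing/jaeger/cmd/query/app/querysvc/adjuster"
)

// dedupeTrace is a hypothetical helper showing the intended ordering:
// normalize collection order first, then hash-and-deduplicate.
func dedupeTrace(traces ptrace.Traces) {
	// Sort resource, scope, span, event, and link attributes so that
	// duplicate spans produce byte-identical protobuf serializations.
	sorter := adjuster.SortAttributesAndEvents()
	sorter.Adjust(traces)

	// Drop every span whose FNV-64a hash over (resource attributes,
	// scope attributes, span) has already been seen in this trace.
	deduper := adjuster.SpanHash()
	deduper.Adjust(traces)
}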