From 4f4518238f6a2f4d7c1500e6d24b05809956eaf8 Mon Sep 17 00:00:00 2001 From: Mahad Zaryab <43658574+mahadzaryab1@users.noreply.github.com> Date: Thu, 5 Dec 2024 21:36:46 -0500 Subject: [PATCH] [v2][storage] Implement read path for v2 storage interface (#6170) ## Which problem is this PR solving? - Towards #5079 ## Description of the changes - Implemented the read path for the v2 storage interface. This path currently just wraps a v1 span reader and exposes a static method to access the v1 reader. - Change the jaeger query extension to initialize a v2 storage factory and obtain the v1 span reader from it. - This will unblock the development of more efficient v2 storage implementations, like ClickHouse. ## How was this change tested? - Added unit tests for new functionality ## Checklist - [x] I have read https://github.com/jaegertracing/jaeger/blob/master/CONTRIBUTING_GUIDELINES.md - [x] I have signed all commits - [x] I have added unit tests for the new functionality - [x] I have run lint and test steps successfully - for `jaeger`: `make lint test` - for `jaeger-ui`: `yarn lint` and `yarn test` --------- Signed-off-by: Mahad Zaryab Signed-off-by: Mahad Zaryab <43658574+mahadzaryab1@users.noreply.github.com> Signed-off-by: Yuri Shkuro Signed-off-by: Yuri Shkuro Co-authored-by: Yuri Shkuro Co-authored-by: Yuri Shkuro --- .mockery.yaml | 3 + cmd/all-in-one/main.go | 12 +- cmd/anonymizer/app/query/query_test.go | 4 +- .../internal/extension/jaegerquery/server.go | 18 +- .../extension/jaegerquery/server_test.go | 4 +- cmd/query/app/apiv3/grpc_handler_test.go | 3 +- cmd/query/app/apiv3/http_gateway_test.go | 3 +- cmd/query/app/grpc_handler_test.go | 5 +- cmd/query/app/http_handler_test.go | 7 +- cmd/query/app/querysvc/query_service.go | 32 ++- cmd/query/app/querysvc/query_service_test.go | 75 ++++++- cmd/query/app/server_test.go | 4 +- cmd/query/app/token_propagation_test.go | 4 +- cmd/query/main.go | 9 +- storage_v2/factoryadapter/factory_test.go | 2 + storage_v2/tracestore/mocks/Factory.go | 130 ++++++++++++ storage_v2/tracestore/mocks/Reader.go | 186 ++++++++++++++++++ storage_v2/tracestore/mocks/Writer.go | 52 +++++ 18 files changed, 520 insertions(+), 33 deletions(-) create mode 100644 storage_v2/tracestore/mocks/Factory.go create mode 100644 storage_v2/tracestore/mocks/Reader.go create mode 100644 storage_v2/tracestore/mocks/Writer.go diff --git a/.mockery.yaml b/.mockery.yaml index c700d121cb2..a0f879a5a35 100644 --- a/.mockery.yaml +++ b/.mockery.yaml @@ -73,3 +73,6 @@ packages: github.com/jaegertracing/jaeger/storage/spanstore: config: all: true + github.com/jaegertracing/jaeger/storage_v2/tracestore: + config: + all: true diff --git a/cmd/all-in-one/main.go b/cmd/all-in-one/main.go index 499e7e6c3f9..9b7b89dfba1 100644 --- a/cmd/all-in-one/main.go +++ b/cmd/all-in-one/main.go @@ -39,7 +39,8 @@ import ( "github.com/jaegertracing/jaeger/plugin/storage" "github.com/jaegertracing/jaeger/ports" "github.com/jaegertracing/jaeger/storage/dependencystore" - "github.com/jaegertracing/jaeger/storage/spanstore" + "github.com/jaegertracing/jaeger/storage_v2/factoryadapter" + "github.com/jaegertracing/jaeger/storage_v2/tracestore" ) // all-in-one/main is a standalone full-stack jaeger backend, backed by a memory store @@ -104,7 +105,8 @@ by default uses only in-memory database.`, logger.Fatal("Failed to init storage factory", zap.Error(err)) } - spanReader, err := storageFactory.CreateSpanReader() + v2Factory := factoryadapter.NewFactory(storageFactory) + traceReader, err := v2Factory.CreateTraceReader() if err != nil { logger.Fatal("Failed to create span reader", zap.Error(err)) } @@ -168,7 +170,7 @@ by default uses only in-memory database.`, queryTelset.Metrics = queryMetricsFactory querySrv := startQuery( svc, qOpts, qOpts.BuildQueryServiceOptions(storageFactory, logger), - spanReader, dependencyReader, metricsQueryService, + traceReader, dependencyReader, metricsQueryService, tm, queryTelset, ) @@ -215,13 +217,13 @@ func startQuery( svc *flags.Service, qOpts *queryApp.QueryOptions, queryOpts *querysvc.QueryServiceOptions, - spanReader spanstore.Reader, + traceReader tracestore.Reader, depReader dependencystore.Reader, metricsQueryService querysvc.MetricsQueryService, tm *tenancy.Manager, telset telemetry.Settings, ) *queryApp.Server { - qs := querysvc.NewQueryService(spanReader, depReader, *queryOpts) + qs := querysvc.NewQueryService(traceReader, depReader, *queryOpts) server, err := queryApp.NewServer(context.Background(), qs, metricsQueryService, qOpts, tm, telset) if err != nil { diff --git a/cmd/anonymizer/app/query/query_test.go b/cmd/anonymizer/app/query/query_test.go index 0606c79a3dc..24352622e54 100644 --- a/cmd/anonymizer/app/query/query_test.go +++ b/cmd/anonymizer/app/query/query_test.go @@ -21,6 +21,7 @@ import ( dependencyStoreMocks "github.com/jaegertracing/jaeger/storage/dependencystore/mocks" "github.com/jaegertracing/jaeger/storage/spanstore" spanstoremocks "github.com/jaegertracing/jaeger/storage/spanstore/mocks" + "github.com/jaegertracing/jaeger/storage_v2/factoryadapter" ) var ( @@ -55,11 +56,12 @@ type testServer struct { func newTestServer(t *testing.T) *testServer { spanReader := &spanstoremocks.Reader{} + traceReader := factoryadapter.NewTraceReader(spanReader) metricsReader, err := disabled.NewMetricsReader() require.NoError(t, err) q := querysvc.NewQueryService( - spanReader, + traceReader, &dependencyStoreMocks.Reader{}, querysvc.QueryServiceOptions{}, ) diff --git a/cmd/jaeger/internal/extension/jaegerquery/server.go b/cmd/jaeger/internal/extension/jaegerquery/server.go index 22930d8c0f2..29f836e9c89 100644 --- a/cmd/jaeger/internal/extension/jaegerquery/server.go +++ b/cmd/jaeger/internal/extension/jaegerquery/server.go @@ -72,26 +72,32 @@ func (s *server) Start(ctx context.Context, host component.Host) error { Namespace(metrics.NSOptions{Name: "jaeger"}). Namespace(metrics.NSOptions{Name: "query"}) - f, err := jaegerstorage.GetStorageFactory(s.config.Storage.TracesPrimary, host) + // TODO currently v1 is still needed because of dependency storage + v1Factory, err := jaegerstorage.GetStorageFactory(s.config.Storage.TracesPrimary, host) if err != nil { - return fmt.Errorf("cannot find primary storage %s: %w", s.config.Storage.TracesPrimary, err) + return fmt.Errorf("cannot find v1 factory for primary storage %s: %w", s.config.Storage.TracesPrimary, err) + } + f, err := jaegerstorage.GetStorageFactoryV2(s.config.Storage.TracesPrimary, host) + if err != nil { + return fmt.Errorf("cannot find v2 factory for primary storage %s: %w", s.config.Storage.TracesPrimary, err) } - spanReader, err := f.CreateSpanReader() + traceReader, err := f.CreateTraceReader() if err != nil { - return fmt.Errorf("cannot create span reader: %w", err) + return fmt.Errorf("cannot create trace reader: %w", err) } - depReader, err := f.CreateDependencyReader() + depReader, err := v1Factory.CreateDependencyReader() if err != nil { return fmt.Errorf("cannot create dependencies reader: %w", err) } var opts querysvc.QueryServiceOptions + // TODO archive storage still uses v1 factory if err := s.addArchiveStorage(&opts, host); err != nil { return err } - qs := querysvc.NewQueryService(spanReader, depReader, opts) + qs := querysvc.NewQueryService(traceReader, depReader, opts) mqs, err := s.createMetricReader(host) if err != nil { diff --git a/cmd/jaeger/internal/extension/jaegerquery/server_test.go b/cmd/jaeger/internal/extension/jaegerquery/server_test.go index 4468fa7aa5e..70e60d78768 100644 --- a/cmd/jaeger/internal/extension/jaegerquery/server_test.go +++ b/cmd/jaeger/internal/extension/jaegerquery/server_test.go @@ -150,7 +150,7 @@ func TestServerStart(t *testing.T) { TracesPrimary: "need-factory-error", }, }, - expectedErr: "cannot find primary storage", + expectedErr: "cannot find v1 factory for primary storage", }, { name: "span reader error", @@ -159,7 +159,7 @@ func TestServerStart(t *testing.T) { TracesPrimary: "need-span-reader-error", }, }, - expectedErr: "cannot create span reader", + expectedErr: "cannot create trace reader", }, { name: "dependency error", diff --git a/cmd/query/app/apiv3/grpc_handler_test.go b/cmd/query/app/apiv3/grpc_handler_test.go index 7aaccb7c5d8..b14f1c01abf 100644 --- a/cmd/query/app/apiv3/grpc_handler_test.go +++ b/cmd/query/app/apiv3/grpc_handler_test.go @@ -23,6 +23,7 @@ import ( dependencyStoreMocks "github.com/jaegertracing/jaeger/storage/dependencystore/mocks" "github.com/jaegertracing/jaeger/storage/spanstore" spanstoremocks "github.com/jaegertracing/jaeger/storage/spanstore/mocks" + "github.com/jaegertracing/jaeger/storage_v2/factoryadapter" ) var ( @@ -57,7 +58,7 @@ func newTestServerClient(t *testing.T) *testServerClient { } q := querysvc.NewQueryService( - tsc.reader, + factoryadapter.NewTraceReader(tsc.reader), &dependencyStoreMocks.Reader{}, querysvc.QueryServiceOptions{}, ) diff --git a/cmd/query/app/apiv3/http_gateway_test.go b/cmd/query/app/apiv3/http_gateway_test.go index 6feaba989c6..5f742a0522f 100644 --- a/cmd/query/app/apiv3/http_gateway_test.go +++ b/cmd/query/app/apiv3/http_gateway_test.go @@ -25,6 +25,7 @@ import ( dependencyStoreMocks "github.com/jaegertracing/jaeger/storage/dependencystore/mocks" "github.com/jaegertracing/jaeger/storage/spanstore" spanstoremocks "github.com/jaegertracing/jaeger/storage/spanstore/mocks" + "github.com/jaegertracing/jaeger/storage_v2/factoryadapter" ) func setupHTTPGatewayNoServer( @@ -35,7 +36,7 @@ func setupHTTPGatewayNoServer( reader: &spanstoremocks.Reader{}, } - q := querysvc.NewQueryService(gw.reader, + q := querysvc.NewQueryService(factoryadapter.NewTraceReader(gw.reader), &dependencyStoreMocks.Reader{}, querysvc.QueryServiceOptions{}, ) diff --git a/cmd/query/app/grpc_handler_test.go b/cmd/query/app/grpc_handler_test.go index cca73efdfa9..55dcf2affa7 100644 --- a/cmd/query/app/grpc_handler_test.go +++ b/cmd/query/app/grpc_handler_test.go @@ -33,6 +33,7 @@ import ( metricsmocks "github.com/jaegertracing/jaeger/storage/metricstore/mocks" "github.com/jaegertracing/jaeger/storage/spanstore" spanstoremocks "github.com/jaegertracing/jaeger/storage/spanstore/mocks" + "github.com/jaegertracing/jaeger/storage_v2/factoryadapter" ) var ( @@ -901,7 +902,7 @@ func initializeTenantedTestServerGRPCWithOptions(t *testing.T, tm *tenancy.Manag require.NoError(t, err) q := querysvc.NewQueryService( - spanReader, + factoryadapter.NewTraceReader(spanReader), dependencyReader, querysvc.QueryServiceOptions{ ArchiveSpanReader: archiveSpanReader, @@ -1165,7 +1166,7 @@ func TestNewGRPCHandlerWithEmptyOptions(t *testing.T) { require.NoError(t, err) q := querysvc.NewQueryService( - &spanstoremocks.Reader{}, + factoryadapter.NewTraceReader(&spanstoremocks.Reader{}), &depsmocks.Reader{}, querysvc.QueryServiceOptions{}) diff --git a/cmd/query/app/http_handler_test.go b/cmd/query/app/http_handler_test.go index e7b4d43f779..dfa54f6701a 100644 --- a/cmd/query/app/http_handler_test.go +++ b/cmd/query/app/http_handler_test.go @@ -42,6 +42,7 @@ import ( metricsmocks "github.com/jaegertracing/jaeger/storage/metricstore/mocks" "github.com/jaegertracing/jaeger/storage/spanstore" spanstoremocks "github.com/jaegertracing/jaeger/storage/spanstore/mocks" + "github.com/jaegertracing/jaeger/storage_v2/factoryadapter" ) const millisToNanosMultiplier = int64(time.Millisecond / time.Nanosecond) @@ -119,7 +120,8 @@ func initializeTestServerWithOptions( options = append(options, HandlerOptions.Logger(zaptest.NewLogger(t))) readStorage := &spanstoremocks.Reader{} dependencyStorage := &depsmocks.Reader{} - qs := querysvc.NewQueryService(readStorage, dependencyStorage, queryOptions) + traceReader := factoryadapter.NewTraceReader(readStorage) + qs := querysvc.NewQueryService(traceReader, dependencyStorage, queryOptions) r := NewRouter() apiHandler := NewAPIHandler(qs, options...) apiHandler.RegisterRoutes(r) @@ -198,8 +200,9 @@ func TestLogOnServerError(t *testing.T) { zapCore, logs := observer.New(zap.InfoLevel) logger := zap.New(zapCore) readStorage := &spanstoremocks.Reader{} + traceReader := factoryadapter.NewTraceReader(readStorage) dependencyStorage := &depsmocks.Reader{} - qs := querysvc.NewQueryService(readStorage, dependencyStorage, querysvc.QueryServiceOptions{}) + qs := querysvc.NewQueryService(traceReader, dependencyStorage, querysvc.QueryServiceOptions{}) h := NewAPIHandler(qs, HandlerOptions.Logger(logger)) e := errors.New("test error") h.handleError(&httptest.ResponseRecorder{}, e, http.StatusInternalServerError) diff --git a/cmd/query/app/querysvc/query_service.go b/cmd/query/app/querysvc/query_service.go index ee91b45bb58..f627c3c8344 100644 --- a/cmd/query/app/querysvc/query_service.go +++ b/cmd/query/app/querysvc/query_service.go @@ -15,6 +15,8 @@ import ( "github.com/jaegertracing/jaeger/storage" "github.com/jaegertracing/jaeger/storage/dependencystore" "github.com/jaegertracing/jaeger/storage/spanstore" + "github.com/jaegertracing/jaeger/storage_v2/factoryadapter" + "github.com/jaegertracing/jaeger/storage_v2/tracestore" ) var errNoArchiveSpanStorage = errors.New("archive span storage was not configured") @@ -40,15 +42,15 @@ type StorageCapabilities struct { // QueryService contains span utils required by the query-service. type QueryService struct { - spanReader spanstore.Reader + traceReader tracestore.Reader dependencyReader dependencystore.Reader options QueryServiceOptions } // NewQueryService returns a new QueryService. -func NewQueryService(spanReader spanstore.Reader, dependencyReader dependencystore.Reader, options QueryServiceOptions) *QueryService { +func NewQueryService(traceReader tracestore.Reader, dependencyReader dependencystore.Reader, options QueryServiceOptions) *QueryService { qsvc := &QueryService{ - spanReader: spanReader, + traceReader: traceReader, dependencyReader: dependencyReader, options: options, } @@ -61,7 +63,11 @@ func NewQueryService(spanReader spanstore.Reader, dependencyReader dependencysto // GetTrace is the queryService implementation of spanstore.Reader.GetTrace func (qs QueryService) GetTrace(ctx context.Context, traceID model.TraceID) (*model.Trace, error) { - trace, err := qs.spanReader.GetTrace(ctx, traceID) + spanReader, err := factoryadapter.GetV1Reader(qs.traceReader) + if err != nil { + return nil, err + } + trace, err := spanReader.GetTrace(ctx, traceID) if errors.Is(err, spanstore.ErrTraceNotFound) { if qs.options.ArchiveSpanReader == nil { return nil, err @@ -73,7 +79,11 @@ func (qs QueryService) GetTrace(ctx context.Context, traceID model.TraceID) (*mo // GetServices is the queryService implementation of spanstore.Reader.GetServices func (qs QueryService) GetServices(ctx context.Context) ([]string, error) { - return qs.spanReader.GetServices(ctx) + spanReader, err := factoryadapter.GetV1Reader(qs.traceReader) + if err != nil { + return nil, err + } + return spanReader.GetServices(ctx) } // GetOperations is the queryService implementation of spanstore.Reader.GetOperations @@ -81,12 +91,20 @@ func (qs QueryService) GetOperations( ctx context.Context, query spanstore.OperationQueryParameters, ) ([]spanstore.Operation, error) { - return qs.spanReader.GetOperations(ctx, query) + spanReader, err := factoryadapter.GetV1Reader(qs.traceReader) + if err != nil { + return nil, err + } + return spanReader.GetOperations(ctx, query) } // FindTraces is the queryService implementation of spanstore.Reader.FindTraces func (qs QueryService) FindTraces(ctx context.Context, query *spanstore.TraceQueryParameters) ([]*model.Trace, error) { - return qs.spanReader.FindTraces(ctx, query) + spanReader, err := factoryadapter.GetV1Reader(qs.traceReader) + if err != nil { + return nil, err + } + return spanReader.FindTraces(ctx, query) } // ArchiveTrace is the queryService utility to archive traces. diff --git a/cmd/query/app/querysvc/query_service_test.go b/cmd/query/app/querysvc/query_service_test.go index 582a4568509..3a9f906d2b5 100644 --- a/cmd/query/app/querysvc/query_service_test.go +++ b/cmd/query/app/querysvc/query_service_test.go @@ -12,6 +12,8 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/ptrace" "go.uber.org/zap" "github.com/jaegertracing/jaeger/model" @@ -23,6 +25,8 @@ import ( depsmocks "github.com/jaegertracing/jaeger/storage/dependencystore/mocks" "github.com/jaegertracing/jaeger/storage/spanstore" spanstoremocks "github.com/jaegertracing/jaeger/storage/spanstore/mocks" + "github.com/jaegertracing/jaeger/storage_v2/factoryadapter" + "github.com/jaegertracing/jaeger/storage_v2/tracestore" ) const millisToNanosMultiplier = int64(time.Millisecond / time.Nanosecond) @@ -87,6 +91,7 @@ func withAdjuster() testOption { func initializeTestService(optionAppliers ...testOption) *testQueryService { readStorage := &spanstoremocks.Reader{} + traceReader := factoryadapter.NewTraceReader(readStorage) dependencyStorage := &depsmocks.Reader{} options := QueryServiceOptions{} @@ -100,10 +105,32 @@ func initializeTestService(optionAppliers ...testOption) *testQueryService { optApplier(&tqs, &options) } - tqs.queryService = NewQueryService(readStorage, dependencyStorage, options) + tqs.queryService = NewQueryService(traceReader, dependencyStorage, options) return &tqs } +type fakeReader struct{} + +func (*fakeReader) GetTrace(_ context.Context, _ pcommon.TraceID) (ptrace.Traces, error) { + panic("not implemented") +} + +func (*fakeReader) GetServices(_ context.Context) ([]string, error) { + panic("not implemented") +} + +func (*fakeReader) GetOperations(_ context.Context, _ tracestore.OperationQueryParameters) ([]tracestore.Operation, error) { + panic("not implemented") +} + +func (*fakeReader) FindTraces(_ context.Context, _ tracestore.TraceQueryParameters) ([]ptrace.Traces, error) { + panic("not implemented") +} + +func (*fakeReader) FindTraceIDs(_ context.Context, _ tracestore.TraceQueryParameters) ([]pcommon.TraceID, error) { + panic("not implemented") +} + // Test QueryService.GetTrace() func TestGetTraceSuccess(t *testing.T) { tqs := initializeTestService() @@ -129,6 +156,15 @@ func TestGetTraceNotFound(t *testing.T) { assert.Equal(t, err, spanstore.ErrTraceNotFound) } +func TestGetTrace_V1ReaderNotFound(t *testing.T) { + fr := &fakeReader{} + qs := QueryService{ + traceReader: fr, + } + _, err := qs.GetTrace(context.Background(), mockTraceID) + require.Error(t, err) +} + // Test QueryService.GetTrace() with ArchiveSpanReader func TestGetTraceFromArchiveStorage(t *testing.T) { tqs := initializeTestService(withArchiveSpanReader()) @@ -157,6 +193,15 @@ func TestGetServices(t *testing.T) { assert.Equal(t, expectedServices, actualServices) } +func TestGetServices_V1ReaderNotFound(t *testing.T) { + fr := &fakeReader{} + qs := QueryService{ + traceReader: fr, + } + _, err := qs.GetServices(context.Background()) + require.Error(t, err) +} + // Test QueryService.GetOperations() for success. func TestGetOperations(t *testing.T) { tqs := initializeTestService() @@ -175,6 +220,16 @@ func TestGetOperations(t *testing.T) { assert.Equal(t, expectedOperations, actualOperations) } +func TestGetOperations_V1ReaderNotFound(t *testing.T) { + fr := &fakeReader{} + qs := QueryService{ + traceReader: fr, + } + operationQuery := spanstore.OperationQueryParameters{ServiceName: "abc/trifle"} + _, err := qs.GetOperations(context.Background(), operationQuery) + require.Error(t, err) +} + // Test QueryService.FindTraces() for success. func TestFindTraces(t *testing.T) { tqs := initializeTestService() @@ -196,6 +251,24 @@ func TestFindTraces(t *testing.T) { assert.Len(t, traces, 1) } +func TestFindTraces_V1ReaderNotFound(t *testing.T) { + fr := &fakeReader{} + qs := QueryService{ + traceReader: fr, + } + duration, err := time.ParseDuration("20ms") + require.NoError(t, err) + params := &spanstore.TraceQueryParameters{ + ServiceName: "service", + OperationName: "operation", + StartTimeMax: time.Now(), + DurationMin: duration, + NumTraces: 200, + } + _, err = qs.FindTraces(context.Background(), params) + require.Error(t, err) +} + // Test QueryService.ArchiveTrace() with no ArchiveSpanWriter. func TestArchiveTraceNoOptions(t *testing.T) { tqs := initializeTestService() diff --git a/cmd/query/app/server_test.go b/cmd/query/app/server_test.go index 3f8e3749d46..51db8aeb239 100644 --- a/cmd/query/app/server_test.go +++ b/cmd/query/app/server_test.go @@ -41,6 +41,7 @@ import ( "github.com/jaegertracing/jaeger/proto-gen/api_v2" depsmocks "github.com/jaegertracing/jaeger/storage/dependencystore/mocks" spanstoremocks "github.com/jaegertracing/jaeger/storage/spanstore/mocks" + "github.com/jaegertracing/jaeger/storage_v2/factoryadapter" ) var testCertKeyLocation = "../../../pkg/config/tlscfg/testdata" @@ -322,10 +323,11 @@ type fakeQueryService struct { func makeQuerySvc() *fakeQueryService { spanReader := &spanstoremocks.Reader{} + traceReader := factoryadapter.NewTraceReader(spanReader) dependencyReader := &depsmocks.Reader{} expectedServices := []string{"test"} spanReader.On("GetServices", mock.AnythingOfType("*context.valueCtx")).Return(expectedServices, nil) - qs := querysvc.NewQueryService(spanReader, dependencyReader, querysvc.QueryServiceOptions{}) + qs := querysvc.NewQueryService(traceReader, dependencyReader, querysvc.QueryServiceOptions{}) return &fakeQueryService{ qs: qs, spanReader: spanReader, diff --git a/cmd/query/app/token_propagation_test.go b/cmd/query/app/token_propagation_test.go index 73edd44cf2d..0a48802f37b 100644 --- a/cmd/query/app/token_propagation_test.go +++ b/cmd/query/app/token_propagation_test.go @@ -27,6 +27,7 @@ import ( "github.com/jaegertracing/jaeger/pkg/tenancy" "github.com/jaegertracing/jaeger/plugin/storage/es" "github.com/jaegertracing/jaeger/ports" + "github.com/jaegertracing/jaeger/storage_v2/factoryadapter" ) const ( @@ -85,8 +86,9 @@ func runQueryService(t *testing.T, esURL string) *Server { spanReader, err := f.CreateSpanReader() require.NoError(t, err) + traceReader := factoryadapter.NewTraceReader(spanReader) - querySvc := querysvc.NewQueryService(spanReader, nil, querysvc.QueryServiceOptions{}) + querySvc := querysvc.NewQueryService(traceReader, nil, querysvc.QueryServiceOptions{}) server, err := NewServer(context.Background(), querySvc, nil, &QueryOptions{ BearerTokenPropagation: true, diff --git a/cmd/query/main.go b/cmd/query/main.go index fd1227cae4a..5d32b5db10f 100644 --- a/cmd/query/main.go +++ b/cmd/query/main.go @@ -32,6 +32,7 @@ import ( metricsPlugin "github.com/jaegertracing/jaeger/plugin/metricstore" "github.com/jaegertracing/jaeger/plugin/storage" "github.com/jaegertracing/jaeger/ports" + "github.com/jaegertracing/jaeger/storage_v2/factoryadapter" ) func main() { @@ -89,9 +90,11 @@ func main() { if err := storageFactory.Initialize(baseTelset.Metrics, baseTelset.Logger); err != nil { logger.Fatal("Failed to init storage factory", zap.Error(err)) } - spanReader, err := storageFactory.CreateSpanReader() + + v2Factory := factoryadapter.NewFactory(storageFactory) + traceReader, err := v2Factory.CreateTraceReader() if err != nil { - logger.Fatal("Failed to create span reader", zap.Error(err)) + logger.Fatal("Failed to create trace reader", zap.Error(err)) } dependencyReader, err := storageFactory.CreateDependencyReader() if err != nil { @@ -104,7 +107,7 @@ func main() { } queryServiceOptions := queryOpts.BuildQueryServiceOptions(storageFactory, logger) queryService := querysvc.NewQueryService( - spanReader, + traceReader, dependencyReader, *queryServiceOptions) tm := tenancy.NewManager(&queryOpts.Tenancy) diff --git a/storage_v2/factoryadapter/factory_test.go b/storage_v2/factoryadapter/factory_test.go index 146f89ba453..a8e2819ce0f 100644 --- a/storage_v2/factoryadapter/factory_test.go +++ b/storage_v2/factoryadapter/factory_test.go @@ -39,6 +39,7 @@ func TestAdapterClose(t *testing.T) { func TestAdapterCreateTraceReader(t *testing.T) { f1 := new(factoryMocks.Factory) f1.On("CreateSpanReader").Return(new(spanstoreMocks.Reader), nil) + f := NewFactory(f1) _, err := f.CreateTraceReader() require.NoError(t, err) @@ -47,6 +48,7 @@ func TestAdapterCreateTraceReader(t *testing.T) { func TestAdapterCreateTraceReaderError(t *testing.T) { f1 := new(factoryMocks.Factory) f1.On("CreateSpanReader").Return(nil, errors.New("mock error")) + f := NewFactory(f1) _, err := f.CreateTraceReader() require.ErrorContains(t, err, "mock error") diff --git a/storage_v2/tracestore/mocks/Factory.go b/storage_v2/tracestore/mocks/Factory.go new file mode 100644 index 00000000000..922d57ae2c4 --- /dev/null +++ b/storage_v2/tracestore/mocks/Factory.go @@ -0,0 +1,130 @@ +// Copyright (c) The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 +// +// Run 'make generate-mocks' to regenerate. + +// Code generated by mockery. DO NOT EDIT. + +package mocks + +import ( + context "context" + + tracestore "github.com/jaegertracing/jaeger/storage_v2/tracestore" + mock "github.com/stretchr/testify/mock" +) + +// Factory is an autogenerated mock type for the Factory type +type Factory struct { + mock.Mock +} + +// Close provides a mock function with given fields: ctx +func (_m *Factory) Close(ctx context.Context) error { + ret := _m.Called(ctx) + + if len(ret) == 0 { + panic("no return value specified for Close") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context) error); ok { + r0 = rf(ctx) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// CreateTraceReader provides a mock function with no fields +func (_m *Factory) CreateTraceReader() (tracestore.Reader, error) { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for CreateTraceReader") + } + + var r0 tracestore.Reader + var r1 error + if rf, ok := ret.Get(0).(func() (tracestore.Reader, error)); ok { + return rf() + } + if rf, ok := ret.Get(0).(func() tracestore.Reader); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(tracestore.Reader) + } + } + + if rf, ok := ret.Get(1).(func() error); ok { + r1 = rf() + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// CreateTraceWriter provides a mock function with no fields +func (_m *Factory) CreateTraceWriter() (tracestore.Writer, error) { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for CreateTraceWriter") + } + + var r0 tracestore.Writer + var r1 error + if rf, ok := ret.Get(0).(func() (tracestore.Writer, error)); ok { + return rf() + } + if rf, ok := ret.Get(0).(func() tracestore.Writer); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(tracestore.Writer) + } + } + + if rf, ok := ret.Get(1).(func() error); ok { + r1 = rf() + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// Initialize provides a mock function with given fields: ctx +func (_m *Factory) Initialize(ctx context.Context) error { + ret := _m.Called(ctx) + + if len(ret) == 0 { + panic("no return value specified for Initialize") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context) error); ok { + r0 = rf(ctx) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// NewFactory creates a new instance of Factory. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewFactory(t interface { + mock.TestingT + Cleanup(func()) +}) *Factory { + mock := &Factory{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/storage_v2/tracestore/mocks/Reader.go b/storage_v2/tracestore/mocks/Reader.go new file mode 100644 index 00000000000..a8ef4b103c0 --- /dev/null +++ b/storage_v2/tracestore/mocks/Reader.go @@ -0,0 +1,186 @@ +// Copyright (c) The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 +// +// Run 'make generate-mocks' to regenerate. + +// Code generated by mockery. DO NOT EDIT. + +package mocks + +import ( + context "context" + + mock "github.com/stretchr/testify/mock" + pcommon "go.opentelemetry.io/collector/pdata/pcommon" + + ptrace "go.opentelemetry.io/collector/pdata/ptrace" + + tracestore "github.com/jaegertracing/jaeger/storage_v2/tracestore" +) + +// Reader is an autogenerated mock type for the Reader type +type Reader struct { + mock.Mock +} + +// FindTraceIDs provides a mock function with given fields: ctx, query +func (_m *Reader) FindTraceIDs(ctx context.Context, query tracestore.TraceQueryParameters) ([]pcommon.TraceID, error) { + ret := _m.Called(ctx, query) + + if len(ret) == 0 { + panic("no return value specified for FindTraceIDs") + } + + var r0 []pcommon.TraceID + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, tracestore.TraceQueryParameters) ([]pcommon.TraceID, error)); ok { + return rf(ctx, query) + } + if rf, ok := ret.Get(0).(func(context.Context, tracestore.TraceQueryParameters) []pcommon.TraceID); ok { + r0 = rf(ctx, query) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]pcommon.TraceID) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, tracestore.TraceQueryParameters) error); ok { + r1 = rf(ctx, query) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// FindTraces provides a mock function with given fields: ctx, query +func (_m *Reader) FindTraces(ctx context.Context, query tracestore.TraceQueryParameters) ([]ptrace.Traces, error) { + ret := _m.Called(ctx, query) + + if len(ret) == 0 { + panic("no return value specified for FindTraces") + } + + var r0 []ptrace.Traces + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, tracestore.TraceQueryParameters) ([]ptrace.Traces, error)); ok { + return rf(ctx, query) + } + if rf, ok := ret.Get(0).(func(context.Context, tracestore.TraceQueryParameters) []ptrace.Traces); ok { + r0 = rf(ctx, query) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]ptrace.Traces) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, tracestore.TraceQueryParameters) error); ok { + r1 = rf(ctx, query) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// GetOperations provides a mock function with given fields: ctx, query +func (_m *Reader) GetOperations(ctx context.Context, query tracestore.OperationQueryParameters) ([]tracestore.Operation, error) { + ret := _m.Called(ctx, query) + + if len(ret) == 0 { + panic("no return value specified for GetOperations") + } + + var r0 []tracestore.Operation + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, tracestore.OperationQueryParameters) ([]tracestore.Operation, error)); ok { + return rf(ctx, query) + } + if rf, ok := ret.Get(0).(func(context.Context, tracestore.OperationQueryParameters) []tracestore.Operation); ok { + r0 = rf(ctx, query) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]tracestore.Operation) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, tracestore.OperationQueryParameters) error); ok { + r1 = rf(ctx, query) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// GetServices provides a mock function with given fields: ctx +func (_m *Reader) GetServices(ctx context.Context) ([]string, error) { + ret := _m.Called(ctx) + + if len(ret) == 0 { + panic("no return value specified for GetServices") + } + + var r0 []string + var r1 error + if rf, ok := ret.Get(0).(func(context.Context) ([]string, error)); ok { + return rf(ctx) + } + if rf, ok := ret.Get(0).(func(context.Context) []string); ok { + r0 = rf(ctx) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]string) + } + } + + if rf, ok := ret.Get(1).(func(context.Context) error); ok { + r1 = rf(ctx) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// GetTrace provides a mock function with given fields: ctx, traceID +func (_m *Reader) GetTrace(ctx context.Context, traceID pcommon.TraceID) (ptrace.Traces, error) { + ret := _m.Called(ctx, traceID) + + if len(ret) == 0 { + panic("no return value specified for GetTrace") + } + + var r0 ptrace.Traces + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, pcommon.TraceID) (ptrace.Traces, error)); ok { + return rf(ctx, traceID) + } + if rf, ok := ret.Get(0).(func(context.Context, pcommon.TraceID) ptrace.Traces); ok { + r0 = rf(ctx, traceID) + } else { + r0 = ret.Get(0).(ptrace.Traces) + } + + if rf, ok := ret.Get(1).(func(context.Context, pcommon.TraceID) error); ok { + r1 = rf(ctx, traceID) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// NewReader creates a new instance of Reader. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewReader(t interface { + mock.TestingT + Cleanup(func()) +}) *Reader { + mock := &Reader{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/storage_v2/tracestore/mocks/Writer.go b/storage_v2/tracestore/mocks/Writer.go new file mode 100644 index 00000000000..a827057b26f --- /dev/null +++ b/storage_v2/tracestore/mocks/Writer.go @@ -0,0 +1,52 @@ +// Copyright (c) The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 +// +// Run 'make generate-mocks' to regenerate. + +// Code generated by mockery. DO NOT EDIT. + +package mocks + +import ( + context "context" + + mock "github.com/stretchr/testify/mock" + ptrace "go.opentelemetry.io/collector/pdata/ptrace" +) + +// Writer is an autogenerated mock type for the Writer type +type Writer struct { + mock.Mock +} + +// WriteTraces provides a mock function with given fields: ctx, td +func (_m *Writer) WriteTraces(ctx context.Context, td ptrace.Traces) error { + ret := _m.Called(ctx, td) + + if len(ret) == 0 { + panic("no return value specified for WriteTraces") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, ptrace.Traces) error); ok { + r0 = rf(ctx, td) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// NewWriter creates a new instance of Writer. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewWriter(t interface { + mock.TestingT + Cleanup(func()) +}) *Writer { + mock := &Writer{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +}