Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve telemetry.Settings #6275

Merged
merged 19 commits into from
Nov 29, 2024
29 changes: 14 additions & 15 deletions cmd/all-in-one/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,7 @@ import (

"github.com/spf13/cobra"
"github.com/spf13/viper"
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/noop"
noopmetric "go.opentelemetry.io/otel/metric/noop"
_ "go.uber.org/automaxprocs"
"go.uber.org/zap"

Expand Down Expand Up @@ -95,8 +93,16 @@ by default uses only in-memory database.`,
logger.Fatal("Failed to initialize tracer", zap.Error(err))
}

baseTelset := telemetry.Settings{
Logger: svc.Logger,
TracerProvider: tracer.OTEL,
Metrics: baseFactory,
MeterProvider: noopmetric.NewMeterProvider(),
ReportStatus: telemetry.HCAdapter(svc.HC()),
}

storageFactory.InitFromViper(v, logger)
if err := storageFactory.Initialize(baseFactory, logger); err != nil {
if err := storageFactory.Initialize(baseTelset.Metrics, baseTelset.Logger); err != nil {
logger.Fatal("Failed to init storage factory", zap.Error(err))
}

Expand Down Expand Up @@ -159,20 +165,13 @@ by default uses only in-memory database.`,
log.Fatal(err)
}

telset := telemetry.Setting{
Logger: svc.Logger,
TracerProvider: tracer.OTEL,
Metrics: queryMetricsFactory,
ReportStatus: telemetry.HCAdapter(svc.HC()),
LeveledMeterProvider: func(_ configtelemetry.Level) metric.MeterProvider {
return noop.NewMeterProvider()
Comment on lines -167 to -168
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this be replaced with a meter provider? or do we not need to pass anything here because its noop?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added noop to L100, but afaik we don't have a way to go from internal metrics.Factory to OTEL API. We would need to extend the Service (cmd/internal/flags) to initialize OTEL SDK. Main reason I didn't push for that before is that OTEL SDK performance sucks compared to Prometheus SDK, since OTEL does not support bound instrument and we're using them everywhere.

},
}
// query
queryTelset := baseTelset // copy
queryTelset.Metrics = queryMetricsFactory
querySrv := startQuery(
svc, qOpts, qOpts.BuildQueryServiceOptions(storageFactory, logger),
spanReader, dependencyReader, metricsQueryService,
tm, telset,
tm, queryTelset,
)

svc.RunAndThen(func() {
Expand Down Expand Up @@ -222,7 +221,7 @@ func startQuery(
depReader dependencystore.Reader,
metricsQueryService querysvc.MetricsQueryService,
tm *tenancy.Manager,
telset telemetry.Setting,
telset telemetry.Settings,
) *queryApp.Server {
spanReader = spanstoremetrics.NewReaderDecorator(spanReader, telset.Metrics)
qs := querysvc.NewQueryService(spanReader, depReader, *queryOpts)
Expand Down
7 changes: 6 additions & 1 deletion cmd/collector/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/jaegertracing/jaeger/cmd/internal/status"
"github.com/jaegertracing/jaeger/pkg/config"
"github.com/jaegertracing/jaeger/pkg/metrics"
"github.com/jaegertracing/jaeger/pkg/telemetry"
"github.com/jaegertracing/jaeger/pkg/tenancy"
"github.com/jaegertracing/jaeger/pkg/version"
ss "github.com/jaegertracing/jaeger/plugin/sampling/strategyprovider"
Expand Down Expand Up @@ -63,8 +64,12 @@ func main() {
metricsFactory := baseFactory.Namespace(metrics.NSOptions{Name: "collector"})
version.NewInfoMetrics(metricsFactory)

baseTelset := telemetry.NoopSettings()
baseTelset.Logger = svc.Logger
baseTelset.Metrics = baseFactory

storageFactory.InitFromViper(v, logger)
if err := storageFactory.Initialize(baseFactory, logger); err != nil {
if err := storageFactory.Initialize(baseTelset.Metrics, baseTelset.Logger); err != nil {
logger.Fatal("Failed to init storage factory", zap.Error(err))
}
spanWriter, err := storageFactory.CreateSpanWriter()
Expand Down
7 changes: 6 additions & 1 deletion cmd/ingester/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/jaegertracing/jaeger/cmd/internal/status"
"github.com/jaegertracing/jaeger/pkg/config"
"github.com/jaegertracing/jaeger/pkg/metrics"
"github.com/jaegertracing/jaeger/pkg/telemetry"
"github.com/jaegertracing/jaeger/pkg/version"
"github.com/jaegertracing/jaeger/plugin/storage"
"github.com/jaegertracing/jaeger/ports"
Expand Down Expand Up @@ -50,8 +51,12 @@ func main() {
metricsFactory := baseFactory.Namespace(metrics.NSOptions{Name: "ingester"})
version.NewInfoMetrics(metricsFactory)

baseTelset := telemetry.NoopSettings()
baseTelset.Logger = svc.Logger
baseTelset.Metrics = baseFactory

storageFactory.InitFromViper(v, logger)
if err := storageFactory.Initialize(baseFactory, logger); err != nil {
if err := storageFactory.Initialize(baseTelset.Metrics, baseTelset.Logger); err != nil {
logger.Fatal("Failed to init storage factory", zap.Error(err))
}
spanWriter, err := storageFactory.CreateSpanWriter()
Expand Down
49 changes: 25 additions & 24 deletions cmd/jaeger/internal/extension/jaegerquery/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
"fmt"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componentstatus"
"go.opentelemetry.io/collector/extension"
"go.opentelemetry.io/collector/extension/extensioncapabilities"

Expand Down Expand Up @@ -53,9 +52,29 @@
}

func (s *server) Start(ctx context.Context, host component.Host) error {
mf := otelmetrics.NewFactory(s.telset.MeterProvider)
baseFactory := mf.Namespace(metrics.NSOptions{Name: "jaeger"})
queryMetricsFactory := baseFactory.Namespace(metrics.NSOptions{Name: "query"})
// TODO OTel-collector does not initialize the tracer currently
// https://github.com/open-telemetry/opentelemetry-collector/issues/7532
//nolint
tracerProvider, err := jtracer.New("jaeger")
if err != nil {
return fmt.Errorf("could not initialize a tracer: %w", err)
}

Check warning on line 61 in cmd/jaeger/internal/extension/jaegerquery/server.go

View check run for this annotation

Codecov / codecov/patch

cmd/jaeger/internal/extension/jaegerquery/server.go#L60-L61

Added lines #L60 - L61 were not covered by tests
// make sure to close the tracer if subsequent code exists with error
success := false
defer func(ctx context.Context) {
if success {
s.closeTracer = tracerProvider.Close
} else {
tracerProvider.Close(ctx)
}
}(ctx)

telset := telemetry.FromOtelComponent(s.telset, host)
telset.TracerProvider = tracerProvider.OTEL
telset.Metrics = telset.Metrics.
Namespace(metrics.NSOptions{Name: "jaeger"}).
Namespace(metrics.NSOptions{Name: "query"})

f, err := jaegerstorage.GetStorageFactory(s.config.Storage.TracesPrimary, host)
if err != nil {
return fmt.Errorf("cannot find primary storage %s: %w", s.config.Storage.TracesPrimary, err)
Expand All @@ -66,7 +85,7 @@
return fmt.Errorf("cannot create span reader: %w", err)
}

spanReader = spanstoremetrics.NewReaderDecorator(spanReader, queryMetricsFactory)
spanReader = spanstoremetrics.NewReaderDecorator(spanReader, telset.Metrics)

depReader, err := f.CreateDependencyReader()
if err != nil {
Expand All @@ -86,25 +105,6 @@

tm := tenancy.NewManager(&s.config.Tenancy)

// TODO OTel-collector does not initialize the tracer currently
// https://github.com/open-telemetry/opentelemetry-collector/issues/7532
//nolint
tracerProvider, err := jtracer.New("jaeger")
if err != nil {
return fmt.Errorf("could not initialize a tracer: %w", err)
}
s.closeTracer = tracerProvider.Close
telset := telemetry.Setting{
Logger: s.telset.Logger,
TracerProvider: tracerProvider.OTEL,
Metrics: queryMetricsFactory,
ReportStatus: func(event *componentstatus.Event) {
componentstatus.ReportStatus(host, event)
},
LeveledMeterProvider: s.telset.LeveledMeterProvider,
Host: host,
}

s.server, err = queryApp.NewServer(
ctx,
// TODO propagate healthcheck updates up to the collector's runtime
Expand All @@ -122,6 +122,7 @@
return fmt.Errorf("could not start jaeger-query: %w", err)
}

success = true
return nil
}

Expand Down
28 changes: 16 additions & 12 deletions cmd/jaeger/internal/extension/jaegerquery/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,8 @@ import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config/confignet"
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/otel/metric"
noopmetric "go.opentelemetry.io/otel/metric/noop"
nooptrace "go.opentelemetry.io/otel/trace/noop"
"go.uber.org/zap"
"go.uber.org/zap/zaptest"

Expand Down Expand Up @@ -134,7 +133,7 @@ func TestServerStart(t *testing.T) {
expectedErr string
}{
{
name: "Non-empty config with fake storage host",
name: "Real server with non-empty config",
config: &Config{
Storage: Storage{
TracesArchive: "jaeger_storage",
Expand Down Expand Up @@ -204,15 +203,16 @@ func TestServerStart(t *testing.T) {

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// Despite using Noop Tracer below, query service also creates jtracer.
// We want to prevent that tracer from sampling anything in this test.
t.Setenv("OTEL_TRACES_SAMPLER", "always_off")
telemetrySettings := component.TelemetrySettings{
Logger: zaptest.NewLogger(t, zaptest.WrapOptions(zap.AddCaller())),
LeveledMeterProvider: func(_ configtelemetry.Level) metric.MeterProvider {
return noopmetric.NewMeterProvider()
},
MeterProvider: noopmetric.NewMeterProvider(),
Logger: zaptest.NewLogger(t, zaptest.WrapOptions(zap.AddCaller())),
MeterProvider: noopmetric.NewMeterProvider(),
TracerProvider: nooptrace.NewTracerProvider(),
}
tt.config.HTTP.Endpoint = ":0"
tt.config.GRPC.NetAddr.Endpoint = ":0"
tt.config.HTTP.Endpoint = "localhost:0"
tt.config.GRPC.NetAddr.Endpoint = "localhost:0"
tt.config.GRPC.NetAddr.Transport = confignet.TransportTypeTCP
server := newServer(tt.config, telemetrySettings)
err := server.Start(context.Background(), host)
Expand Down Expand Up @@ -297,7 +297,9 @@ func TestServerAddArchiveStorage(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
logger, buf := testutils.NewLogger()
telemetrySettings := component.TelemetrySettings{
Logger: logger,
Logger: logger,
MeterProvider: noopmetric.NewMeterProvider(),
TracerProvider: nooptrace.NewTracerProvider(),
}
server := newServer(tt.config, telemetrySettings)
if tt.extension != nil {
Expand Down Expand Up @@ -347,7 +349,9 @@ func TestServerAddMetricsStorage(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
logger, buf := testutils.NewLogger()
telemetrySettings := component.TelemetrySettings{
Logger: logger,
Logger: logger,
MeterProvider: noopmetric.NewMeterProvider(),
TracerProvider: nooptrace.NewTracerProvider(),
}
server := newServer(tt.config, telemetrySettings)
if tt.extension != nil {
Expand Down
25 changes: 7 additions & 18 deletions cmd/jaeger/internal/extension/jaegerstorage/extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,8 @@ import (
"io"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/extension"
"go.opentelemetry.io/otel/metric"

"github.com/jaegertracing/jaeger/internal/metrics/otelmetrics"
"github.com/jaegertracing/jaeger/pkg/metrics"
"github.com/jaegertracing/jaeger/pkg/telemetry"
"github.com/jaegertracing/jaeger/plugin/metrics/prometheus"
Expand Down Expand Up @@ -118,34 +115,26 @@ func newStorageExt(config *Config, telset component.TelemetrySettings) *storageE
}

func (s *storageExt) Start(_ context.Context, host component.Host) error {
baseFactory := otelmetrics.NewFactory(s.telset.MeterProvider)
mf := baseFactory.Namespace(metrics.NSOptions{Name: "jaeger"})
telset := telemetry.FromOtelComponent(s.telset, host)
telset.Metrics = telset.Metrics.Namespace(metrics.NSOptions{Name: "jaeger"})
for storageName, cfg := range s.config.TraceBackends {
s.telset.Logger.Sugar().Infof("Initializing storage '%s'", storageName)
var factory storage.Factory
var err error = errors.New("empty configuration")
switch {
case cfg.Memory != nil:
factory, err = memory.NewFactoryWithConfig(*cfg.Memory, mf, s.telset.Logger), nil
factory, err = memory.NewFactoryWithConfig(*cfg.Memory, telset.Metrics, s.telset.Logger), nil
case cfg.Badger != nil:
factory, err = badger.NewFactoryWithConfig(*cfg.Badger, mf, s.telset.Logger)
factory, err = badger.NewFactoryWithConfig(*cfg.Badger, telset.Metrics, s.telset.Logger)
case cfg.GRPC != nil:
telset := telemetry.Setting{
Logger: s.telset.Logger,
Host: host,
Metrics: mf,
LeveledMeterProvider: func(_ configtelemetry.Level) metric.MeterProvider {
return s.telset.MeterProvider
},
}
//nolint: contextcheck
factory, err = grpc.NewFactoryWithConfig(*cfg.GRPC, telset)
case cfg.Cassandra != nil:
factory, err = cassandra.NewFactoryWithConfig(*cfg.Cassandra, mf, s.telset.Logger)
factory, err = cassandra.NewFactoryWithConfig(*cfg.Cassandra, telset.Metrics, s.telset.Logger)
case cfg.Elasticsearch != nil:
factory, err = es.NewFactoryWithConfig(*cfg.Elasticsearch, mf, s.telset.Logger)
factory, err = es.NewFactoryWithConfig(*cfg.Elasticsearch, telset.Metrics, s.telset.Logger)
case cfg.Opensearch != nil:
factory, err = es.NewFactoryWithConfig(*cfg.Opensearch, mf, s.telset.Logger)
factory, err = es.NewFactoryWithConfig(*cfg.Opensearch, telset.Metrics, s.telset.Logger)
}
if err != nil {
return fmt.Errorf("failed to initialize storage '%s': %w", storageName, err)
Expand Down
2 changes: 1 addition & 1 deletion cmd/query/app/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func (qOpts *QueryOptions) InitFromViper(v *viper.Viper, logger *zap.Logger) (*Q
}

// BuildQueryServiceOptions creates a QueryServiceOptions struct with appropriate adjusters and archive config
func (qOpts *QueryOptions) BuildQueryServiceOptions(storageFactory storage.Factory, logger *zap.Logger) *querysvc.QueryServiceOptions {
func (qOpts *QueryOptions) BuildQueryServiceOptions(storageFactory storage.BaseFactory, logger *zap.Logger) *querysvc.QueryServiceOptions {
opts := &querysvc.QueryServiceOptions{}
if !opts.InitArchiveStorage(storageFactory, logger) {
logger.Info("Archive storage not initialized")
Expand Down
2 changes: 1 addition & 1 deletion cmd/query/app/querysvc/query_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func (qs QueryService) GetCapabilities() StorageCapabilities {
}

// InitArchiveStorage tries to initialize archive storage reader/writer if storage factory supports them.
func (opts *QueryServiceOptions) InitArchiveStorage(storageFactory storage.Factory, logger *zap.Logger) bool {
func (opts *QueryServiceOptions) InitArchiveStorage(storageFactory storage.BaseFactory, logger *zap.Logger) bool {
archiveFactory, ok := storageFactory.(storage.ArchiveFactory)
if !ok {
logger.Info("Archive storage not supported by the factory")
Expand Down
Loading
Loading