Skip to content

Commit

Permalink
Improve telemetry.Settings (#6275)
Browse files Browse the repository at this point in the history
## Which problem is this PR solving?
- Part of #5633

## Description of the changes
- Rename `telemetry.Setting` to `telemetry.Settings`
- Create helpers `NoopSettings()` and `FromOtelComponent()`
- Remove `LeveledMeterProvider` which is deprecated in OTEL
- Use `telset` in more places
- Pull out `Initialize()` from `storage.Factory`

## How was this change tested?
- CI

## Checklist
- [ ] I have read
https://github.com/jaegertracing/jaeger/blob/master/CONTRIBUTING_GUIDELINES.md
- [ ] I have signed all commits
- [ ] I have added unit tests for the new functionality
- [ ] I have run lint and test steps successfully
  - for `jaeger`: `make lint test`
  - for `jaeger-ui`: `yarn lint` and `yarn test`

---------

Signed-off-by: Yuri Shkuro <[email protected]>
  • Loading branch information
yurishkuro authored Nov 29, 2024
1 parent 0e397f1 commit cedaeaa
Show file tree
Hide file tree
Showing 23 changed files with 382 additions and 241 deletions.
29 changes: 14 additions & 15 deletions cmd/all-in-one/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,7 @@ import (

"github.com/spf13/cobra"
"github.com/spf13/viper"
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/noop"
noopmetric "go.opentelemetry.io/otel/metric/noop"
_ "go.uber.org/automaxprocs"
"go.uber.org/zap"

Expand Down Expand Up @@ -95,8 +93,16 @@ by default uses only in-memory database.`,
logger.Fatal("Failed to initialize tracer", zap.Error(err))
}

baseTelset := telemetry.Settings{
Logger: svc.Logger,
TracerProvider: tracer.OTEL,
Metrics: baseFactory,
MeterProvider: noopmetric.NewMeterProvider(),
ReportStatus: telemetry.HCAdapter(svc.HC()),
}

storageFactory.InitFromViper(v, logger)
if err := storageFactory.Initialize(baseFactory, logger); err != nil {
if err := storageFactory.Initialize(baseTelset.Metrics, baseTelset.Logger); err != nil {
logger.Fatal("Failed to init storage factory", zap.Error(err))
}

Expand Down Expand Up @@ -159,20 +165,13 @@ by default uses only in-memory database.`,
log.Fatal(err)
}

telset := telemetry.Setting{
Logger: svc.Logger,
TracerProvider: tracer.OTEL,
Metrics: queryMetricsFactory,
ReportStatus: telemetry.HCAdapter(svc.HC()),
LeveledMeterProvider: func(_ configtelemetry.Level) metric.MeterProvider {
return noop.NewMeterProvider()
},
}
// query
queryTelset := baseTelset // copy
queryTelset.Metrics = queryMetricsFactory
querySrv := startQuery(
svc, qOpts, qOpts.BuildQueryServiceOptions(storageFactory, logger),
spanReader, dependencyReader, metricsQueryService,
tm, telset,
tm, queryTelset,
)

svc.RunAndThen(func() {
Expand Down Expand Up @@ -222,7 +221,7 @@ func startQuery(
depReader dependencystore.Reader,
metricsQueryService querysvc.MetricsQueryService,
tm *tenancy.Manager,
telset telemetry.Setting,
telset telemetry.Settings,
) *queryApp.Server {
spanReader = spanstoremetrics.NewReaderDecorator(spanReader, telset.Metrics)
qs := querysvc.NewQueryService(spanReader, depReader, *queryOpts)
Expand Down
7 changes: 6 additions & 1 deletion cmd/collector/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/jaegertracing/jaeger/cmd/internal/status"
"github.com/jaegertracing/jaeger/pkg/config"
"github.com/jaegertracing/jaeger/pkg/metrics"
"github.com/jaegertracing/jaeger/pkg/telemetry"
"github.com/jaegertracing/jaeger/pkg/tenancy"
"github.com/jaegertracing/jaeger/pkg/version"
ss "github.com/jaegertracing/jaeger/plugin/sampling/strategyprovider"
Expand Down Expand Up @@ -63,8 +64,12 @@ func main() {
metricsFactory := baseFactory.Namespace(metrics.NSOptions{Name: "collector"})
version.NewInfoMetrics(metricsFactory)

baseTelset := telemetry.NoopSettings()
baseTelset.Logger = svc.Logger
baseTelset.Metrics = baseFactory

storageFactory.InitFromViper(v, logger)
if err := storageFactory.Initialize(baseFactory, logger); err != nil {
if err := storageFactory.Initialize(baseTelset.Metrics, baseTelset.Logger); err != nil {
logger.Fatal("Failed to init storage factory", zap.Error(err))
}
spanWriter, err := storageFactory.CreateSpanWriter()
Expand Down
7 changes: 6 additions & 1 deletion cmd/ingester/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/jaegertracing/jaeger/cmd/internal/status"
"github.com/jaegertracing/jaeger/pkg/config"
"github.com/jaegertracing/jaeger/pkg/metrics"
"github.com/jaegertracing/jaeger/pkg/telemetry"
"github.com/jaegertracing/jaeger/pkg/version"
"github.com/jaegertracing/jaeger/plugin/storage"
"github.com/jaegertracing/jaeger/ports"
Expand Down Expand Up @@ -50,8 +51,12 @@ func main() {
metricsFactory := baseFactory.Namespace(metrics.NSOptions{Name: "ingester"})
version.NewInfoMetrics(metricsFactory)

baseTelset := telemetry.NoopSettings()
baseTelset.Logger = svc.Logger
baseTelset.Metrics = baseFactory

storageFactory.InitFromViper(v, logger)
if err := storageFactory.Initialize(baseFactory, logger); err != nil {
if err := storageFactory.Initialize(baseTelset.Metrics, baseTelset.Logger); err != nil {
logger.Fatal("Failed to init storage factory", zap.Error(err))
}
spanWriter, err := storageFactory.CreateSpanWriter()
Expand Down
49 changes: 25 additions & 24 deletions cmd/jaeger/internal/extension/jaegerquery/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"fmt"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componentstatus"
"go.opentelemetry.io/collector/extension"
"go.opentelemetry.io/collector/extension/extensioncapabilities"

Expand Down Expand Up @@ -53,9 +52,29 @@ func (*server) Dependencies() []component.ID {
}

func (s *server) Start(ctx context.Context, host component.Host) error {
mf := otelmetrics.NewFactory(s.telset.MeterProvider)
baseFactory := mf.Namespace(metrics.NSOptions{Name: "jaeger"})
queryMetricsFactory := baseFactory.Namespace(metrics.NSOptions{Name: "query"})
// TODO OTel-collector does not initialize the tracer currently
// https://github.com/open-telemetry/opentelemetry-collector/issues/7532
//nolint
tracerProvider, err := jtracer.New("jaeger")
if err != nil {
return fmt.Errorf("could not initialize a tracer: %w", err)
}
// make sure to close the tracer if subsequent code exists with error
success := false
defer func(ctx context.Context) {
if success {
s.closeTracer = tracerProvider.Close
} else {
tracerProvider.Close(ctx)
}
}(ctx)

telset := telemetry.FromOtelComponent(s.telset, host)
telset.TracerProvider = tracerProvider.OTEL
telset.Metrics = telset.Metrics.
Namespace(metrics.NSOptions{Name: "jaeger"}).
Namespace(metrics.NSOptions{Name: "query"})

f, err := jaegerstorage.GetStorageFactory(s.config.Storage.TracesPrimary, host)
if err != nil {
return fmt.Errorf("cannot find primary storage %s: %w", s.config.Storage.TracesPrimary, err)
Expand All @@ -66,7 +85,7 @@ func (s *server) Start(ctx context.Context, host component.Host) error {
return fmt.Errorf("cannot create span reader: %w", err)
}

spanReader = spanstoremetrics.NewReaderDecorator(spanReader, queryMetricsFactory)
spanReader = spanstoremetrics.NewReaderDecorator(spanReader, telset.Metrics)

depReader, err := f.CreateDependencyReader()
if err != nil {
Expand All @@ -86,25 +105,6 @@ func (s *server) Start(ctx context.Context, host component.Host) error {

tm := tenancy.NewManager(&s.config.Tenancy)

// TODO OTel-collector does not initialize the tracer currently
// https://github.com/open-telemetry/opentelemetry-collector/issues/7532
//nolint
tracerProvider, err := jtracer.New("jaeger")
if err != nil {
return fmt.Errorf("could not initialize a tracer: %w", err)
}
s.closeTracer = tracerProvider.Close
telset := telemetry.Setting{
Logger: s.telset.Logger,
TracerProvider: tracerProvider.OTEL,
Metrics: queryMetricsFactory,
ReportStatus: func(event *componentstatus.Event) {
componentstatus.ReportStatus(host, event)
},
LeveledMeterProvider: s.telset.LeveledMeterProvider,
Host: host,
}

s.server, err = queryApp.NewServer(
ctx,
// TODO propagate healthcheck updates up to the collector's runtime
Expand All @@ -122,6 +122,7 @@ func (s *server) Start(ctx context.Context, host component.Host) error {
return fmt.Errorf("could not start jaeger-query: %w", err)
}

success = true
return nil
}

Expand Down
28 changes: 16 additions & 12 deletions cmd/jaeger/internal/extension/jaegerquery/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,8 @@ import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config/confignet"
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/otel/metric"
noopmetric "go.opentelemetry.io/otel/metric/noop"
nooptrace "go.opentelemetry.io/otel/trace/noop"
"go.uber.org/zap"
"go.uber.org/zap/zaptest"

Expand Down Expand Up @@ -134,7 +133,7 @@ func TestServerStart(t *testing.T) {
expectedErr string
}{
{
name: "Non-empty config with fake storage host",
name: "Real server with non-empty config",
config: &Config{
Storage: Storage{
TracesArchive: "jaeger_storage",
Expand Down Expand Up @@ -204,15 +203,16 @@ func TestServerStart(t *testing.T) {

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// Despite using Noop Tracer below, query service also creates jtracer.
// We want to prevent that tracer from sampling anything in this test.
t.Setenv("OTEL_TRACES_SAMPLER", "always_off")
telemetrySettings := component.TelemetrySettings{
Logger: zaptest.NewLogger(t, zaptest.WrapOptions(zap.AddCaller())),
LeveledMeterProvider: func(_ configtelemetry.Level) metric.MeterProvider {
return noopmetric.NewMeterProvider()
},
MeterProvider: noopmetric.NewMeterProvider(),
Logger: zaptest.NewLogger(t, zaptest.WrapOptions(zap.AddCaller())),
MeterProvider: noopmetric.NewMeterProvider(),
TracerProvider: nooptrace.NewTracerProvider(),
}
tt.config.HTTP.Endpoint = ":0"
tt.config.GRPC.NetAddr.Endpoint = ":0"
tt.config.HTTP.Endpoint = "localhost:0"
tt.config.GRPC.NetAddr.Endpoint = "localhost:0"
tt.config.GRPC.NetAddr.Transport = confignet.TransportTypeTCP
server := newServer(tt.config, telemetrySettings)
err := server.Start(context.Background(), host)
Expand Down Expand Up @@ -297,7 +297,9 @@ func TestServerAddArchiveStorage(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
logger, buf := testutils.NewLogger()
telemetrySettings := component.TelemetrySettings{
Logger: logger,
Logger: logger,
MeterProvider: noopmetric.NewMeterProvider(),
TracerProvider: nooptrace.NewTracerProvider(),
}
server := newServer(tt.config, telemetrySettings)
if tt.extension != nil {
Expand Down Expand Up @@ -347,7 +349,9 @@ func TestServerAddMetricsStorage(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
logger, buf := testutils.NewLogger()
telemetrySettings := component.TelemetrySettings{
Logger: logger,
Logger: logger,
MeterProvider: noopmetric.NewMeterProvider(),
TracerProvider: nooptrace.NewTracerProvider(),
}
server := newServer(tt.config, telemetrySettings)
if tt.extension != nil {
Expand Down
25 changes: 7 additions & 18 deletions cmd/jaeger/internal/extension/jaegerstorage/extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,8 @@ import (
"io"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/extension"
"go.opentelemetry.io/otel/metric"

"github.com/jaegertracing/jaeger/internal/metrics/otelmetrics"
"github.com/jaegertracing/jaeger/pkg/metrics"
"github.com/jaegertracing/jaeger/pkg/telemetry"
"github.com/jaegertracing/jaeger/plugin/metrics/prometheus"
Expand Down Expand Up @@ -118,34 +115,26 @@ func newStorageExt(config *Config, telset component.TelemetrySettings) *storageE
}

func (s *storageExt) Start(_ context.Context, host component.Host) error {
baseFactory := otelmetrics.NewFactory(s.telset.MeterProvider)
mf := baseFactory.Namespace(metrics.NSOptions{Name: "jaeger"})
telset := telemetry.FromOtelComponent(s.telset, host)
telset.Metrics = telset.Metrics.Namespace(metrics.NSOptions{Name: "jaeger"})
for storageName, cfg := range s.config.TraceBackends {
s.telset.Logger.Sugar().Infof("Initializing storage '%s'", storageName)
var factory storage.Factory
var err error = errors.New("empty configuration")
switch {
case cfg.Memory != nil:
factory, err = memory.NewFactoryWithConfig(*cfg.Memory, mf, s.telset.Logger), nil
factory, err = memory.NewFactoryWithConfig(*cfg.Memory, telset.Metrics, s.telset.Logger), nil
case cfg.Badger != nil:
factory, err = badger.NewFactoryWithConfig(*cfg.Badger, mf, s.telset.Logger)
factory, err = badger.NewFactoryWithConfig(*cfg.Badger, telset.Metrics, s.telset.Logger)
case cfg.GRPC != nil:
telset := telemetry.Setting{
Logger: s.telset.Logger,
Host: host,
Metrics: mf,
LeveledMeterProvider: func(_ configtelemetry.Level) metric.MeterProvider {
return s.telset.MeterProvider
},
}
//nolint: contextcheck
factory, err = grpc.NewFactoryWithConfig(*cfg.GRPC, telset)
case cfg.Cassandra != nil:
factory, err = cassandra.NewFactoryWithConfig(*cfg.Cassandra, mf, s.telset.Logger)
factory, err = cassandra.NewFactoryWithConfig(*cfg.Cassandra, telset.Metrics, s.telset.Logger)
case cfg.Elasticsearch != nil:
factory, err = es.NewFactoryWithConfig(*cfg.Elasticsearch, mf, s.telset.Logger)
factory, err = es.NewFactoryWithConfig(*cfg.Elasticsearch, telset.Metrics, s.telset.Logger)
case cfg.Opensearch != nil:
factory, err = es.NewFactoryWithConfig(*cfg.Opensearch, mf, s.telset.Logger)
factory, err = es.NewFactoryWithConfig(*cfg.Opensearch, telset.Metrics, s.telset.Logger)
}
if err != nil {
return fmt.Errorf("failed to initialize storage '%s': %w", storageName, err)
Expand Down
2 changes: 1 addition & 1 deletion cmd/query/app/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func (qOpts *QueryOptions) InitFromViper(v *viper.Viper, logger *zap.Logger) (*Q
}

// BuildQueryServiceOptions creates a QueryServiceOptions struct with appropriate adjusters and archive config
func (qOpts *QueryOptions) BuildQueryServiceOptions(storageFactory storage.Factory, logger *zap.Logger) *querysvc.QueryServiceOptions {
func (qOpts *QueryOptions) BuildQueryServiceOptions(storageFactory storage.BaseFactory, logger *zap.Logger) *querysvc.QueryServiceOptions {
opts := &querysvc.QueryServiceOptions{}
if !opts.InitArchiveStorage(storageFactory, logger) {
logger.Info("Archive storage not initialized")
Expand Down
2 changes: 1 addition & 1 deletion cmd/query/app/querysvc/query_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func (qs QueryService) GetCapabilities() StorageCapabilities {
}

// InitArchiveStorage tries to initialize archive storage reader/writer if storage factory supports them.
func (opts *QueryServiceOptions) InitArchiveStorage(storageFactory storage.Factory, logger *zap.Logger) bool {
func (opts *QueryServiceOptions) InitArchiveStorage(storageFactory storage.BaseFactory, logger *zap.Logger) bool {
archiveFactory, ok := storageFactory.(storage.ArchiveFactory)
if !ok {
logger.Info("Archive storage not supported by the factory")
Expand Down
Loading

0 comments on commit cedaeaa

Please sign in to comment.