Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[metrics][storage] Move metrics reader decorator to metrics storage factory #6287

Merged
merged 10 commits into from
Dec 1, 2024
13 changes: 5 additions & 8 deletions cmd/all-in-one/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ import (
"github.com/jaegertracing/jaeger/plugin/storage"
"github.com/jaegertracing/jaeger/ports"
"github.com/jaegertracing/jaeger/storage/dependencystore"
"github.com/jaegertracing/jaeger/storage/metricsstore/metricstoremetrics"
"github.com/jaegertracing/jaeger/storage/spanstore"
)

Expand Down Expand Up @@ -118,7 +117,7 @@ by default uses only in-memory database.`,
logger.Fatal("Failed to create dependency reader", zap.Error(err))
}

metricsQueryService, err := createMetricsQueryService(metricsReaderFactory, v, logger, queryMetricsFactory)
metricsQueryService, err := createMetricsQueryService(metricsReaderFactory, v, baseTelset)
if err != nil {
logger.Fatal("Failed to create metrics reader", zap.Error(err))
}
Expand Down Expand Up @@ -238,20 +237,18 @@ func startQuery(
func createMetricsQueryService(
metricsReaderFactory *metricsPlugin.Factory,
v *viper.Viper,
logger *zap.Logger,
metricsReaderMetricsFactory metrics.Factory,
telset telemetry.Settings,
) (querysvc.MetricsQueryService, error) {
if err := metricsReaderFactory.Initialize(logger); err != nil {
if err := metricsReaderFactory.Initialize(telset); err != nil {
return nil, fmt.Errorf("failed to init metrics reader factory: %w", err)
}

// Ensure default parameter values are loaded correctly.
metricsReaderFactory.InitFromViper(v, logger)
metricsReaderFactory.InitFromViper(v, telset.Logger)
reader, err := metricsReaderFactory.CreateMetricsReader()
if err != nil {
return nil, fmt.Errorf("failed to create metrics reader: %w", err)
}

// Decorate the metrics reader with metrics instrumentation.
return metricstoremetrics.NewReaderDecorator(reader, metricsReaderMetricsFactory), nil
return reader, nil
}
7 changes: 1 addition & 6 deletions cmd/jaeger/internal/extension/jaegerquery/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,12 @@ import (
"github.com/jaegertracing/jaeger/cmd/jaeger/internal/extension/jaegerstorage"
queryApp "github.com/jaegertracing/jaeger/cmd/query/app"
"github.com/jaegertracing/jaeger/cmd/query/app/querysvc"
"github.com/jaegertracing/jaeger/internal/metrics/otelmetrics"
"github.com/jaegertracing/jaeger/pkg/jtracer"
"github.com/jaegertracing/jaeger/pkg/metrics"
"github.com/jaegertracing/jaeger/pkg/telemetry"
"github.com/jaegertracing/jaeger/pkg/tenancy"
"github.com/jaegertracing/jaeger/plugin/metrics/disabled"
"github.com/jaegertracing/jaeger/storage/metricsstore"
"github.com/jaegertracing/jaeger/storage/metricsstore/metricstoremetrics"
)

var (
Expand Down Expand Up @@ -156,10 +154,7 @@ func (s *server) createMetricReader(host component.Host) (metricsstore.Reader, e
return nil, fmt.Errorf("cannot create metrics reader %w", err)
}

// Decorate the metrics reader with metrics instrumentation.
mf := otelmetrics.NewFactory(s.telset.MeterProvider)
mf = mf.Namespace(metrics.NSOptions{Name: "jaeger_metricstore"})
return metricstoremetrics.NewReaderDecorator(metricsReader, mf), nil
return metricsReader, nil
}

func (s *server) Shutdown(ctx context.Context) error {
Expand Down
3 changes: 2 additions & 1 deletion cmd/jaeger/internal/extension/jaegerquery/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/jaegertracing/jaeger/cmd/query/app/querysvc"
"github.com/jaegertracing/jaeger/internal/grpctest"
"github.com/jaegertracing/jaeger/pkg/metrics"
"github.com/jaegertracing/jaeger/pkg/telemetry"
"github.com/jaegertracing/jaeger/pkg/testutils"
"github.com/jaegertracing/jaeger/storage"
"github.com/jaegertracing/jaeger/storage/dependencystore"
Expand Down Expand Up @@ -73,7 +74,7 @@ type fakeMetricsFactory struct {
}

// Initialize implements storage.MetricsFactory.
func (fmf fakeMetricsFactory) Initialize(*zap.Logger) error {
func (fmf fakeMetricsFactory) Initialize(telemetry.Settings) error {
if fmf.name == "need-initialize-error" {
return errors.New("test-error")
}
Expand Down
21 changes: 13 additions & 8 deletions cmd/jaeger/internal/extension/jaegerstorage/extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,12 +117,13 @@ func newStorageExt(config *Config, telset component.TelemetrySettings) *storageE
func (s *storageExt) Start(_ context.Context, host component.Host) error {
telset := telemetry.FromOtelComponent(s.telset, host)
telset.Metrics = telset.Metrics.Namespace(metrics.NSOptions{Name: "jaeger"})
getMetricsFactory := func(name, kind string) metrics.Factory {
scopedMetricsFactory := func(name, kind, role string) metrics.Factory {
return telset.Metrics.Namespace(metrics.NSOptions{
Name: "storage",
Tags: map[string]string{
"name": name,
"kind": kind,
"role": role,
},
})
}
Expand All @@ -134,35 +135,35 @@ func (s *storageExt) Start(_ context.Context, host component.Host) error {
case cfg.Memory != nil:
factory, err = memory.NewFactoryWithConfig(
*cfg.Memory,
getMetricsFactory(storageName, "memory"),
scopedMetricsFactory(storageName, "memory", "tracestore"),
s.telset.Logger,
), nil
case cfg.Badger != nil:
factory, err = badger.NewFactoryWithConfig(
*cfg.Badger,
getMetricsFactory(storageName, "badger"),
scopedMetricsFactory(storageName, "badger", "tracestore"),
s.telset.Logger)
case cfg.GRPC != nil:
grpcTelset := telset
grpcTelset.Metrics = getMetricsFactory(storageName, "grpc")
grpcTelset.Metrics = scopedMetricsFactory(storageName, "grpc", "tracestore")
//nolint: contextcheck
factory, err = grpc.NewFactoryWithConfig(*cfg.GRPC, grpcTelset)
case cfg.Cassandra != nil:
factory, err = cassandra.NewFactoryWithConfig(
*cfg.Cassandra,
getMetricsFactory(storageName, "cassandra"),
scopedMetricsFactory(storageName, "cassandra", "tracestore"),
s.telset.Logger,
)
case cfg.Elasticsearch != nil:
factory, err = es.NewFactoryWithConfig(
*cfg.Elasticsearch,
getMetricsFactory(storageName, "elasticsearch"),
scopedMetricsFactory(storageName, "elasticsearch", "tracestore"),
s.telset.Logger,
)
case cfg.Opensearch != nil:
factory, err = es.NewFactoryWithConfig(
*cfg.Opensearch,
getMetricsFactory(storageName, "opensearch"),
scopedMetricsFactory(storageName, "opensearch", "tracestore"),
s.telset.Logger,
)
}
Expand All @@ -177,7 +178,11 @@ func (s *storageExt) Start(_ context.Context, host component.Host) error {
var metricsFactory storage.MetricsFactory
var err error
if cfg.Prometheus != nil {
metricsFactory, err = prometheus.NewFactoryWithConfig(*cfg.Prometheus, s.telset.Logger)
promTelset := telset
promTelset.Metrics = scopedMetricsFactory(metricStorageName, "prometheus", "metricstore")
metricsFactory, err = prometheus.NewFactoryWithConfig(
*cfg.Prometheus,
promTelset)
}
if err != nil {
return fmt.Errorf("failed to initialize metrics storage '%s': %w", metricStorageName, err)
Expand Down
13 changes: 5 additions & 8 deletions cmd/query/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import (
metricsPlugin "github.com/jaegertracing/jaeger/plugin/metrics"
"github.com/jaegertracing/jaeger/plugin/storage"
"github.com/jaegertracing/jaeger/ports"
"github.com/jaegertracing/jaeger/storage/metricsstore/metricstoremetrics"
)

func main() {
Expand Down Expand Up @@ -99,7 +98,7 @@ func main() {
logger.Fatal("Failed to create dependency reader", zap.Error(err))
}

metricsQueryService, err := createMetricsQueryService(metricsReaderFactory, v, logger, metricsFactory)
metricsQueryService, err := createMetricsQueryService(metricsReaderFactory, v, baseTelset)
if err != nil {
logger.Fatal("Failed to create metrics query service", zap.Error(err))
}
Expand Down Expand Up @@ -166,20 +165,18 @@ func main() {
func createMetricsQueryService(
metricsReaderFactory *metricsPlugin.Factory,
v *viper.Viper,
logger *zap.Logger,
metricsReaderMetricsFactory metrics.Factory,
telset telemetry.Settings,
) (querysvc.MetricsQueryService, error) {
if err := metricsReaderFactory.Initialize(logger); err != nil {
if err := metricsReaderFactory.Initialize(telset); err != nil {
return nil, fmt.Errorf("failed to init metrics reader factory: %w", err)
}

// Ensure default parameter values are loaded correctly.
metricsReaderFactory.InitFromViper(v, logger)
metricsReaderFactory.InitFromViper(v, telset.Logger)
reader, err := metricsReaderFactory.CreateMetricsReader()
if err != nil {
return nil, fmt.Errorf("failed to create metrics reader: %w", err)
}

// Decorate the metrics reader with metrics instrumentation.
return metricstoremetrics.NewReaderDecorator(reader, metricsReaderMetricsFactory), nil
return reader, nil
}
3 changes: 2 additions & 1 deletion plugin/metrics/disabled/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/spf13/viper"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

plugin/metrics is a bad name, we should rename it to plugin/metricstore

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@yurishkuro I can do that in a follow-up PR

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can also rename storage/metricsstore to storage/metricstore (single s)

"go.uber.org/zap"

"github.com/jaegertracing/jaeger/pkg/telemetry"
"github.com/jaegertracing/jaeger/plugin"
"github.com/jaegertracing/jaeger/storage/metricsstore"
)
Expand All @@ -30,7 +31,7 @@ func (*Factory) AddFlags(_ *flag.FlagSet) {}
func (*Factory) InitFromViper(_ *viper.Viper, _ *zap.Logger) {}

// Initialize implements storage.MetricsFactory.
func (*Factory) Initialize(_ *zap.Logger) error {
func (*Factory) Initialize(_ telemetry.Settings) error {
return nil
}

Expand Down
5 changes: 3 additions & 2 deletions plugin/metrics/disabled/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,17 @@ import (
"github.com/stretchr/testify/require"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/pkg/telemetry"
"github.com/jaegertracing/jaeger/storage"
)

var _ storage.MetricsFactory = new(Factory)

func TestPrometheusFactory(t *testing.T) {
f := NewFactory()
require.NoError(t, f.Initialize(zap.NewNop()))
require.NoError(t, f.Initialize(telemetry.NoopSettings()))

err := f.Initialize(nil)
err := f.Initialize(telemetry.NoopSettings())
require.NoError(t, err)

f.AddFlags(nil)
Expand Down
16 changes: 13 additions & 3 deletions plugin/metrics/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"github.com/spf13/viper"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/pkg/metrics"
"github.com/jaegertracing/jaeger/pkg/telemetry"
"github.com/jaegertracing/jaeger/plugin"
"github.com/jaegertracing/jaeger/plugin/metrics/disabled"
"github.com/jaegertracing/jaeger/plugin/metrics/prometheus"
Expand Down Expand Up @@ -63,9 +65,17 @@ func (*Factory) getFactoryOfType(factoryType string) (storage.MetricsFactory, er
}

// Initialize implements storage.MetricsFactory.
func (f *Factory) Initialize(logger *zap.Logger) error {
for _, factory := range f.factories {
factory.Initialize(logger)
func (f *Factory) Initialize(telset telemetry.Settings) error {
for kind, factory := range f.factories {
scopedTelset := telset
scopedTelset.Metrics = telset.Metrics.Namespace(metrics.NSOptions{
Name: "storage",
Tags: map[string]string{
"kind": kind,
mahadzaryab1 marked this conversation as resolved.
Show resolved Hide resolved
"role": "metricstore",
},
})
factory.Initialize(scopedTelset)
}
return nil
}
Expand Down
3 changes: 2 additions & 1 deletion plugin/metrics/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/stretchr/testify/require"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/pkg/telemetry"
"github.com/jaegertracing/jaeger/plugin/metrics/disabled"
"github.com/jaegertracing/jaeger/storage"
"github.com/jaegertracing/jaeger/storage/mocks"
Expand Down Expand Up @@ -53,7 +54,7 @@ func TestCreateMetricsReader(t *testing.T) {
require.NoError(t, err)
require.NotNil(t, f)

require.NoError(t, f.Initialize(zap.NewNop()))
require.NoError(t, f.Initialize(telemetry.NoopSettings()))

reader, err := f.CreateMetricsReader()
require.NoError(t, err)
Expand Down
24 changes: 14 additions & 10 deletions plugin/metrics/prometheus/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,29 +7,29 @@
"flag"

"github.com/spf13/viper"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/pkg/prometheus/config"
"github.com/jaegertracing/jaeger/pkg/telemetry"
"github.com/jaegertracing/jaeger/plugin"
prometheusstore "github.com/jaegertracing/jaeger/plugin/metrics/prometheus/metricsstore"
"github.com/jaegertracing/jaeger/storage/metricsstore"
"github.com/jaegertracing/jaeger/storage/metricsstore/metricstoremetrics"
)

var _ plugin.Configurable = (*Factory)(nil)

// Factory implements storage.Factory and creates storage components backed by memory store.
type Factory struct {
options *Options
logger *zap.Logger
tracer trace.TracerProvider
telset telemetry.Settings
}

// NewFactory creates a new Factory.
func NewFactory() *Factory {
telset := telemetry.NoopSettings()
return &Factory{
tracer: otel.GetTracerProvider(),
yurishkuro marked this conversation as resolved.
Show resolved Hide resolved
telset: telset,
options: NewOptions(),
}
}
Expand All @@ -47,19 +47,23 @@
}

// Initialize implements storage.MetricsFactory.
func (f *Factory) Initialize(logger *zap.Logger) error {
f.logger = logger
func (f *Factory) Initialize(telset telemetry.Settings) error {
f.telset = telset
return nil
}

// CreateMetricsReader implements storage.MetricsFactory.
func (f *Factory) CreateMetricsReader() (metricsstore.Reader, error) {
return prometheusstore.NewMetricsReader(f.options.Configuration, f.logger, f.tracer)
mr, err := prometheusstore.NewMetricsReader(f.options.Configuration, f.telset.Logger, f.telset.TracerProvider)
if err != nil {
return mr, err
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
return mr, err
return nil, err

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we add a test?

Copy link
Collaborator Author

@mahadzaryab1 mahadzaryab1 Dec 1, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@yurishkuro We'll need to dip into the implementation of prometheusstore.NewMetricsReader to force an error here. Is that fine? Also, since we're just decorating the reader, do we want to force returning a nil if there is an error? My thinking was that we just pass along whatever it is we get without decorating the reader.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can simulate the error easily by passing a TLS config with "foobar" for some of the certificates.

It is convention to return nil, err in case of errors. It's probably what you would get from the factory already, but when you return mr, err you are returning a typed nil, so the nil check may actually fail in the caller.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@yurishkuro sounds good! i'll also go back and do the same for the other factories in a follow-up PR

}

Check warning on line 60 in plugin/metrics/prometheus/factory.go

View check run for this annotation

Codecov / codecov/patch

plugin/metrics/prometheus/factory.go#L59-L60

Added lines #L59 - L60 were not covered by tests
return metricstoremetrics.NewReaderDecorator(mr, f.telset.Metrics), nil
}

func NewFactoryWithConfig(
cfg config.Configuration,
logger *zap.Logger,
telset telemetry.Settings,
) (*Factory, error) {
if err := cfg.Validate(); err != nil {
return nil, err
Expand All @@ -68,6 +72,6 @@
f.options = &Options{
Configuration: cfg,
}
f.Initialize(logger)
f.Initialize(telset)
return f, nil
}
9 changes: 5 additions & 4 deletions plugin/metrics/prometheus/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

"github.com/jaegertracing/jaeger/pkg/config"
promCfg "github.com/jaegertracing/jaeger/pkg/prometheus/config"
"github.com/jaegertracing/jaeger/pkg/telemetry"
"github.com/jaegertracing/jaeger/pkg/testutils"
"github.com/jaegertracing/jaeger/storage"
)
Expand All @@ -22,8 +23,8 @@ var _ storage.MetricsFactory = new(Factory)

func TestPrometheusFactory(t *testing.T) {
f := NewFactory()
require.NoError(t, f.Initialize(zap.NewNop()))
assert.NotNil(t, f.logger)
require.NoError(t, f.Initialize(telemetry.NoopSettings()))
assert.NotNil(t, f.telset)

listener, err := net.Listen("tcp", "localhost:")
require.NoError(t, err)
Expand Down Expand Up @@ -126,15 +127,15 @@ func TestFailedTLSOptions(t *testing.T) {

func TestEmptyFactoryConfig(t *testing.T) {
cfg := promCfg.Configuration{}
_, err := NewFactoryWithConfig(cfg, zap.NewNop())
_, err := NewFactoryWithConfig(cfg, telemetry.NoopSettings())
require.Error(t, err)
}

func TestFactoryConfig(t *testing.T) {
cfg := promCfg.Configuration{
ServerURL: "localhost:1234",
}
_, err := NewFactoryWithConfig(cfg, zap.NewNop())
_, err := NewFactoryWithConfig(cfg, telemetry.NoopSettings())
require.NoError(t, err)
}

Expand Down
Loading
Loading