Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[v2][storage] Move span reader decorator to storage factories #6280

Merged
merged 22 commits into from
Nov 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
a2ee10e
Add Storage Metrics To Memory Span Reader
mahadzaryab1 Nov 29, 2024
a156666
Add Storage Metrics To Badger Span Reader
mahadzaryab1 Nov 29, 2024
3584e8c
Add Storage Metrics To Cassandra Span Reader
mahadzaryab1 Nov 29, 2024
b25b893
Add Storage Metrics To ElasticSearch Span Reader
mahadzaryab1 Nov 29, 2024
d7d2bc9
Add Storage Metrics To GRPC Span Reader
mahadzaryab1 Nov 29, 2024
b63af21
Remove Decorator From Query Extension
mahadzaryab1 Nov 29, 2024
294c5d0
Namespace Metrics Factory During Initialization In Meta Factory
mahadzaryab1 Nov 29, 2024
f8192a0
Remove Span Reader Decorator In All In One And Query
mahadzaryab1 Nov 29, 2024
6f678c5
Address Feedback From PR Review
mahadzaryab1 Nov 29, 2024
6ac1aea
Standardize Primary And Archive Metrics In Cassandra Factory
mahadzaryab1 Nov 30, 2024
6744399
Standardize Metrics In ES Storage
mahadzaryab1 Nov 30, 2024
0fd8360
Standardize Metrics In Memory Storage
mahadzaryab1 Nov 30, 2024
f7547da
Standardize Metrics In GRPC Storage
mahadzaryab1 Nov 30, 2024
214aac0
Remove Unused Metrics Factory From ES Reader
mahadzaryab1 Nov 30, 2024
e6e8704
Move Metrics Assignments To Initialize
mahadzaryab1 Nov 30, 2024
83aa823
Move Metrics Assignments To Initialize
mahadzaryab1 Nov 30, 2024
46ab305
Fix Test Assertions
mahadzaryab1 Nov 30, 2024
343c90d
Use Different Factory For Sampling
mahadzaryab1 Nov 30, 2024
b9b5972
Use Base Factory For Dep Reader
mahadzaryab1 Nov 30, 2024
a5a673f
Use Primary Factory For Dep Reader
mahadzaryab1 Nov 30, 2024
6d9e80b
Merge branch 'main' into storage-factory-metrics
yurishkuro Nov 30, 2024
d0319f1
Merge branch 'main' into storage-factory-metrics
yurishkuro Nov 30, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions cmd/all-in-one/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ import (
"github.com/jaegertracing/jaeger/storage/dependencystore"
"github.com/jaegertracing/jaeger/storage/metricsstore/metricstoremetrics"
"github.com/jaegertracing/jaeger/storage/spanstore"
"github.com/jaegertracing/jaeger/storage/spanstore/spanstoremetrics"
)

// all-in-one/main is a standalone full-stack jaeger backend, backed by a memory store
Expand Down Expand Up @@ -223,7 +222,6 @@ func startQuery(
tm *tenancy.Manager,
telset telemetry.Settings,
) *queryApp.Server {
spanReader = spanstoremetrics.NewReaderDecorator(spanReader, telset.Metrics)
qs := querysvc.NewQueryService(spanReader, depReader, *queryOpts)

server, err := queryApp.NewServer(context.Background(), qs, metricsQueryService, qOpts, tm, telset)
Expand Down
3 changes: 0 additions & 3 deletions cmd/jaeger/internal/extension/jaegerquery/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"github.com/jaegertracing/jaeger/plugin/metrics/disabled"
"github.com/jaegertracing/jaeger/storage/metricsstore"
"github.com/jaegertracing/jaeger/storage/metricsstore/metricstoremetrics"
"github.com/jaegertracing/jaeger/storage/spanstore/spanstoremetrics"
)

var (
Expand Down Expand Up @@ -85,8 +84,6 @@ func (s *server) Start(ctx context.Context, host component.Host) error {
return fmt.Errorf("cannot create span reader: %w", err)
}

spanReader = spanstoremetrics.NewReaderDecorator(spanReader, telset.Metrics)

depReader, err := f.CreateDependencyReader()
if err != nil {
return fmt.Errorf("cannot create dependencies reader: %w", err)
Expand Down
42 changes: 36 additions & 6 deletions cmd/jaeger/internal/extension/jaegerstorage/extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,24 +117,54 @@ func newStorageExt(config *Config, telset component.TelemetrySettings) *storageE
func (s *storageExt) Start(_ context.Context, host component.Host) error {
telset := telemetry.FromOtelComponent(s.telset, host)
telset.Metrics = telset.Metrics.Namespace(metrics.NSOptions{Name: "jaeger"})
getMetricsFactory := func(name, kind string) metrics.Factory {
return telset.Metrics.Namespace(metrics.NSOptions{
Name: "storage",
Tags: map[string]string{
"name": name,
"kind": kind,
},
})
}
for storageName, cfg := range s.config.TraceBackends {
s.telset.Logger.Sugar().Infof("Initializing storage '%s'", storageName)
var factory storage.Factory
var err error = errors.New("empty configuration")
switch {
case cfg.Memory != nil:
factory, err = memory.NewFactoryWithConfig(*cfg.Memory, telset.Metrics, s.telset.Logger), nil
factory, err = memory.NewFactoryWithConfig(
*cfg.Memory,
getMetricsFactory(storageName, "memory"),
s.telset.Logger,
), nil
case cfg.Badger != nil:
factory, err = badger.NewFactoryWithConfig(*cfg.Badger, telset.Metrics, s.telset.Logger)
factory, err = badger.NewFactoryWithConfig(
*cfg.Badger,
getMetricsFactory(storageName, "badger"),
s.telset.Logger)
case cfg.GRPC != nil:
grpcTelset := telset
grpcTelset.Metrics = getMetricsFactory(storageName, "grpc")
//nolint: contextcheck
factory, err = grpc.NewFactoryWithConfig(*cfg.GRPC, telset)
factory, err = grpc.NewFactoryWithConfig(*cfg.GRPC, grpcTelset)
case cfg.Cassandra != nil:
factory, err = cassandra.NewFactoryWithConfig(*cfg.Cassandra, telset.Metrics, s.telset.Logger)
factory, err = cassandra.NewFactoryWithConfig(
*cfg.Cassandra,
getMetricsFactory(storageName, "cassandra"),
s.telset.Logger,
)
case cfg.Elasticsearch != nil:
factory, err = es.NewFactoryWithConfig(*cfg.Elasticsearch, telset.Metrics, s.telset.Logger)
factory, err = es.NewFactoryWithConfig(
*cfg.Elasticsearch,
getMetricsFactory(storageName, "elasticsearch"),
s.telset.Logger,
)
case cfg.Opensearch != nil:
factory, err = es.NewFactoryWithConfig(*cfg.Opensearch, telset.Metrics, s.telset.Logger)
factory, err = es.NewFactoryWithConfig(
*cfg.Opensearch,
getMetricsFactory(storageName, "opensearch"),
s.telset.Logger,
)
}
if err != nil {
return fmt.Errorf("failed to initialize storage '%s': %w", storageName, err)
Expand Down
2 changes: 0 additions & 2 deletions cmd/query/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import (
"github.com/jaegertracing/jaeger/plugin/storage"
"github.com/jaegertracing/jaeger/ports"
"github.com/jaegertracing/jaeger/storage/metricsstore/metricstoremetrics"
"github.com/jaegertracing/jaeger/storage/spanstore/spanstoremetrics"
)

func main() {
Expand Down Expand Up @@ -95,7 +94,6 @@ func main() {
if err != nil {
logger.Fatal("Failed to create span reader", zap.Error(err))
}
spanReader = spanstoremetrics.NewReaderDecorator(spanReader, metricsFactory)
dependencyReader, err := storageFactory.CreateDependencyReader()
if err != nil {
logger.Fatal("Failed to create dependency reader", zap.Error(err))
Expand Down
14 changes: 9 additions & 5 deletions plugin/storage/badger/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/jaegertracing/jaeger/storage/dependencystore"
"github.com/jaegertracing/jaeger/storage/samplingstore"
"github.com/jaegertracing/jaeger/storage/spanstore"
"github.com/jaegertracing/jaeger/storage/spanstore/spanstoremetrics"
)

const (
Expand All @@ -50,10 +51,11 @@ var ( // interface comformance checks

// Factory implements storage.Factory for Badger backend.
type Factory struct {
Config *Config
store *badger.DB
cache *badgerStore.CacheStore
logger *zap.Logger
Config *Config
store *badger.DB
cache *badgerStore.CacheStore
logger *zap.Logger
metricsFactory metrics.Factory

tmpDir string
maintenanceDone chan bool
Expand Down Expand Up @@ -115,6 +117,7 @@ func (f *Factory) configure(config *Config) {
// Initialize implements storage.Factory
func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger) error {
f.logger = logger
f.metricsFactory = metricsFactory

opts := badger.DefaultOptions("")

Expand Down Expand Up @@ -173,7 +176,8 @@ func initializeDir(path string) {

// CreateSpanReader implements storage.Factory
func (f *Factory) CreateSpanReader() (spanstore.Reader, error) {
return badgerStore.NewTraceReader(f.store, f.cache), nil
tr := badgerStore.NewTraceReader(f.store, f.cache)
return spanstoremetrics.NewReaderDecorator(tr, f.metricsFactory), nil
}

// CreateSpanWriter implements storage.Factory
Expand Down
40 changes: 35 additions & 5 deletions plugin/storage/cassandra/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
"github.com/jaegertracing/jaeger/storage/dependencystore"
"github.com/jaegertracing/jaeger/storage/samplingstore"
"github.com/jaegertracing/jaeger/storage/spanstore"
"github.com/jaegertracing/jaeger/storage/spanstore/spanstoremetrics"
)

const (
Expand All @@ -52,6 +53,7 @@
type Factory struct {
Options *Options

metricsFactory metrics.Factory
primaryMetricsFactory metrics.Factory
archiveMetricsFactory metrics.Factory
logger *zap.Logger
Expand Down Expand Up @@ -138,8 +140,21 @@

// Initialize implements storage.Factory
func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger) error {
f.primaryMetricsFactory = metricsFactory.Namespace(metrics.NSOptions{Name: "cassandra", Tags: nil})
f.archiveMetricsFactory = metricsFactory.Namespace(metrics.NSOptions{Name: "cassandra-archive", Tags: nil})
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not keep these two here? Otherwise you're duplicating namespace assignments twice, which means they can get out of sync.

Copy link
Collaborator Author

@mahadzaryab1 mahadzaryab1 Nov 30, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@yurishkuro Done. However, CreateSamplingStore (https://github.com/jaegertracing/jaeger/blob/main/plugin/storage/cassandra/factory.go#L211) and CreateDependencyReader (https://github.com/jaegertracing/jaeger/blob/main/plugin/storage/cassandra/factory.go#L175) will now have the role=primary attached to the metrics they emit. Is this fine? or should they just be emitting metrics under the namespace passed into the storage factory without the role tag?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think sampling store should be kind=cassandra, role=sampling

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@yurishkuro Done. What about the dependency reader? I'm using the base factory for now.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what would it look like? Dependencies technically were always bundled within the spanstore, not a different "role".

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the metrics would be published under jaeger_storage_*** with kind=cassandra but no role tag.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that's fine

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

although since it's using the primary connection / session I would pass it the primary metrics

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

f.metricsFactory = metricsFactory
f.primaryMetricsFactory = metricsFactory.Namespace(
metrics.NSOptions{
Tags: map[string]string{
"role": "primary",
},
},
)
f.archiveMetricsFactory = metricsFactory.Namespace(
metrics.NSOptions{
Tags: map[string]string{
"role": "archive",
},
},
)
f.logger = logger

primarySession, err := f.sessionBuilderFn(&f.primaryConfig)
Expand Down Expand Up @@ -204,7 +219,11 @@

// CreateSpanReader implements storage.Factory
func (f *Factory) CreateSpanReader() (spanstore.Reader, error) {
return cSpanStore.NewSpanReader(f.primarySession, f.primaryMetricsFactory, f.logger, f.tracer.Tracer("cSpanStore.SpanReader"))
sr, err := cSpanStore.NewSpanReader(f.primarySession, f.primaryMetricsFactory, f.logger, f.tracer.Tracer("cSpanStore.SpanReader"))
if err != nil {
return sr, err
}

Check warning on line 225 in plugin/storage/cassandra/factory.go

View check run for this annotation

Codecov / codecov/patch

plugin/storage/cassandra/factory.go#L224-L225

Added lines #L224 - L225 were not covered by tests
return spanstoremetrics.NewReaderDecorator(sr, f.primaryMetricsFactory), nil
}

// CreateSpanWriter implements storage.Factory
Expand All @@ -227,7 +246,11 @@
if f.archiveSession == nil {
return nil, storage.ErrArchiveStorageNotConfigured
}
return cSpanStore.NewSpanReader(f.archiveSession, f.archiveMetricsFactory, f.logger, f.tracer.Tracer("cSpanStore.SpanReader"))
sr, err := cSpanStore.NewSpanReader(f.archiveSession, f.archiveMetricsFactory, f.logger, f.tracer.Tracer("cSpanStore.SpanReader"))
if err != nil {
return sr, err
}

Check warning on line 252 in plugin/storage/cassandra/factory.go

View check run for this annotation

Codecov / codecov/patch

plugin/storage/cassandra/factory.go#L251-L252

Added lines #L251 - L252 were not covered by tests
return spanstoremetrics.NewReaderDecorator(sr, f.archiveMetricsFactory), nil
}

// CreateArchiveSpanWriter implements storage.ArchiveFactory
Expand Down Expand Up @@ -255,7 +278,14 @@

// CreateSamplingStore implements storage.SamplingStoreFactory
func (f *Factory) CreateSamplingStore(int /* maxBuckets */) (samplingstore.Store, error) {
return cSamplingStore.New(f.primarySession, f.primaryMetricsFactory, f.logger), nil
samplingMetricsFactory := f.metricsFactory.Namespace(
metrics.NSOptions{
Tags: map[string]string{
"role": "sampling",
},
},
)
return cSamplingStore.New(f.primarySession, samplingMetricsFactory, f.logger), nil
}

func writerOptions(opts *Options) ([]cSpanStore.Option, error) {
Expand Down
50 changes: 36 additions & 14 deletions plugin/storage/es/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
"github.com/jaegertracing/jaeger/storage/dependencystore"
"github.com/jaegertracing/jaeger/storage/samplingstore"
"github.com/jaegertracing/jaeger/storage/spanstore"
"github.com/jaegertracing/jaeger/storage/spanstore/spanstoremetrics"
)

const (
Expand All @@ -52,9 +53,10 @@
type Factory struct {
Options *Options

metricsFactory metrics.Factory
logger *zap.Logger
tracer trace.TracerProvider
primaryMetricsFactory metrics.Factory
archiveMetricsFactory metrics.Factory
logger *zap.Logger
tracer trace.TracerProvider

newClientFn func(c *config.Configuration, logger *zap.Logger, metricsFactory metrics.Factory) (es.Client, error)

Expand Down Expand Up @@ -129,7 +131,21 @@

// Initialize implements storage.Factory.
func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger) error {
f.metricsFactory, f.logger = metricsFactory, logger
f.primaryMetricsFactory = metricsFactory.Namespace(
metrics.NSOptions{
Tags: map[string]string{
"role": "primary",
},
},
)
f.archiveMetricsFactory = metricsFactory.Namespace(
metrics.NSOptions{
Tags: map[string]string{
"role": "archive",
},
},
)
f.logger = logger

primaryClient, err := f.newClientFn(f.primaryConfig, logger, metricsFactory)
if err != nil {
Expand Down Expand Up @@ -180,12 +196,16 @@

// CreateSpanReader implements storage.Factory
func (f *Factory) CreateSpanReader() (spanstore.Reader, error) {
return createSpanReader(f.getPrimaryClient, f.primaryConfig, false, f.metricsFactory, f.logger, f.tracer)
sr, err := createSpanReader(f.getPrimaryClient, f.primaryConfig, false, f.logger, f.tracer)
if err != nil {
return sr, err
}
return spanstoremetrics.NewReaderDecorator(sr, f.primaryMetricsFactory), nil
}

// CreateSpanWriter implements storage.Factory
func (f *Factory) CreateSpanWriter() (spanstore.Writer, error) {
return createSpanWriter(f.getPrimaryClient, f.primaryConfig, false, f.metricsFactory, f.logger)
return createSpanWriter(f.getPrimaryClient, f.primaryConfig, false, f.primaryMetricsFactory, f.logger)
}

// CreateDependencyReader implements storage.Factory
Expand All @@ -198,22 +218,25 @@
if !f.archiveConfig.Enabled {
return nil, nil
}
return createSpanReader(f.getArchiveClient, f.archiveConfig, true, f.metricsFactory, f.logger, f.tracer)
sr, err := createSpanReader(f.getArchiveClient, f.archiveConfig, true, f.logger, f.tracer)
if err != nil {
return sr, err
}

Check warning on line 224 in plugin/storage/es/factory.go

View check run for this annotation

Codecov / codecov/patch

plugin/storage/es/factory.go#L223-L224

Added lines #L223 - L224 were not covered by tests
return spanstoremetrics.NewReaderDecorator(sr, f.archiveMetricsFactory), nil
}

// CreateArchiveSpanWriter implements storage.ArchiveFactory
func (f *Factory) CreateArchiveSpanWriter() (spanstore.Writer, error) {
if !f.archiveConfig.Enabled {
return nil, nil
}
return createSpanWriter(f.getArchiveClient, f.archiveConfig, true, f.metricsFactory, f.logger)
return createSpanWriter(f.getArchiveClient, f.archiveConfig, true, f.archiveMetricsFactory, f.logger)
}

func createSpanReader(
clientFn func() es.Client,
cfg *config.Configuration,
archive bool,
mFactory metrics.Factory,
logger *zap.Logger,
tp trace.TracerProvider,
) (spanstore.Reader, error) {
Expand All @@ -232,7 +255,6 @@
Archive: archive,
RemoteReadClusters: cfg.RemoteReadClusters,
Logger: logger,
MetricsFactory: mFactory,
Tracer: tp.Tracer("esSpanStore.SpanReader"),
}), nil
}
Expand Down Expand Up @@ -352,14 +374,14 @@
}

func (f *Factory) onPrimaryPasswordChange() {
f.onClientPasswordChange(f.primaryConfig, &f.primaryClient)
f.onClientPasswordChange(f.primaryConfig, &f.primaryClient, f.primaryMetricsFactory)
}

func (f *Factory) onArchivePasswordChange() {
f.onClientPasswordChange(f.archiveConfig, &f.archiveClient)
f.onClientPasswordChange(f.archiveConfig, &f.archiveClient, f.archiveMetricsFactory)
}

func (f *Factory) onClientPasswordChange(cfg *config.Configuration, client *atomic.Pointer[es.Client]) {
func (f *Factory) onClientPasswordChange(cfg *config.Configuration, client *atomic.Pointer[es.Client], mf metrics.Factory) {
newPassword, err := loadTokenFromFile(cfg.Authentication.BasicAuthentication.PasswordFilePath)
if err != nil {
f.logger.Error("failed to reload password for Elasticsearch client", zap.Error(err))
Expand All @@ -370,7 +392,7 @@
newCfg.Authentication.BasicAuthentication.Password = newPassword
newCfg.Authentication.BasicAuthentication.PasswordFilePath = "" // avoid error that both are set

newClient, err := f.newClientFn(&newCfg, f.logger, f.metricsFactory)
newClient, err := f.newClientFn(&newCfg, f.logger, mf)
if err != nil {
f.logger.Error("failed to recreate Elasticsearch client with new password", zap.Error(err))
return
Expand Down
2 changes: 0 additions & 2 deletions plugin/storage/es/spanstore/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/pkg/es"
cfg "github.com/jaegertracing/jaeger/pkg/es/config"
"github.com/jaegertracing/jaeger/pkg/metrics"
"github.com/jaegertracing/jaeger/plugin/storage/es/spanstore/dbmodel"
"github.com/jaegertracing/jaeger/storage/spanstore"
)
Expand Down Expand Up @@ -113,7 +112,6 @@ type SpanReaderParams struct {
Archive bool
UseReadWriteAliases bool
RemoteReadClusters []string
MetricsFactory metrics.Factory
Logger *zap.Logger
Tracer trace.Tracer
}
Expand Down
Loading
Loading