Skip to content

Commit

Permalink
Added go template and rendering with tests
Browse files Browse the repository at this point in the history
Signed-off-by: Alok Kumar Singh <[email protected]>
  • Loading branch information
akstron committed Oct 25, 2024
1 parent 26962f9 commit bcad4c0
Show file tree
Hide file tree
Showing 8 changed files with 536 additions and 264 deletions.
12 changes: 12 additions & 0 deletions cmd/jaeger/config-cassandra.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,12 @@ extensions:
cassandra:
schema:
keyspace: "jaeger_v1_dc1"
datacenter: "test"
trace_ttl: 172800
dependencies_ttl: 172800
replication_factor: 1
cas_version: 4
compaction_window: "1m"
connection:
auth:
basic:
Expand All @@ -35,6 +41,12 @@ extensions:
cassandra:
schema:
keyspace: "jaeger_v1_dc1"
datacenter: "test"
trace_ttl: 172800
dependencies_ttl: 172800
replication_factor: 1
cas_version: 4
compaction_window: "1m"
connection:
auth:
basic:
Expand Down
2 changes: 1 addition & 1 deletion jaeger-ui
Submodule jaeger-ui updated 31 files
+3 −9 .github/workflows/Dockerfile
+4 −4 .github/workflows/codeql.yml
+8 −7 .github/workflows/lint-build.yml
+7 −7 .github/workflows/release.yml
+3 −3 .github/workflows/s390x-build.yaml
+4 −4 .github/workflows/scorecard.yml
+5 −5 .github/workflows/unit-tests.yml
+1 −1 .npmrc
+1 −2 .prettierignore
+10 −6 BUILD.md
+0 −10 CHANGELOG.md
+3 −3 CONTRIBUTING.md
+14 −12 README.md
+2 −5 RELEASE.md
+0 −22,092 package-lock.json
+23 −20 package.json
+8 −7 packages/jaeger-ui/package.json
+1 −1 packages/jaeger-ui/src/components/App/TopNav.css
+8,474 −8,183 packages/jaeger-ui/src/components/Monitor/ServicesView/operationDetailsTable/__snapshots__/index.test.js.snap
+1 −1 packages/jaeger-ui/src/components/Monitor/ServicesView/operationDetailsTable/index.css
+1 −1 packages/jaeger-ui/src/components/SearchTracePage/FileLoader.tsx
+1 −1 packages/jaeger-ui/src/components/SearchTracePage/SearchResults/ResultItemTitle.tsx
+1 −1 packages/jaeger-ui/src/components/SearchTracePage/__snapshots__/FileLoader.test.js.snap
+2 −2 packages/jaeger-ui/src/components/TracePage/index.tsx
+2 −2 packages/jaeger-ui/src/components/common/DetailsCard/__snapshots__/DetailList.test.js.snap
+1 −5 packages/jaeger-ui/src/types/index.tsx
+1 −1 packages/jaeger-ui/src/utils/tracking/README.md
+1 −1 packages/plexus/BUILD.md
+10 −10 packages/plexus/package.json
+5 −7 packages/plexus/src/DirectedGraph/builtins/Node.tsx
+11,070 −0 yarn.lock
16 changes: 15 additions & 1 deletion pkg/cassandra/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,26 @@ type Connection struct {
ProtoVersion int `mapstructure:"proto_version"`
}

type SchemaConfig struct {
Datacenter string `mapstructure:"datacenter" valid:"optional"`
TraceTTL int `mapstructure:"trace_ttl" valid:"optional"`
DependenciesTTL int `mapstructure:"dependencies_ttl" valid:"optional"`
ReplicationFactor int `mapstructure:"replication_factor" valid:"optional"`
CasVersion int `mapstructure:"cas_version" valid:"optional"`
CompactionWindow string `mapstructure:"compaction_window" valid:"optional"`
Replication string `mapstructure:"replication" valid:"optional"`
CompactionWindowSize int `mapstructure:"compaction_window_size" valid:"optional"`
CompactionWindowUnit string `mapstructure:"compaction_window_unit" valid:"optional"`
}

type Schema struct {
// Keyspace contains the namespace where Jaeger data will be stored.
Keyspace string `mapstructure:"keyspace"`
// DisableCompression, if set to true, disables the use of the default Snappy Compression
// while connecting to the Cassandra Cluster. This is useful for connecting to clusters, like Azure Cosmos DB,
// that do not support SnappyCompression.
DisableCompression bool `mapstructure:"disable_compression"`
SchemaConfig
}

type Query struct {
Expand Down Expand Up @@ -150,7 +163,8 @@ func (c *Configuration) NewSession() (cassandra.Session, error) {
// NewCluster creates a new gocql cluster from the configuration
func (c *Configuration) NewCluster() (*gocql.ClusterConfig, error) {
cluster := gocql.NewCluster(c.Connection.Servers...)
cluster.Keyspace = c.Schema.Keyspace
// Removing this, since keyspace would be created post builing connection
// cluster.Keyspace = c.Schema.Keyspace
cluster.NumConns = c.Connection.ConnectionsPerHost
cluster.ConnectTimeout = c.Connection.Timeout
cluster.ReconnectInterval = c.Connection.ReconnectInterval
Expand Down
267 changes: 5 additions & 262 deletions plugin/storage/cassandra/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,10 @@
package cassandra

import (
"bytes"
"context"
"embed"
"errors"
"flag"
"fmt"
"io"
"os"
"regexp"
"strconv"

"github.com/spf13/viper"
"go.opentelemetry.io/otel"
Expand All @@ -30,6 +24,7 @@ import (
cLock "github.com/jaegertracing/jaeger/plugin/pkg/distributedlock/cassandra"
cDepStore "github.com/jaegertracing/jaeger/plugin/storage/cassandra/dependencystore"
cSamplingStore "github.com/jaegertracing/jaeger/plugin/storage/cassandra/samplingstore"
"github.com/jaegertracing/jaeger/plugin/storage/cassandra/schema"
cSpanStore "github.com/jaegertracing/jaeger/plugin/storage/cassandra/spanstore"
"github.com/jaegertracing/jaeger/plugin/storage/cassandra/spanstore/dbmodel"
"github.com/jaegertracing/jaeger/storage"
Expand Down Expand Up @@ -137,260 +132,8 @@ func (f *Factory) configureFromOptions(o *Options) {
}
}

const (
MODE = `MODE`
DATACENTER = `DATACENTER`
TRACE_TTL = `TRACE_TTL`
DEPENDENCIES_TTL = `DEPENDENCIES_TTL`
KEYSPACE = `KEYSPACE`
REPLICATION_FACTOR = `REPLICATION_FACTOR`
VERSION = `VERSION`
COMPACTION_WINDOW = `COMPACTION_WINDOW`
)

// Parameters required for initilizing the db
type StorageConfigParams struct {
mode string
datacenter string
trace_ttl int
dependencies_ttl int
keyspace string
replication_factor int
replication string
cas_version int
compaction_window_size int
compaction_window_unit string
}

func constructStorageConfigParams() (*StorageConfigParams, error) {
var err error

datacenter := os.Getenv(DATACENTER)
replication_factor_string := os.Getenv(REPLICATION_FACTOR)

var replication_factor, compaction_window_size int
var replication, compaction_window_unit string

mode := os.Getenv(MODE)

if mode == "" {
return nil, fmt.Errorf("missing MODE parameter")
}

if mode != "test" || mode != "prod" {
return nil, fmt.Errorf("invalid MODE=%s, expecting 'prod' or 'test'", mode)
}

if mode == "test" {
if datacenter == "" {
datacenter = "test"
}

replication_factor = 1
if replication_factor_string != "" {
replication_factor, err = strconv.Atoi(replication_factor_string)
if err != nil {
return nil, err
}
}

replication = fmt.Sprintf("{'class': 'SimpleStrategy', 'replication_factor': '%v'}", replication_factor)
}

if mode == "prod" {
if datacenter == "" {
return nil, fmt.Errorf("missing DATACENTER parameter for prod mode")
}

replication_factor = 2
if replication_factor_string != "" {
replication_factor, err = strconv.Atoi(replication_factor_string)
if err != nil {
return nil, err
}
}

replication = fmt.Sprintf("{'class': 'NetworkTopologyStrategy', '%s': '%v' }", datacenter, replication_factor)
}

trace_ttl_string := os.Getenv(TRACE_TTL)
trace_ttl := 172800
if trace_ttl_string != "" {
trace_ttl, err = strconv.Atoi(trace_ttl_string)
if err != nil {
return nil, err
}
}

dependencies_ttl_string := os.Getenv(DEPENDENCIES_TTL)
dependencies_ttl := 0
if dependencies_ttl_string != "" {
dependencies_ttl, err = strconv.Atoi(dependencies_ttl_string)
if err != nil {
return nil, err
}
}

cas_version_string := os.Getenv(VERSION)
cas_version := 4
if cas_version_string != "" {
cas_version, err = strconv.Atoi(cas_version_string)
if err != nil {
return nil, err
}
}

keyspace := os.Getenv(KEYSPACE)
if keyspace == "" {
keyspace = fmt.Sprint("jaeger_v1_%s", datacenter)
}

var isMatch bool
isMatch, err = regexp.MatchString("[^a-zA-Z0-9_]", keyspace)
if err != nil {
return nil, err
}

if isMatch {
return nil, fmt.Errorf(`invalid characters in KEYSPACE=%s parameter, please use letters, digits or underscores`, keyspace)
}

if compaction_window := os.Getenv(COMPACTION_WINDOW); compaction_window != `` {
isMatch, err = regexp.MatchString("^[0-9]+[mhd]$", compaction_window)
if err != nil {
return nil, err
}

if !isMatch {
return nil, fmt.Errorf("Invalid compaction window size format. Please use numeric value followed by 'm' for minutes, 'h' for hours, or 'd' for days.")
}

compaction_window_size, err = strconv.Atoi(compaction_window[:len(compaction_window)-1])
if err != nil {
return nil, err
}

compaction_window_unit = compaction_window[len(compaction_window)-1:]
} else {
trace_ttl_minutes := trace_ttl / 60

compaction_window_size = (trace_ttl_minutes + 30 - 1) / 30
compaction_window_unit = "m"
}

switch compaction_window_unit {
case `m`:
compaction_window_unit = `MINUTES`
case `h`:
compaction_window_unit = `HOURS`
case `d`:
compaction_window_unit = `DAYS`
}

return &StorageConfigParams{
mode: mode,
datacenter: datacenter,
trace_ttl: trace_ttl,
dependencies_ttl: dependencies_ttl,
keyspace: keyspace,
replication_factor: replication_factor,
replication: replication,
cas_version: cas_version,
compaction_window_size: compaction_window_size,
compaction_window_unit: compaction_window_unit,
}, nil
}

// Embed all the template files in binaries

//go:embed schema/v001.cql.tmpl
//go:embed schema/v002.cql.tmpl
//go:embed schema/v003.cql.tmpl
//go:embed schema/v004.cql.tmpl
var schemaFile embed.FS

func handleTemplateReplacements(data []byte, params *StorageConfigParams) []byte {
templateKeysValuePairs := map[string]string{
`trace_ttl`: strconv.Itoa(params.trace_ttl),
`dependecies_ttl`: strconv.Itoa(params.dependencies_ttl),
`keyspace`: params.keyspace,
`replication_factor`: strconv.Itoa(params.replication_factor),
`replication`: params.replication,
`cas_version`: strconv.Itoa(params.cas_version),
`compaction_window_size`: strconv.Itoa(params.compaction_window_size),
`compaction_window_unit`: params.compaction_window_unit,
}

result := data
for key, value := range templateKeysValuePairs {
result = bytes.ReplaceAll(result, []byte(key), []byte(value))
}
return result
}

func constructQueriesFromTemplateFiles(session cassandra.Session, params *StorageConfigParams) ([]cassandra.Query, error) {
var queries []cassandra.Query

schemaFileName := fmt.Sprintf(`schema/v00%s.cql.tmpl`, strconv.Itoa(4))
schemaData, err := schemaFile.ReadFile(schemaFileName)
if err != nil {
return nil, err
}

lines := bytes.Split(schemaData, []byte("\n"))
var extractedLines [][]byte

for _, line := range lines {
// Remove any comments, if at the end of the line
commentIndex := bytes.LastIndex(line, []byte(`--`))
if commentIndex != -1 {
// remove everything after comment
line = line[0:commentIndex]
}

if len(line) == 0 {
continue
}

extractedLines = append(extractedLines, bytes.TrimSpace(handleTemplateReplacements(line, params)))
}

// Construct individual queries
var queryString string
for _, line := range extractedLines {
queryString += string(line)
if bytes.HasSuffix(line, []byte(";")) {
queries = append(queries, session.Query(queryString))
queryString = ""
}
}

if len(queryString) > 0 {
return nil, fmt.Errorf(`Invalid template`)
}

return queries, nil
}

func (f *Factory) InitializeDB(session cassandra.Session) error {
params, err := constructStorageConfigParams()
if err != nil {
return err
}

queries, err := constructQueriesFromTemplateFiles(session, params)
if err != nil {
return err
}

for _, query := range queries {
err := query.Exec()
if err != nil {
return err
}
}

return nil
func (f *Factory) initializeDB(session cassandra.Session, cfg *config.Schema) error {
return schema.GenerateSchemaIfNotPresent(session, cfg)
}

// Initialize implements storage.Factory
Expand All @@ -406,7 +149,7 @@ func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger)
f.primarySession = primarySession

// After creating a session, execute commands to initialize the setup if not already present
if err := f.InitializeDB(primarySession); err != nil {
if err := f.initializeDB(primarySession, &f.Options.Primary.Schema); err != nil {
return err
}

Expand All @@ -417,7 +160,7 @@ func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger)
}
f.archiveSession = archiveSession

if err := f.InitializeDB(archiveSession); err != nil {
if err := f.initializeDB(archiveSession, &f.Options.Primary.Schema); err != nil {
return err
}
} else {
Expand Down
Loading

0 comments on commit bcad4c0

Please sign in to comment.