From bcad4c0ad7f580d807d43ea33a191cc7e9359de3 Mon Sep 17 00:00:00 2001 From: Alok Kumar Singh Date: Fri, 25 Oct 2024 19:02:54 +0530 Subject: [PATCH] Added go template and rendering with tests Signed-off-by: Alok Kumar Singh --- cmd/jaeger/config-cassandra.yaml | 12 + jaeger-ui | 2 +- pkg/cassandra/config/config.go | 16 +- plugin/storage/cassandra/factory.go | 267 +----------------- plugin/storage/cassandra/schema/schema.go | 200 +++++++++++++ .../storage/cassandra/schema/schema_test.go | 19 ++ .../schema/v004-go-tmpl-test.cql.tmpl | 84 ++++++ .../cassandra/schema/v004-go-tmpl.cql.tmpl | 200 +++++++++++++ 8 files changed, 536 insertions(+), 264 deletions(-) create mode 100644 plugin/storage/cassandra/schema/schema.go create mode 100644 plugin/storage/cassandra/schema/schema_test.go create mode 100644 plugin/storage/cassandra/schema/v004-go-tmpl-test.cql.tmpl create mode 100644 plugin/storage/cassandra/schema/v004-go-tmpl.cql.tmpl diff --git a/cmd/jaeger/config-cassandra.yaml b/cmd/jaeger/config-cassandra.yaml index 0076fa48fd0..582e870de12 100644 --- a/cmd/jaeger/config-cassandra.yaml +++ b/cmd/jaeger/config-cassandra.yaml @@ -24,6 +24,12 @@ extensions: cassandra: schema: keyspace: "jaeger_v1_dc1" + datacenter: "test" + trace_ttl: 172800 + dependencies_ttl: 172800 + replication_factor: 1 + cas_version: 4 + compaction_window: "1m" connection: auth: basic: @@ -35,6 +41,12 @@ extensions: cassandra: schema: keyspace: "jaeger_v1_dc1" + datacenter: "test" + trace_ttl: 172800 + dependencies_ttl: 172800 + replication_factor: 1 + cas_version: 4 + compaction_window: "1m" connection: auth: basic: diff --git a/jaeger-ui b/jaeger-ui index 6dbddd02cc4..3b093f81dec 160000 --- a/jaeger-ui +++ b/jaeger-ui @@ -1 +1 @@ -Subproject commit 6dbddd02cc4ead13d3ab412aad90d5caa0a8907a +Subproject commit 3b093f81dec59c6bb04daad96b1b77bd03a29e4a diff --git a/pkg/cassandra/config/config.go b/pkg/cassandra/config/config.go index 6bab9c75da4..23fa3b0767a 100644 --- a/pkg/cassandra/config/config.go +++ b/pkg/cassandra/config/config.go @@ -51,6 +51,18 @@ type Connection struct { ProtoVersion int `mapstructure:"proto_version"` } +type SchemaConfig struct { + Datacenter string `mapstructure:"datacenter" valid:"optional"` + TraceTTL int `mapstructure:"trace_ttl" valid:"optional"` + DependenciesTTL int `mapstructure:"dependencies_ttl" valid:"optional"` + ReplicationFactor int `mapstructure:"replication_factor" valid:"optional"` + CasVersion int `mapstructure:"cas_version" valid:"optional"` + CompactionWindow string `mapstructure:"compaction_window" valid:"optional"` + Replication string `mapstructure:"replication" valid:"optional"` + CompactionWindowSize int `mapstructure:"compaction_window_size" valid:"optional"` + CompactionWindowUnit string `mapstructure:"compaction_window_unit" valid:"optional"` +} + type Schema struct { // Keyspace contains the namespace where Jaeger data will be stored. Keyspace string `mapstructure:"keyspace"` @@ -58,6 +70,7 @@ type Schema struct { // while connecting to the Cassandra Cluster. This is useful for connecting to clusters, like Azure Cosmos DB, // that do not support SnappyCompression. DisableCompression bool `mapstructure:"disable_compression"` + SchemaConfig } type Query struct { @@ -150,7 +163,8 @@ func (c *Configuration) NewSession() (cassandra.Session, error) { // NewCluster creates a new gocql cluster from the configuration func (c *Configuration) NewCluster() (*gocql.ClusterConfig, error) { cluster := gocql.NewCluster(c.Connection.Servers...) - cluster.Keyspace = c.Schema.Keyspace + // Removing this, since keyspace would be created post builing connection + // cluster.Keyspace = c.Schema.Keyspace cluster.NumConns = c.Connection.ConnectionsPerHost cluster.ConnectTimeout = c.Connection.Timeout cluster.ReconnectInterval = c.Connection.ReconnectInterval diff --git a/plugin/storage/cassandra/factory.go b/plugin/storage/cassandra/factory.go index fa8e0cfe337..fca10dcc2bc 100644 --- a/plugin/storage/cassandra/factory.go +++ b/plugin/storage/cassandra/factory.go @@ -5,16 +5,10 @@ package cassandra import ( - "bytes" "context" - "embed" "errors" "flag" - "fmt" "io" - "os" - "regexp" - "strconv" "github.com/spf13/viper" "go.opentelemetry.io/otel" @@ -30,6 +24,7 @@ import ( cLock "github.com/jaegertracing/jaeger/plugin/pkg/distributedlock/cassandra" cDepStore "github.com/jaegertracing/jaeger/plugin/storage/cassandra/dependencystore" cSamplingStore "github.com/jaegertracing/jaeger/plugin/storage/cassandra/samplingstore" + "github.com/jaegertracing/jaeger/plugin/storage/cassandra/schema" cSpanStore "github.com/jaegertracing/jaeger/plugin/storage/cassandra/spanstore" "github.com/jaegertracing/jaeger/plugin/storage/cassandra/spanstore/dbmodel" "github.com/jaegertracing/jaeger/storage" @@ -137,260 +132,8 @@ func (f *Factory) configureFromOptions(o *Options) { } } -const ( - MODE = `MODE` - DATACENTER = `DATACENTER` - TRACE_TTL = `TRACE_TTL` - DEPENDENCIES_TTL = `DEPENDENCIES_TTL` - KEYSPACE = `KEYSPACE` - REPLICATION_FACTOR = `REPLICATION_FACTOR` - VERSION = `VERSION` - COMPACTION_WINDOW = `COMPACTION_WINDOW` -) - -// Parameters required for initilizing the db -type StorageConfigParams struct { - mode string - datacenter string - trace_ttl int - dependencies_ttl int - keyspace string - replication_factor int - replication string - cas_version int - compaction_window_size int - compaction_window_unit string -} - -func constructStorageConfigParams() (*StorageConfigParams, error) { - var err error - - datacenter := os.Getenv(DATACENTER) - replication_factor_string := os.Getenv(REPLICATION_FACTOR) - - var replication_factor, compaction_window_size int - var replication, compaction_window_unit string - - mode := os.Getenv(MODE) - - if mode == "" { - return nil, fmt.Errorf("missing MODE parameter") - } - - if mode != "test" || mode != "prod" { - return nil, fmt.Errorf("invalid MODE=%s, expecting 'prod' or 'test'", mode) - } - - if mode == "test" { - if datacenter == "" { - datacenter = "test" - } - - replication_factor = 1 - if replication_factor_string != "" { - replication_factor, err = strconv.Atoi(replication_factor_string) - if err != nil { - return nil, err - } - } - - replication = fmt.Sprintf("{'class': 'SimpleStrategy', 'replication_factor': '%v'}", replication_factor) - } - - if mode == "prod" { - if datacenter == "" { - return nil, fmt.Errorf("missing DATACENTER parameter for prod mode") - } - - replication_factor = 2 - if replication_factor_string != "" { - replication_factor, err = strconv.Atoi(replication_factor_string) - if err != nil { - return nil, err - } - } - - replication = fmt.Sprintf("{'class': 'NetworkTopologyStrategy', '%s': '%v' }", datacenter, replication_factor) - } - - trace_ttl_string := os.Getenv(TRACE_TTL) - trace_ttl := 172800 - if trace_ttl_string != "" { - trace_ttl, err = strconv.Atoi(trace_ttl_string) - if err != nil { - return nil, err - } - } - - dependencies_ttl_string := os.Getenv(DEPENDENCIES_TTL) - dependencies_ttl := 0 - if dependencies_ttl_string != "" { - dependencies_ttl, err = strconv.Atoi(dependencies_ttl_string) - if err != nil { - return nil, err - } - } - - cas_version_string := os.Getenv(VERSION) - cas_version := 4 - if cas_version_string != "" { - cas_version, err = strconv.Atoi(cas_version_string) - if err != nil { - return nil, err - } - } - - keyspace := os.Getenv(KEYSPACE) - if keyspace == "" { - keyspace = fmt.Sprint("jaeger_v1_%s", datacenter) - } - - var isMatch bool - isMatch, err = regexp.MatchString("[^a-zA-Z0-9_]", keyspace) - if err != nil { - return nil, err - } - - if isMatch { - return nil, fmt.Errorf(`invalid characters in KEYSPACE=%s parameter, please use letters, digits or underscores`, keyspace) - } - - if compaction_window := os.Getenv(COMPACTION_WINDOW); compaction_window != `` { - isMatch, err = regexp.MatchString("^[0-9]+[mhd]$", compaction_window) - if err != nil { - return nil, err - } - - if !isMatch { - return nil, fmt.Errorf("Invalid compaction window size format. Please use numeric value followed by 'm' for minutes, 'h' for hours, or 'd' for days.") - } - - compaction_window_size, err = strconv.Atoi(compaction_window[:len(compaction_window)-1]) - if err != nil { - return nil, err - } - - compaction_window_unit = compaction_window[len(compaction_window)-1:] - } else { - trace_ttl_minutes := trace_ttl / 60 - - compaction_window_size = (trace_ttl_minutes + 30 - 1) / 30 - compaction_window_unit = "m" - } - - switch compaction_window_unit { - case `m`: - compaction_window_unit = `MINUTES` - case `h`: - compaction_window_unit = `HOURS` - case `d`: - compaction_window_unit = `DAYS` - } - - return &StorageConfigParams{ - mode: mode, - datacenter: datacenter, - trace_ttl: trace_ttl, - dependencies_ttl: dependencies_ttl, - keyspace: keyspace, - replication_factor: replication_factor, - replication: replication, - cas_version: cas_version, - compaction_window_size: compaction_window_size, - compaction_window_unit: compaction_window_unit, - }, nil -} - -// Embed all the template files in binaries - -//go:embed schema/v001.cql.tmpl -//go:embed schema/v002.cql.tmpl -//go:embed schema/v003.cql.tmpl -//go:embed schema/v004.cql.tmpl -var schemaFile embed.FS - -func handleTemplateReplacements(data []byte, params *StorageConfigParams) []byte { - templateKeysValuePairs := map[string]string{ - `trace_ttl`: strconv.Itoa(params.trace_ttl), - `dependecies_ttl`: strconv.Itoa(params.dependencies_ttl), - `keyspace`: params.keyspace, - `replication_factor`: strconv.Itoa(params.replication_factor), - `replication`: params.replication, - `cas_version`: strconv.Itoa(params.cas_version), - `compaction_window_size`: strconv.Itoa(params.compaction_window_size), - `compaction_window_unit`: params.compaction_window_unit, - } - - result := data - for key, value := range templateKeysValuePairs { - result = bytes.ReplaceAll(result, []byte(key), []byte(value)) - } - return result -} - -func constructQueriesFromTemplateFiles(session cassandra.Session, params *StorageConfigParams) ([]cassandra.Query, error) { - var queries []cassandra.Query - - schemaFileName := fmt.Sprintf(`schema/v00%s.cql.tmpl`, strconv.Itoa(4)) - schemaData, err := schemaFile.ReadFile(schemaFileName) - if err != nil { - return nil, err - } - - lines := bytes.Split(schemaData, []byte("\n")) - var extractedLines [][]byte - - for _, line := range lines { - // Remove any comments, if at the end of the line - commentIndex := bytes.LastIndex(line, []byte(`--`)) - if commentIndex != -1 { - // remove everything after comment - line = line[0:commentIndex] - } - - if len(line) == 0 { - continue - } - - extractedLines = append(extractedLines, bytes.TrimSpace(handleTemplateReplacements(line, params))) - } - - // Construct individual queries - var queryString string - for _, line := range extractedLines { - queryString += string(line) - if bytes.HasSuffix(line, []byte(";")) { - queries = append(queries, session.Query(queryString)) - queryString = "" - } - } - - if len(queryString) > 0 { - return nil, fmt.Errorf(`Invalid template`) - } - - return queries, nil -} - -func (f *Factory) InitializeDB(session cassandra.Session) error { - params, err := constructStorageConfigParams() - if err != nil { - return err - } - - queries, err := constructQueriesFromTemplateFiles(session, params) - if err != nil { - return err - } - - for _, query := range queries { - err := query.Exec() - if err != nil { - return err - } - } - - return nil +func (f *Factory) initializeDB(session cassandra.Session, cfg *config.Schema) error { + return schema.GenerateSchemaIfNotPresent(session, cfg) } // Initialize implements storage.Factory @@ -406,7 +149,7 @@ func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger) f.primarySession = primarySession // After creating a session, execute commands to initialize the setup if not already present - if err := f.InitializeDB(primarySession); err != nil { + if err := f.initializeDB(primarySession, &f.Options.Primary.Schema); err != nil { return err } @@ -417,7 +160,7 @@ func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger) } f.archiveSession = archiveSession - if err := f.InitializeDB(archiveSession); err != nil { + if err := f.initializeDB(archiveSession, &f.Options.Primary.Schema); err != nil { return err } } else { diff --git a/plugin/storage/cassandra/schema/schema.go b/plugin/storage/cassandra/schema/schema.go new file mode 100644 index 00000000000..7f002dd5ca6 --- /dev/null +++ b/plugin/storage/cassandra/schema/schema.go @@ -0,0 +1,200 @@ +package schema + +import ( + "bytes" + "embed" + "fmt" + "regexp" + "strconv" + "text/template" + + "github.com/jaegertracing/jaeger/pkg/cassandra" + "github.com/jaegertracing/jaeger/pkg/cassandra/config" +) + +//go:embed v004-go-tmpl.cql.tmpl +//go:embed v004-go-tmpl-test.cql.tmpl +var schemaFile embed.FS + +func DefaultSchemaConfig() config.Schema { + return config.Schema{ + Keyspace: "jaeger_v2_test", + SchemaConfig: config.SchemaConfig{ + Datacenter: "test", + TraceTTL: 172800, + DependenciesTTL: 0, + ReplicationFactor: 1, + CasVersion: 4, + }, + } +} + +func applyDefaults(cfg *config.Schema) { + defaultSchema := DefaultSchemaConfig() + + if cfg.Keyspace == "" { + cfg.Keyspace = defaultSchema.Keyspace + } + + if cfg.Datacenter == "" { + cfg.Datacenter = defaultSchema.Datacenter + } + + if cfg.TraceTTL == 0 { + cfg.TraceTTL = defaultSchema.TraceTTL + } + + if cfg.ReplicationFactor == 0 { + cfg.ReplicationFactor = defaultSchema.ReplicationFactor + } + + if cfg.CasVersion == 0 { + cfg.CasVersion = 4 + } +} + +// Applies defaults for the configs and contructs other optional parameters from it +func constructCompleteSchemaConfig(cfg *config.Schema) error { + applyDefaults(cfg) + + cfg.Replication = fmt.Sprintf("{'class': 'NetworkTopologyStrategy', '%s': '%v' }", cfg.Datacenter, cfg.ReplicationFactor) + + if cfg.CompactionWindow != "" { + isMatch, err := regexp.MatchString("^[0-9]+[mhd]$", cfg.CompactionWindow) + if err != nil { + return err + } + + if !isMatch { + return fmt.Errorf("Invalid compaction window size format. Please use numeric value followed by 'm' for minutes, 'h' for hours, or 'd' for days") + } + + cfg.CompactionWindowSize, err = strconv.Atoi(cfg.CompactionWindow[:len(cfg.CompactionWindow)-1]) + if err != nil { + return err + } + + cfg.CompactionWindowUnit = cfg.CompactionWindow[len(cfg.CompactionWindow)-1:] + } else { + traceTTLMinutes := cfg.TraceTTL / 60 + + cfg.CompactionWindowSize = (traceTTLMinutes + 30 - 1) / 30 + cfg.CompactionWindowUnit = "m" + } + + switch cfg.CompactionWindowUnit { + case `m`: + cfg.CompactionWindowUnit = `MINUTES` + case `h`: + cfg.CompactionWindowUnit = `HOURS` + case `d`: + cfg.CompactionWindowUnit = `DAYS` + default: + return fmt.Errorf("Invalid compaction window unit. If can be among {m|h|d}") + } + + return nil +} + +func getQueryFileAsBytes(fileName string, cfg *config.Schema) ([]byte, error) { + + tmpl, err := template.ParseFS(schemaFile, fileName) + if err != nil { + return nil, err + } + + var result bytes.Buffer + err = tmpl.Execute(&result, cfg) + + if err != nil { + return nil, err + } + + return result.Bytes(), nil +} + +func getQueriesFromBytes(queryFile []byte) ([]string, error) { + lines := bytes.Split(queryFile, []byte("\n")) + + var extractedLines [][]byte + + for _, line := range lines { + // Remove any comments, if at the end of the line + commentIndex := bytes.Index(line, []byte(`--`)) + if commentIndex != -1 { + // remove everything after comment + line = line[0:commentIndex] + } + + if len(line) == 0 { + continue + } + + extractedLines = append(extractedLines, bytes.TrimSpace(line)) + } + + var queries []string + + // Construct individual queries strings + var queryString string + for _, line := range extractedLines { + queryString += string(line) + if bytes.HasSuffix(line, []byte(";")) { + queries = append(queries, queryString) + queryString = "" + } + } + + if len(queryString) > 0 { + return nil, fmt.Errorf(`Invalid template`) + } + + return queries, nil +} + +func getCassandraQueriesFromQueryStrings(session cassandra.Session, queries []string) []cassandra.Query { + var casQueries []cassandra.Query + + for _, query := range queries { + casQueries = append(casQueries, session.Query(query)) + } + + return casQueries +} + +func contructSchemaQueries(session cassandra.Session, cfg *config.Schema) ([]cassandra.Query, error) { + err := constructCompleteSchemaConfig(cfg) + if err != nil { + return nil, err + } + + queryFile, err := getQueryFileAsBytes(`v004-go-tmpl.cql.tmpl`, cfg) + if err != nil { + return nil, err + } + + queryStrings, err := getQueriesFromBytes(queryFile) + if err != nil { + return nil, err + } + + casQueries := getCassandraQueriesFromQueryStrings(session, queryStrings) + + return casQueries, nil +} + +func GenerateSchemaIfNotPresent(session cassandra.Session, cfg *config.Schema) error { + casQueries, err := contructSchemaQueries(session, cfg) + if err != nil { + return err + } + + for _, query := range casQueries { + err := query.Exec() + if err != nil { + return err + } + } + + return nil +} diff --git a/plugin/storage/cassandra/schema/schema_test.go b/plugin/storage/cassandra/schema/schema_test.go new file mode 100644 index 00000000000..b21f25f7812 --- /dev/null +++ b/plugin/storage/cassandra/schema/schema_test.go @@ -0,0 +1,19 @@ +package schema + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestTemplateRendering(t *testing.T) { + cfg := DefaultSchemaConfig() + res, err := getQueryFileAsBytes(`v004-go-tmpl-test.cql.tmpl`, &cfg) + require.NoError(t, err) + + queryStrings, err := getQueriesFromBytes(res) + require.NoError(t, err) + + assert.Equal(t, 9, len(queryStrings)) +} diff --git a/plugin/storage/cassandra/schema/v004-go-tmpl-test.cql.tmpl b/plugin/storage/cassandra/schema/v004-go-tmpl-test.cql.tmpl new file mode 100644 index 00000000000..5f37104959f --- /dev/null +++ b/plugin/storage/cassandra/schema/v004-go-tmpl-test.cql.tmpl @@ -0,0 +1,84 @@ +-- There are total 9 queries here + +CREATE KEYSPACE IF NOT EXISTS {{.Keyspace}} WITH replication = {{.Replication}}; + +CREATE TYPE IF NOT EXISTS {{.Keyspace}}.keyvalue ( + key text, + value_type text, + value_string text, + value_bool boolean, + value_long bigint, + value_double double, + value_binary blob +); + +CREATE TYPE IF NOT EXISTS {{.Keyspace}}.log ( + ts bigint, -- microseconds since epoch + fields frozen>> +); + +CREATE TYPE IF NOT EXISTS {{.Keyspace}}.span_ref ( + ref_type text, + trace_id blob, + span_id bigint +); + +CREATE TYPE IF NOT EXISTS {{.Keyspace}}.process ( + service_name text, + tags frozen>> +); + +CREATE TABLE IF NOT EXISTS {{.Keyspace}}.service_names ( + service_name text, + PRIMARY KEY (service_name) +) + WITH compaction = { + 'min_threshold': '4', + 'max_threshold': '32', + 'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy' + } + AND default_time_to_live = {{.TraceTTL}} + AND speculative_retry = 'NONE' + AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes + +-- a bucketing strategy may have to be added for tag queries +-- we can make this table even better by adding a timestamp to it +CREATE TABLE IF NOT EXISTS {{.Keyspace}}.tag_index ( + service_name text, + tag_key text, + tag_value text, + start_time bigint, -- microseconds since epoch + trace_id blob, + span_id bigint, + PRIMARY KEY ((service_name, tag_key, tag_value), start_time, trace_id, span_id) +) + WITH CLUSTERING ORDER BY (start_time DESC) + AND compaction = { + 'compaction_window_size': '1', + 'compaction_window_unit': 'HOURS', + 'class': 'org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy' + } + AND default_time_to_live = {{.TraceTTL}} + AND speculative_retry = 'NONE' + AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes + +CREATE TYPE IF NOT EXISTS {{.Keyspace}}.dependency ( + parent text, + child text, + call_count bigint, + source text +); + +-- compaction strategy is intentionally different as compared to other tables due to the size of dependencies data +CREATE TABLE IF NOT EXISTS {{.Keyspace}}.dependencies_v2 ( + ts_bucket timestamp, + ts timestamp, + dependencies list>, + PRIMARY KEY (ts_bucket, ts) +) WITH CLUSTERING ORDER BY (ts DESC) + AND compaction = { + 'min_threshold': '4', + 'max_threshold': '32', + 'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy' + } + AND default_time_to_live = {{.DependenciesTTL}}; \ No newline at end of file diff --git a/plugin/storage/cassandra/schema/v004-go-tmpl.cql.tmpl b/plugin/storage/cassandra/schema/v004-go-tmpl.cql.tmpl new file mode 100644 index 00000000000..5a08e3f9dc0 --- /dev/null +++ b/plugin/storage/cassandra/schema/v004-go-tmpl.cql.tmpl @@ -0,0 +1,200 @@ +CREATE KEYSPACE IF NOT EXISTS {{.Keyspace}} WITH replication = {{.Replication}}; + +CREATE TYPE IF NOT EXISTS {{.Keyspace}}.keyvalue ( + key text, + value_type text, + value_string text, + value_bool boolean, + value_long bigint, + value_double double, + value_binary blob +); + +CREATE TYPE IF NOT EXISTS {{.Keyspace}}.log ( + ts bigint, -- microseconds since epoch + fields frozen>> +); + +CREATE TYPE IF NOT EXISTS {{.Keyspace}}.span_ref ( + ref_type text, + trace_id blob, + span_id bigint +); + +CREATE TYPE IF NOT EXISTS {{.Keyspace}}.process ( + service_name text, + tags frozen>> +); + +-- Notice we have span_hash. This exists only for zipkin backwards compat. Zipkin allows spans with the same ID. +-- Note: Cassandra re-orders non-PK columns alphabetically, so the table looks differently in CQLSH "describe table". +-- start_time is bigint instead of timestamp as we require microsecond precision +CREATE TABLE IF NOT EXISTS {{.Keyspace}}.traces ( + trace_id blob, + span_id bigint, + span_hash bigint, + parent_id bigint, + operation_name text, + flags int, + start_time bigint, -- microseconds since epoch + duration bigint, -- microseconds + tags list>, + logs list>, + refs list>, + process frozen, + PRIMARY KEY (trace_id, span_id, span_hash) +) + WITH compaction = { + 'compaction_window_size': '{{.CompactionWindowSize}}', + 'compaction_window_unit': '{{.CompactionWindowUnit}}', + 'class': 'org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy' + } + AND default_time_to_live = {{.TraceTTL}} + AND speculative_retry = 'NONE' + AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes + +CREATE TABLE IF NOT EXISTS {{.Keyspace}}.service_names ( + service_name text, + PRIMARY KEY (service_name) +) + WITH compaction = { + 'min_threshold': '4', + 'max_threshold': '32', + 'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy' + } + AND default_time_to_live = {{.TraceTTL}} + AND speculative_retry = 'NONE' + AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes + +CREATE TABLE IF NOT EXISTS {{.Keyspace}}.operation_names_v2 ( + service_name text, + span_kind text, + operation_name text, + PRIMARY KEY ((service_name), span_kind, operation_name) +) + WITH compaction = { + 'min_threshold': '4', + 'max_threshold': '32', + 'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy' + } + AND default_time_to_live = {{.TraceTTL}} + AND speculative_retry = 'NONE' + AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes + +-- index of trace IDs by service + operation names, sorted by span start_time. +CREATE TABLE IF NOT EXISTS {{.Keyspace}}.service_operation_index ( + service_name text, + operation_name text, + start_time bigint, -- microseconds since epoch + trace_id blob, + PRIMARY KEY ((service_name, operation_name), start_time) +) WITH CLUSTERING ORDER BY (start_time DESC) + AND compaction = { + 'compaction_window_size': '1', + 'compaction_window_unit': 'HOURS', + 'class': 'org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy' + } + AND default_time_to_live = {{.TraceTTL}} + AND speculative_retry = 'NONE' + AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes + +CREATE TABLE IF NOT EXISTS {{.Keyspace}}.service_name_index ( + service_name text, + bucket int, + start_time bigint, -- microseconds since epoch + trace_id blob, + PRIMARY KEY ((service_name, bucket), start_time) +) WITH CLUSTERING ORDER BY (start_time DESC) + AND compaction = { + 'compaction_window_size': '1', + 'compaction_window_unit': 'HOURS', + 'class': 'org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy' + } + AND default_time_to_live = {{.TraceTTL}} + AND speculative_retry = 'NONE' + AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes + +CREATE TABLE IF NOT EXISTS {{.Keyspace}}.duration_index ( + service_name text, -- service name + operation_name text, -- operation name, or blank for queries without span name + bucket timestamp, -- time bucket, - the start_time of the given span rounded to an hour + duration bigint, -- span duration, in microseconds + start_time bigint, -- microseconds since epoch + trace_id blob, + PRIMARY KEY ((service_name, operation_name, bucket), duration, start_time, trace_id) +) WITH CLUSTERING ORDER BY (duration DESC, start_time DESC) + AND compaction = { + 'compaction_window_size': '1', + 'compaction_window_unit': 'HOURS', + 'class': 'org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy' + } + AND default_time_to_live = {{.TraceTTL}} + AND speculative_retry = 'NONE' + AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes + +-- a bucketing strategy may have to be added for tag queries +-- we can make this table even better by adding a timestamp to it +CREATE TABLE IF NOT EXISTS {{.Keyspace}}.tag_index ( + service_name text, + tag_key text, + tag_value text, + start_time bigint, -- microseconds since epoch + trace_id blob, + span_id bigint, + PRIMARY KEY ((service_name, tag_key, tag_value), start_time, trace_id, span_id) +) + WITH CLUSTERING ORDER BY (start_time DESC) + AND compaction = { + 'compaction_window_size': '1', + 'compaction_window_unit': 'HOURS', + 'class': 'org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy' + } + AND default_time_to_live = {{.TraceTTL}} + AND speculative_retry = 'NONE' + AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes + +CREATE TYPE IF NOT EXISTS {{.Keyspace}}.dependency ( + parent text, + child text, + call_count bigint, + source text +); + +-- compaction strategy is intentionally different as compared to other tables due to the size of dependencies data +CREATE TABLE IF NOT EXISTS {{.Keyspace}}.dependencies_v2 ( + ts_bucket timestamp, + ts timestamp, + dependencies list>, + PRIMARY KEY (ts_bucket, ts) +) WITH CLUSTERING ORDER BY (ts DESC) + AND compaction = { + 'min_threshold': '4', + 'max_threshold': '32', + 'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy' + } + AND default_time_to_live = {{.DependenciesTTL}}; + +-- adaptive sampling tables +-- ./plugin/storage/cassandra/samplingstore/storage.go +CREATE TABLE IF NOT EXISTS {{.Keyspace}}.operation_throughput ( + bucket int, + ts timeuuid, + throughput text, + PRIMARY KEY(bucket, ts) +) WITH CLUSTERING ORDER BY (ts desc); + +CREATE TABLE IF NOT EXISTS {{.Keyspace}}.sampling_probabilities ( + bucket int, + ts timeuuid, + hostname text, + probabilities text, + PRIMARY KEY(bucket, ts) +) WITH CLUSTERING ORDER BY (ts desc); + +-- distributed lock +-- ./plugin/pkg/distributedlock/cassandra/lock.go +CREATE TABLE IF NOT EXISTS {{.Keyspace}}.leases ( + name text, + owner text, + PRIMARY KEY (name) +); \ No newline at end of file