From 77c253be537b10621da2b10cff4b67b005d0137b Mon Sep 17 00:00:00 2001 From: Xin Hao Zhang Date: Fri, 9 Aug 2024 15:50:06 -0400 Subject: [PATCH 1/4] insights: ensure releasing to Insight pool clears slice Ensure that when we return objects into the Insight pool that we release the statements in the slice. This fixes an issue in the logic that returned this object to the pool which did not nil the `Statements` slice, making it possible for these slices to grow in capacity in the node's lifetime, holding onto garbage Statement objects. This is a lead up to #128199 which will likely remove this pool and reuse the existing statementBuf pool. Epic: none Release note: None --- pkg/sql/sqlstats/insights/pool.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/pkg/sql/sqlstats/insights/pool.go b/pkg/sql/sqlstats/insights/pool.go index 91093654891c..ef2c21285303 100644 --- a/pkg/sql/sqlstats/insights/pool.go +++ b/pkg/sql/sqlstats/insights/pool.go @@ -16,6 +16,8 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/clusterunique" ) +// TODO (xinhaoz): Remove this pool (#128199). The insights object +// can use the existing statementBuf pool for the statements slice. var insightPool = sync.Pool{ New: func() interface{} { return new(Insight) @@ -32,6 +34,9 @@ func makeInsight(sessionID clusterunique.ID, transaction *Transaction) *Insight } func releaseInsight(insight *Insight) { + for i := range insight.Statements { + insight.Statements[i] = nil + } insight.Statements = insight.Statements[:0] *insight = Insight{Statements: insight.Statements} insightPool.Put(insight) From 0671da8a26ceff6d5909b5f1f93e9fa1aecb4914 Mon Sep 17 00:00:00 2001 From: Xin Hao Zhang Date: Fri, 9 Aug 2024 16:10:34 -0400 Subject: [PATCH 2/4] insights: add testing knobs Add insights specific testing knobs. We'll add some knobs in later commits. 
Epic: none Release note: None --- pkg/base/testing_knobs.go | 1 + pkg/server/server_sql.go | 6 +++++ pkg/sql/conn_executor.go | 2 +- pkg/sql/exec_util.go | 2 ++ pkg/sql/sqlstats/insights/BUILD.bazel | 1 + pkg/sql/sqlstats/insights/ingester_test.go | 10 ++++----- pkg/sql/sqlstats/insights/insights.go | 4 ++-- pkg/sql/sqlstats/insights/insights_test.go | 2 +- pkg/sql/sqlstats/insights/registry.go | 22 +++++++++++-------- pkg/sql/sqlstats/insights/test_utils.go | 18 +++++++++++++++ pkg/sql/sqlstats/sslocal/sql_stats_test.go | 6 ++--- .../ssmemstorage/ss_mem_writer_test.go | 4 ++-- 12 files changed, 55 insertions(+), 23 deletions(-) create mode 100644 pkg/sql/sqlstats/insights/test_utils.go diff --git a/pkg/base/testing_knobs.go b/pkg/base/testing_knobs.go index 54b3f29d7734..f145195e9cc7 100644 --- a/pkg/base/testing_knobs.go +++ b/pkg/base/testing_knobs.go @@ -59,4 +59,5 @@ type TestingKnobs struct { KeyVisualizer ModuleTestingKnobs TenantCapabilitiesTestingKnobs ModuleTestingKnobs TableStatsKnobs ModuleTestingKnobs + Insights ModuleTestingKnobs } diff --git a/pkg/server/server_sql.go b/pkg/server/server_sql.go index 2b9206216f41..bd7c7ac30af5 100644 --- a/pkg/server/server_sql.go +++ b/pkg/server/server_sql.go @@ -108,6 +108,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slinstance" "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slprovider" "github.com/cockroachdb/cockroach/pkg/sql/sqlstats" + "github.com/cockroachdb/cockroach/pkg/sql/sqlstats/insights" "github.com/cockroachdb/cockroach/pkg/sql/stats" "github.com/cockroachdb/cockroach/pkg/sql/stmtdiagnostics" "github.com/cockroachdb/cockroach/pkg/sql/syntheticprivilegecache" @@ -1136,6 +1137,11 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) { if externalConnKnobs := cfg.TestingKnobs.ExternalConnection; externalConnKnobs != nil { execCfg.ExternalConnectionTestingKnobs = externalConnKnobs.(*externalconn.TestingKnobs) } + + if insightsKnobs := 
cfg.TestingKnobs.Insights; insightsKnobs != nil { + execCfg.InsightsTestingKnobs = insightsKnobs.(*insights.TestingKnobs) + + } var tableStatsTestingKnobs *stats.TableStatsTestingKnobs if tableStatsKnobs := cfg.TestingKnobs.TableStatsKnobs; tableStatsKnobs != nil { tableStatsTestingKnobs = tableStatsKnobs.(*stats.TableStatsTestingKnobs) diff --git a/pkg/sql/conn_executor.go b/pkg/sql/conn_executor.go index a57100f25ff5..7aff15791897 100644 --- a/pkg/sql/conn_executor.go +++ b/pkg/sql/conn_executor.go @@ -415,7 +415,7 @@ type ServerMetrics struct { func NewServer(cfg *ExecutorConfig, pool *mon.BytesMonitor) *Server { metrics := makeMetrics(false /* internal */) serverMetrics := makeServerMetrics(cfg) - insightsProvider := insights.New(cfg.Settings, serverMetrics.InsightsMetrics) + insightsProvider := insights.New(cfg.Settings, serverMetrics.InsightsMetrics, cfg.InsightsTestingKnobs) // TODO(117690): Unify StmtStatsEnable and TxnStatsEnable into a single cluster setting. sqlstats.TxnStatsEnable.SetOnChange(&cfg.Settings.SV, func(_ context.Context) { if !sqlstats.TxnStatsEnable.Get(&cfg.Settings.SV) { diff --git a/pkg/sql/exec_util.go b/pkg/sql/exec_util.go index f7f8b86f61d1..ac563b4ff92b 100644 --- a/pkg/sql/exec_util.go +++ b/pkg/sql/exec_util.go @@ -103,6 +103,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sessionphase" "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness" "github.com/cockroachdb/cockroach/pkg/sql/sqlstats" + "github.com/cockroachdb/cockroach/pkg/sql/sqlstats/insights" "github.com/cockroachdb/cockroach/pkg/sql/stats" "github.com/cockroachdb/cockroach/pkg/sql/stmtdiagnostics" "github.com/cockroachdb/cockroach/pkg/sql/syntheticprivilegecache" @@ -1302,6 +1303,7 @@ type ExecutorConfig struct { UnusedIndexRecommendationsKnobs *idxusage.UnusedIndexRecommendationTestingKnobs ExternalConnectionTestingKnobs *externalconn.TestingKnobs EventLogTestingKnobs *EventLogTestingKnobs + InsightsTestingKnobs *insights.TestingKnobs // 
HistogramWindowInterval is (server.Config).HistogramWindowInterval. HistogramWindowInterval time.Duration diff --git a/pkg/sql/sqlstats/insights/BUILD.bazel b/pkg/sql/sqlstats/insights/BUILD.bazel index fe161b45623b..c1cb1b123dbd 100644 --- a/pkg/sql/sqlstats/insights/BUILD.bazel +++ b/pkg/sql/sqlstats/insights/BUILD.bazel @@ -13,6 +13,7 @@ go_library( "provider.go", "registry.go", "store.go", + "test_utils.go", ], embed = [":insights_go_proto"], importpath = "github.com/cockroachdb/cockroach/pkg/sql/sqlstats/insights", diff --git a/pkg/sql/sqlstats/insights/ingester_test.go b/pkg/sql/sqlstats/insights/ingester_test.go index ec36f28b4480..6db09d08ad52 100644 --- a/pkg/sql/sqlstats/insights/ingester_test.go +++ b/pkg/sql/sqlstats/insights/ingester_test.go @@ -79,7 +79,7 @@ func TestIngester(t *testing.T) { newRegistry(st, &fakeDetector{ stubEnabled: true, stubIsSlow: true, - }, store), + }, store, nil), ) ingester.Start(ctx, stopper) @@ -134,7 +134,7 @@ func TestIngester_Clear(t *testing.T) { newRegistry(settings, &fakeDetector{ stubEnabled: true, stubIsSlow: true, - }, store)) + }, store, nil)) // Fill the ingester's buffer with some data. This sets us up to // call Clear() with guaranteed data in the buffer, so we can assert @@ -181,7 +181,7 @@ func TestIngester_Disabled(t *testing.T) { // the underlying registry is currently disabled. 
st := cluster.MakeTestingClusterSettings() - ingester := newConcurrentBufferIngester(newRegistry(st, &fakeDetector{}, newStore(st))) + ingester := newConcurrentBufferIngester(newRegistry(st, &fakeDetector{}, newStore(st), nil)) ingester.ObserveStatement(clusterunique.ID{}, &Statement{}) ingester.ObserveTransaction(clusterunique.ID{}, &Transaction{}) require.Equal(t, event{}, ingester.guard.eventBuffer[0]) @@ -200,7 +200,7 @@ func TestIngester_DoesNotBlockWhenReceivingManyObservationsAfterShutdown(t *test defer stopper.Stop(ctx) st := cluster.MakeTestingClusterSettings() - registry := newRegistry(st, &fakeDetector{stubEnabled: true}, newStore(st)) + registry := newRegistry(st, &fakeDetector{stubEnabled: true}, newStore(st), nil) ingester := newConcurrentBufferIngester(registry) ingester.Start(ctx, stopper) @@ -259,7 +259,7 @@ func TestIngesterBlockedForceSync(t *testing.T) { defer stopper.Stop(ctx) st := cluster.MakeTestingClusterSettings() - registry := newRegistry(st, &fakeDetector{stubEnabled: true}, newStore(st)) + registry := newRegistry(st, &fakeDetector{stubEnabled: true}, newStore(st), nil) ingester := newConcurrentBufferIngester(registry) // We queue up a bunch of sync operations because it's unclear how diff --git a/pkg/sql/sqlstats/insights/insights.go b/pkg/sql/sqlstats/insights/insights.go index 9a92e3ff10bc..4840950223f2 100644 --- a/pkg/sql/sqlstats/insights/insights.go +++ b/pkg/sql/sqlstats/insights/insights.go @@ -138,7 +138,7 @@ type PercentileValues struct { } // New builds a new Provider. 
-func New(st *cluster.Settings, metrics Metrics) *Provider { +func New(st *cluster.Settings, metrics Metrics, knobs *TestingKnobs) *Provider { store := newStore(st) anomalyDetector := newAnomalyDetector(st, metrics) @@ -148,7 +148,7 @@ func New(st *cluster.Settings, metrics Metrics) *Provider { newRegistry(st, &compositeDetector{detectors: []detector{ &latencyThresholdDetector{st: st}, anomalyDetector, - }}, store), + }}, store, knobs), ), anomalyDetector: anomalyDetector, } diff --git a/pkg/sql/sqlstats/insights/insights_test.go b/pkg/sql/sqlstats/insights/insights_test.go index f9dfbe1d8f9d..55c322d63273 100644 --- a/pkg/sql/sqlstats/insights/insights_test.go +++ b/pkg/sql/sqlstats/insights/insights_test.go @@ -45,7 +45,7 @@ func BenchmarkInsights(b *testing.B) { // down, guiding us as we tune buffer sizes, etc. for _, numSessions := range []int{1, 10, 100, 1000, 10000} { b.Run(fmt.Sprintf("numSessions=%d", numSessions), func(b *testing.B) { - provider := insights.New(settings, insights.NewMetrics()) + provider := insights.New(settings, insights.NewMetrics(), nil) provider.Start(ctx, stopper) // Spread the b.N work across the simulated SQL sessions, so that we diff --git a/pkg/sql/sqlstats/insights/registry.go b/pkg/sql/sqlstats/insights/registry.go index 8ee12d7ecace..56c7f1076aa2 100644 --- a/pkg/sql/sqlstats/insights/registry.go +++ b/pkg/sql/sqlstats/insights/registry.go @@ -23,10 +23,11 @@ import ( // statement execution to determine which statements are outliers and // writes insights into the provided sink. 
type lockingRegistry struct { - statements map[clusterunique.ID]*statementBuf - detector detector - causes *causes - store *LockingStore + statements map[clusterunique.ID]*statementBuf + detector detector + causes *causes + store *LockingStore + testingKnobs *TestingKnobs } func (r *lockingRegistry) Clear() { @@ -195,11 +196,14 @@ func (r *lockingRegistry) enabled() bool { return r.detector.enabled() } -func newRegistry(st *cluster.Settings, detector detector, store *LockingStore) *lockingRegistry { +func newRegistry( + st *cluster.Settings, detector detector, store *LockingStore, knobs *TestingKnobs, +) *lockingRegistry { return &lockingRegistry{ - statements: make(map[clusterunique.ID]*statementBuf), - detector: detector, - causes: &causes{st: st}, - store: store, + statements: make(map[clusterunique.ID]*statementBuf), + detector: detector, + causes: &causes{st: st}, + store: store, + testingKnobs: knobs, } } diff --git a/pkg/sql/sqlstats/insights/test_utils.go b/pkg/sql/sqlstats/insights/test_utils.go new file mode 100644 index 000000000000..1a329b8badb3 --- /dev/null +++ b/pkg/sql/sqlstats/insights/test_utils.go @@ -0,0 +1,18 @@ +// Copyright 2024 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package insights + +// TestingKnobs provides hooks and testingKnobs for unit tests. +type TestingKnobs struct { +} + +// ModuleTestingKnobs implements base.ModuleTestingKnobs interface. 
+func (*TestingKnobs) ModuleTestingKnobs() {} diff --git a/pkg/sql/sqlstats/sslocal/sql_stats_test.go b/pkg/sql/sqlstats/sslocal/sql_stats_test.go index 2cc3ab648e87..54df0395baf8 100644 --- a/pkg/sql/sqlstats/sslocal/sql_stats_test.go +++ b/pkg/sql/sqlstats/sslocal/sql_stats_test.go @@ -448,7 +448,7 @@ func TestExplicitTxnFingerprintAccounting(t *testing.T) { Settings: st, }) - insightsProvider := insights.New(st, insights.NewMetrics()) + insightsProvider := insights.New(st, insights.NewMetrics(), nil) sqlStats := sslocal.New( st, sqlstats.MaxMemSQLStatsStmtFingerprints, @@ -576,7 +576,7 @@ func TestAssociatingStmtStatsWithTxnFingerprint(t *testing.T) { require.NoError(t, err) // Construct the SQL Stats machinery. - insightsProvider := insights.New(st, insights.NewMetrics()) + insightsProvider := insights.New(st, insights.NewMetrics(), nil) sqlStats := sslocal.New( st, sqlstats.MaxMemSQLStatsStmtFingerprints, @@ -1725,7 +1725,7 @@ func TestSQLStats_ConsumeStats(t *testing.T) { Name: "test", Settings: st, }) - insightsProvider := insights.New(st, insights.NewMetrics()) + insightsProvider := insights.New(st, insights.NewMetrics(), nil) sqlStats := sslocal.New( st, diff --git a/pkg/sql/sqlstats/ssmemstorage/ss_mem_writer_test.go b/pkg/sql/sqlstats/ssmemstorage/ss_mem_writer_test.go index ed3f755bea3e..0241065fd5c0 100644 --- a/pkg/sql/sqlstats/ssmemstorage/ss_mem_writer_test.go +++ b/pkg/sql/sqlstats/ssmemstorage/ss_mem_writer_test.go @@ -48,7 +48,7 @@ func TestRecordStatement(t *testing.T) { testMonitor(ctx, "test-mon", settings), "test-app", knobs, - insights.New(settings, insights.NewMetrics()).Anomalies(), + insights.New(settings, insights.NewMetrics(), nil).Anomalies(), ) // Record a statement, ensure no insights are generated. 
statsKey := appstatspb.StatementStatisticsKey{ @@ -82,7 +82,7 @@ func TestRecordTransaction(t *testing.T) { testMonitor(ctx, "test-mon", settings), "test-app", knobs, - insights.New(settings, insights.NewMetrics()).Anomalies(), + insights.New(settings, insights.NewMetrics(), nil).Anomalies(), ) // Record a transaction, ensure no insights are generated. require.NoError(t, memContainer.RecordTransaction(ctx, appstatspb.TransactionFingerprintID(123), sqlstats.RecordedTxnStats{})) From e0431e0b19f4901b5d82197580356fba7a7c7d6d Mon Sep 17 00:00:00 2001 From: Xin Hao Zhang Date: Mon, 5 Aug 2024 11:01:37 -0400 Subject: [PATCH 3/4] sqlstats/insights: free memory allocated per session on session close The insights locking registry buffers statement insight objects by session id until receiving a transaction insight, when the buffer is emptied. These buffers can leak if the session is closed midway through a transaction since the registry will never receive a transaction event for the session. This commit ensures we clear any memory allocated in insights for a session by sending an event to clear the container's session entry, if it exists, on session close. A testing knob was added, OnSessionClear, which is called when the locking registry clears a session. Epic: none Fixes: #128213 Release note (bug fix): Fixes a memory leak where statement insight objects may leak if the session is closed without the transaction finishing. 
--- pkg/sql/conn_executor.go | 4 +- pkg/sql/sqlstats/insights/ingester.go | 12 ++++ .../sqlstats/insights/integration/BUILD.bazel | 1 + .../insights/integration/insights_test.go | 42 ++++++++++++++ pkg/sql/sqlstats/insights/registry.go | 12 ++++ pkg/sql/sqlstats/insights/registry_test.go | 57 +++++++++++++++---- pkg/sql/sqlstats/insights/test_utils.go | 5 ++ pkg/sql/sqlstats/sslocal/BUILD.bazel | 1 + .../sslocal/sslocal_stats_collector.go | 13 ++++- 9 files changed, 132 insertions(+), 15 deletions(-) diff --git a/pkg/sql/conn_executor.go b/pkg/sql/conn_executor.go index 7aff15791897..e278db80622e 100644 --- a/pkg/sql/conn_executor.go +++ b/pkg/sql/conn_executor.go @@ -645,7 +645,7 @@ func (s *Server) GetIndexUsageStatsController() *idxusage.Controller { return s.indexUsageStatsController } -// GetInsightsReader returns the insights.Reader for the current sql.Server's +// GetInsightsReader returns the insights store for the current sql.Server's // detected execution insights. func (s *Server) GetInsightsReader() *insights.LockingStore { return s.insights.Store() @@ -1267,7 +1267,7 @@ func (ex *connExecutor) close(ctx context.Context, closeType closeType) { } // Free any memory used by the stats collector. 
- ex.statsCollector.Free(ctx) + ex.statsCollector.Close(ctx, ex.planner.extendedEvalCtx.SessionID) var payloadErr error if closeType == normalClose { diff --git a/pkg/sql/sqlstats/insights/ingester.go b/pkg/sql/sqlstats/insights/ingester.go index 64c342c10e18..d4572705826f 100644 --- a/pkg/sql/sqlstats/insights/ingester.go +++ b/pkg/sql/sqlstats/insights/ingester.go @@ -145,6 +145,8 @@ func (i *ConcurrentBufferIngester) ingest(events *eventBuffer) { i.registry.ObserveStatement(e.sessionID, e.statement) } else if e.transaction != nil { i.registry.ObserveTransaction(e.sessionID, e.transaction) + } else if e.sessionID != (clusterunique.ID{}) { + i.registry.clearSession(e.sessionID) } events[idx] = event{} } @@ -178,6 +180,16 @@ func (i *ConcurrentBufferIngester) ObserveTransaction( }) } +// ClearSession sends a signal to the underlying registry to clear any cached +// data associated with the given sessionID. This is an async operation. +func (i *ConcurrentBufferIngester) ClearSession(sessionID clusterunique.ID) { + i.guard.AtomicWrite(func(writerIdx int64) { + i.guard.eventBuffer[writerIdx] = event{ + sessionID: sessionID, + } + }) +} + func newConcurrentBufferIngester(registry *lockingRegistry) *ConcurrentBufferIngester { i := &ConcurrentBufferIngester{ // A channel size of 1 is sufficient to avoid unnecessarily diff --git a/pkg/sql/sqlstats/insights/integration/BUILD.bazel b/pkg/sql/sqlstats/insights/integration/BUILD.bazel index 7bc45adf7dca..e8abbcbb0118 100644 --- a/pkg/sql/sqlstats/insights/integration/BUILD.bazel +++ b/pkg/sql/sqlstats/insights/integration/BUILD.bazel @@ -12,6 +12,7 @@ go_test( "//pkg/settings/cluster", "//pkg/sql", "//pkg/sql/appstatspb", + "//pkg/sql/clusterunique", "//pkg/sql/contention", "//pkg/sql/sessiondata", "//pkg/sql/sqlstats/insights", diff --git a/pkg/sql/sqlstats/insights/integration/insights_test.go b/pkg/sql/sqlstats/insights/integration/insights_test.go index 4cdccd39433a..9ce3ac524e7d 100644 --- 
a/pkg/sql/sqlstats/insights/integration/insights_test.go +++ b/pkg/sql/sqlstats/insights/integration/insights_test.go @@ -28,6 +28,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/appstatspb" + "github.com/cockroachdb/cockroach/pkg/sql/clusterunique" "github.com/cockroachdb/cockroach/pkg/sql/contention" "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" "github.com/cockroachdb/cockroach/pkg/sql/sqlstats/insights" @@ -957,3 +958,44 @@ func TestInsightsIndexRecommendationIntegration(t *testing.T) { return nil }, 1*time.Second) } + +// TestInsightsClearsPerSessionMemory ensures that memory allocated +// for a session is freed when that session is closed. +func TestInsightsClearsPerSessionMemory(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + clearedSessionID := clusterunique.ID{} + ts := serverutils.StartServerOnly(t, base.TestServerArgs{ + Knobs: base.TestingKnobs{ + Insights: &insights.TestingKnobs{ + OnSessionClear: func(sessionID clusterunique.ID) { + clearedSessionID = sessionID + }, + }, + }, + }) + defer ts.Stopper().Stop(ctx) + s := ts.ApplicationLayer() + conn1 := sqlutils.MakeSQLRunner(s.SQLConn(t)) + conn2 := sqlutils.MakeSQLRunner(s.SQLConn(t)) + + var sessionID1 string + conn1.QueryRow(t, "SHOW session_id").Scan(&sessionID1) + + // Start a transaction and cancel the session - ensure that the memory is freed. 
+ conn1.Exec(t, "BEGIN") + for i := 0; i < 5; i++ { + conn1.Exec(t, "SELECT 1") + } + + conn2.Exec(t, "CANCEL SESSION $1", sessionID1) + + testutils.SucceedsSoon(t, func() error { + if clearedSessionID.String() != sessionID1 { + return fmt.Errorf("expected session id %s, found %s", sessionID1, clearedSessionID) + } + return nil + }) +} diff --git a/pkg/sql/sqlstats/insights/registry.go b/pkg/sql/sqlstats/insights/registry.go index 56c7f1076aa2..e972ab979080 100644 --- a/pkg/sql/sqlstats/insights/registry.go +++ b/pkg/sql/sqlstats/insights/registry.go @@ -186,6 +186,18 @@ func (r *lockingRegistry) ObserveTransaction(sessionID clusterunique.ID, transac r.store.addInsight(insight) } +// clearSession removes the session from the registry and releases the +// associated statement buffer. +func (r *lockingRegistry) clearSession(sessionID clusterunique.ID) { + if b, ok := r.statements[sessionID]; ok { + delete(r.statements, sessionID) + b.release() + if r.testingKnobs != nil && r.testingKnobs.OnSessionClear != nil { + r.testingKnobs.OnSessionClear(sessionID) + } + } +} + // TODO(todd): // // Once we can handle sufficient throughput to live on the hot diff --git a/pkg/sql/sqlstats/insights/registry_test.go b/pkg/sql/sqlstats/insights/registry_test.go index 62b8b4287140..cb5252d673f5 100644 --- a/pkg/sql/sqlstats/insights/registry_test.go +++ b/pkg/sql/sqlstats/insights/registry_test.go @@ -58,7 +58,7 @@ func TestRegistry(t *testing.T) { st := cluster.MakeTestingClusterSettings() LatencyThreshold.Override(ctx, &st.SV, 1*time.Second) store := newStore(st) - registry := newRegistry(st, &latencyThresholdDetector{st: st}, store) + registry := newRegistry(st, &latencyThresholdDetector{st: st}, store, nil) registry.ObserveStatement(session.ID, statement) registry.ObserveTransaction(session.ID, transaction) @@ -96,7 +96,7 @@ func TestRegistry(t *testing.T) { st := cluster.MakeTestingClusterSettings() LatencyThreshold.Override(ctx, &st.SV, 1*time.Second) store := newStore(st) - 
registry := newRegistry(st, &latencyThresholdDetector{st: st}, store) + registry := newRegistry(st, &latencyThresholdDetector{st: st}, store, nil) registry.ObserveStatement(session.ID, statement) // Transaction status is set during transaction stats recorded based on // if the transaction committed. We'll inject the failure here to align @@ -138,7 +138,7 @@ func TestRegistry(t *testing.T) { st := cluster.MakeTestingClusterSettings() LatencyThreshold.Override(ctx, &st.SV, 0) store := newStore(st) - registry := newRegistry(st, &latencyThresholdDetector{st: st}, store) + registry := newRegistry(st, &latencyThresholdDetector{st: st}, store, nil) registry.ObserveStatement(session.ID, statement) registry.ObserveTransaction(session.ID, transaction) @@ -162,7 +162,7 @@ func TestRegistry(t *testing.T) { LatencyInSeconds: 0.5, } store := newStore(st) - registry := newRegistry(st, &latencyThresholdDetector{st: st}, store) + registry := newRegistry(st, &latencyThresholdDetector{st: st}, store, nil) registry.ObserveStatement(session.ID, statement2) registry.ObserveTransaction(session.ID, transaction) @@ -195,7 +195,7 @@ func TestRegistry(t *testing.T) { st := cluster.MakeTestingClusterSettings() LatencyThreshold.Override(ctx, &st.SV, 1*time.Second) store := newStore(st) - registry := newRegistry(st, &latencyThresholdDetector{st: st}, store) + registry := newRegistry(st, &latencyThresholdDetector{st: st}, store, nil) registry.ObserveStatement(session.ID, statement) registry.ObserveStatement(otherSession.ID, otherStatement) registry.ObserveTransaction(session.ID, transaction) @@ -246,7 +246,7 @@ func TestRegistry(t *testing.T) { st := cluster.MakeTestingClusterSettings() LatencyThreshold.Override(ctx, &st.SV, 1*time.Second) store := newStore(st) - registry := newRegistry(st, &latencyThresholdDetector{st: st}, store) + registry := newRegistry(st, &latencyThresholdDetector{st: st}, store, nil) registry.ObserveStatement(session.ID, statement) registry.ObserveStatement(session.ID, 
siblingStatement) registry.ObserveTransaction(session.ID, transaction) @@ -276,7 +276,7 @@ func TestRegistry(t *testing.T) { t.Run("txn with no stmts", func(t *testing.T) { transaction := &Transaction{ID: uuid.MakeV4()} st := cluster.MakeTestingClusterSettings() - registry := newRegistry(st, &latencyThresholdDetector{st: st}, newStore(st)) + registry := newRegistry(st, &latencyThresholdDetector{st: st}, newStore(st), nil) require.NotPanics(t, func() { registry.ObserveTransaction(session.ID, transaction) }) }) @@ -284,7 +284,7 @@ func TestRegistry(t *testing.T) { transaction := &Transaction{ID: uuid.MakeV4()} st := cluster.MakeTestingClusterSettings() store := newStore(st) - registry := newRegistry(st, &latencyThresholdDetector{st: st}, store) + registry := newRegistry(st, &latencyThresholdDetector{st: st}, store, nil) contentionDuration := 10 * time.Second statement := &Statement{ Status: Statement_Completed, @@ -349,7 +349,7 @@ func TestRegistry(t *testing.T) { st := cluster.MakeTestingClusterSettings() LatencyThreshold.Override(ctx, &st.SV, 1*time.Second) store := newStore(st) - registry := newRegistry(st, &latencyThresholdDetector{st: st}, store) + registry := newRegistry(st, &latencyThresholdDetector{st: st}, store, nil) registry.ObserveStatement(session.ID, statementNotIgnored) registry.ObserveStatement(session.ID, statementIgnoredSet) registry.ObserveStatement(session.ID, statementIgnoredExplain) @@ -389,7 +389,7 @@ func TestInsightsRegistry_Clear(t *testing.T) { st := cluster.MakeTestingClusterSettings() LatencyThreshold.Override(ctx, &st.SV, 1*time.Second) store := newStore(st) - registry := newRegistry(st, &latencyThresholdDetector{st: st}, store) + registry := newRegistry(st, &latencyThresholdDetector{st: st}, store, nil) // Create some test data. 
sessionA := Session{ID: clusterunique.IDFromBytes([]byte("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"))} sessionB := Session{ID: clusterunique.IDFromBytes([]byte("bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"))} @@ -403,9 +403,46 @@ func TestInsightsRegistry_Clear(t *testing.T) { registry.ObserveStatement(sessionA.ID, statement) registry.ObserveStatement(sessionB.ID, statement) expLenStmts := 2 + // No need to acquire the lock here, as the registry is not attached to anything. require.Len(t, registry.statements, expLenStmts) // Now clear the cache, assert it's cleared. registry.Clear() require.Empty(t, registry.statements) }) } + +func TestInsightsRegistry_ClearSession(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + // Initialize the registry. + st := cluster.MakeTestingClusterSettings() + store := newStore(st) + registry := newRegistry(st, &latencyThresholdDetector{st: st}, store, nil) + + // Create some test data. + sessionA := Session{ID: clusterunique.IDFromBytes([]byte("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"))} + sessionB := Session{ID: clusterunique.IDFromBytes([]byte("bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"))} + statement := &Statement{ + Status: Statement_Completed, + ID: clusterunique.IDFromBytes([]byte("bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb")), + FingerprintID: appstatspb.StmtFingerprintID(100), + LatencyInSeconds: 2, + } + + // Record the test data, assert it's cached. + registry.ObserveStatement(sessionA.ID, statement) + registry.ObserveStatement(sessionB.ID, statement) + // No need to acquire the lock here, as the registry is not attached to anything. + require.Len(t, registry.statements, 2) + + // Clear the cache, assert it's cleared. + registry.clearSession(sessionA.ID) + + // sessionA should be removed, sessionB should still be present. 
+ b, ok := registry.statements[sessionA.ID] + require.False(t, ok) + require.Nil(t, b) + require.Len(t, registry.statements, 1) + require.NotEmpty(t, registry.statements[sessionB.ID]) +} diff --git a/pkg/sql/sqlstats/insights/test_utils.go b/pkg/sql/sqlstats/insights/test_utils.go index 1a329b8badb3..778a5c82d6dc 100644 --- a/pkg/sql/sqlstats/insights/test_utils.go +++ b/pkg/sql/sqlstats/insights/test_utils.go @@ -10,8 +10,13 @@ package insights +import "github.com/cockroachdb/cockroach/pkg/sql/clusterunique" + // TestingKnobs provides hooks and testingKnobs for unit tests. type TestingKnobs struct { + // OnSessionClear is a callback that is triggered when the locking + // registry clears a session entry. + OnSessionClear func(sessionID clusterunique.ID) } // ModuleTestingKnobs implements base.ModuleTestingKnobs interface. diff --git a/pkg/sql/sqlstats/sslocal/BUILD.bazel b/pkg/sql/sqlstats/sslocal/BUILD.bazel index 5030b695b68e..cf9813d4236e 100644 --- a/pkg/sql/sqlstats/sslocal/BUILD.bazel +++ b/pkg/sql/sqlstats/sslocal/BUILD.bazel @@ -19,6 +19,7 @@ go_library( "//pkg/settings", "//pkg/settings/cluster", "//pkg/sql/appstatspb", + "//pkg/sql/clusterunique", "//pkg/sql/execstats", "//pkg/sql/pgwire/pgerror", "//pkg/sql/sessionphase", diff --git a/pkg/sql/sqlstats/sslocal/sslocal_stats_collector.go b/pkg/sql/sqlstats/sslocal/sslocal_stats_collector.go index af88d6faa2e8..069f83ebac60 100644 --- a/pkg/sql/sqlstats/sslocal/sslocal_stats_collector.go +++ b/pkg/sql/sqlstats/sslocal/sslocal_stats_collector.go @@ -16,6 +16,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql/appstatspb" + "github.com/cockroachdb/cockroach/pkg/sql/clusterunique" "github.com/cockroachdb/cockroach/pkg/sql/execstats" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" "github.com/cockroachdb/cockroach/pkg/sql/sessionphase" @@ -26,7 +27,8 @@ import ( ) // StatsCollector is used to collect statistics for transactions and -// 
statements for the entire lifetime of a session. +// statements for the entire lifetime of a session. It must be closed +// with Close() when the session is done. type StatsCollector struct { // currentTransactionStatementStats contains the current transaction's statement @@ -139,8 +141,10 @@ func (s *StatsCollector) Reset(appStats sqlstats.ApplicationStats, phaseTime *se s.stmtFingerprintID = 0 } -// Free frees any local memory used by the stats collector. -func (s *StatsCollector) Free(ctx context.Context) { +// Close frees any local memory used by the stats collector and +// any memory allocated by underlying sql stats systems for the session +// that owns this stats collector. +func (s *StatsCollector) Close(ctx context.Context, sessionID clusterunique.ID) { // For stats collectors for executors with outer transactions, // the currentTransactionStatementStats is the flush target. // We should make sure we're never freeing the flush target, @@ -148,6 +152,9 @@ func (s *StatsCollector) Free(ctx context.Context) { if s.currentTransactionStatementStats != s.flushTarget { s.currentTransactionStatementStats.Free(ctx) } + if s.insightsWriter != nil { + s.insightsWriter.ClearSession(sessionID) + } } // StartTransaction sets up the StatsCollector for a new transaction. From 2da68632a284e64fb43b8b48cde4263354401d1a Mon Sep 17 00:00:00 2001 From: Xin Hao Zhang Date: Fri, 9 Aug 2024 16:45:00 -0400 Subject: [PATCH 4/4] insights: move insights testing knobs from sqlstats Move some insights testing knobs that were on the sqlstats testing knobs to insights pkg. 
Epic: none Release note: None --- pkg/sql/sqlstats/BUILD.bazel | 1 - pkg/sql/sqlstats/insights/ingester.go | 15 ++++++++++++++- .../insights/integration/insights_test.go | 10 ++++------ pkg/sql/sqlstats/insights/test_utils.go | 9 +++++++++ .../sslocal/sslocal_stats_collector.go | 8 ++------ .../ssmemstorage/ss_mem_writer_test.go | 14 +++++++------- pkg/sql/sqlstats/test_utils.go | 18 +----------------- 7 files changed, 37 insertions(+), 38 deletions(-) diff --git a/pkg/sql/sqlstats/BUILD.bazel b/pkg/sql/sqlstats/BUILD.bazel index f79e09f8ddb1..15b92c2fc5f7 100644 --- a/pkg/sql/sqlstats/BUILD.bazel +++ b/pkg/sql/sqlstats/BUILD.bazel @@ -21,7 +21,6 @@ go_library( "//pkg/sql/execstats", "//pkg/sql/sem/tree", "//pkg/sql/sessiondata", - "//pkg/sql/sqlstats/insights", "//pkg/sql/sqlstats/persistedsqlstats/sqlstatsutil", "//pkg/util/log", "//pkg/util/metric", diff --git a/pkg/sql/sqlstats/insights/ingester.go b/pkg/sql/sqlstats/insights/ingester.go index d4572705826f..1462b637f901 100644 --- a/pkg/sql/sqlstats/insights/ingester.go +++ b/pkg/sql/sqlstats/insights/ingester.go @@ -40,7 +40,8 @@ type ConcurrentBufferIngester struct { registry *lockingRegistry clearRegistry uint32 - closeCh chan struct{} + closeCh chan struct{} + testingKnobs *TestingKnobs } type eventBufChPayload struct { @@ -158,6 +159,12 @@ func (i *ConcurrentBufferIngester) ObserveStatement( if !i.registry.enabled() { return } + + if i.testingKnobs != nil && i.testingKnobs.InsightsWriterStmtInterceptor != nil { + i.testingKnobs.InsightsWriterStmtInterceptor(sessionID, statement) + return + } + i.guard.AtomicWrite(func(writerIdx int64) { i.guard.eventBuffer[writerIdx] = event{ sessionID: sessionID, @@ -172,6 +179,12 @@ func (i *ConcurrentBufferIngester) ObserveTransaction( if !i.registry.enabled() { return } + + if i.testingKnobs != nil && i.testingKnobs.InsightsWriterTxnInterceptor != nil { + i.testingKnobs.InsightsWriterTxnInterceptor(sessionID, transaction) + return + } + 
i.guard.AtomicWrite(func(writerIdx int64) { i.guard.eventBuffer[writerIdx] = event{ sessionID: sessionID, diff --git a/pkg/sql/sqlstats/insights/integration/insights_test.go b/pkg/sql/sqlstats/insights/integration/insights_test.go index 9ce3ac524e7d..80af6205d173 100644 --- a/pkg/sql/sqlstats/insights/integration/insights_test.go +++ b/pkg/sql/sqlstats/insights/integration/insights_test.go @@ -966,11 +966,13 @@ func TestInsightsClearsPerSessionMemory(t *testing.T) { defer log.Scope(t).Close(t) ctx := context.Background() + sessionClosedCh := make(chan struct{}) clearedSessionID := clusterunique.ID{} ts := serverutils.StartServerOnly(t, base.TestServerArgs{ Knobs: base.TestingKnobs{ Insights: &insights.TestingKnobs{ OnSessionClear: func(sessionID clusterunique.ID) { + defer close(sessionClosedCh) clearedSessionID = sessionID }, }, @@ -992,10 +994,6 @@ func TestInsightsClearsPerSessionMemory(t *testing.T) { conn2.Exec(t, "CANCEL SESSION $1", sessionID1) - testutils.SucceedsSoon(t, func() error { - if clearedSessionID.String() != sessionID1 { - return fmt.Errorf("expected session id %s, found %s", sessionID1, clearedSessionID) - } - return nil - }) + <-sessionClosedCh + require.Equal(t, clearedSessionID.String(), sessionID1) } diff --git a/pkg/sql/sqlstats/insights/test_utils.go b/pkg/sql/sqlstats/insights/test_utils.go index 778a5c82d6dc..734d2fb9def2 100644 --- a/pkg/sql/sqlstats/insights/test_utils.go +++ b/pkg/sql/sqlstats/insights/test_utils.go @@ -17,6 +17,15 @@ type TestingKnobs struct { // OnSessionClear is a callback that is triggered when the locking // registry clears a session entry. OnSessionClear func(sessionID clusterunique.ID) + + // InsightsWriterTxnInterceptor is a callback that's triggered when a txn insight + // is observed by the ingester. The callback is called instead of writing the + // insight to the buffer. 
+ InsightsWriterTxnInterceptor func(sessionID clusterunique.ID, transaction *Transaction) + + // InsightsWriterStmtInterceptor is a callback that's triggered when a stmt insight + // is observed. The callback is called instead of writing the insight to the buffer. + InsightsWriterStmtInterceptor func(sessionID clusterunique.ID, statement *Statement) } // ModuleTestingKnobs implements base.ModuleTestingKnobs interface. diff --git a/pkg/sql/sqlstats/sslocal/sslocal_stats_collector.go b/pkg/sql/sqlstats/sslocal/sslocal_stats_collector.go index 069f83ebac60..04525523a4b4 100644 --- a/pkg/sql/sqlstats/sslocal/sslocal_stats_collector.go +++ b/pkg/sql/sqlstats/sslocal/sslocal_stats_collector.go @@ -279,9 +279,7 @@ func (s *StatsCollector) ObserveStatement( ErrorCode: errorCode, ErrorMsg: errorMsg, } - if s.knobs != nil && s.knobs.InsightsWriterStmtInterceptor != nil { - s.knobs.InsightsWriterStmtInterceptor(value.SessionID, &insight) - } else if s.insightsWriter != nil { + if s.insightsWriter != nil { s.insightsWriter.ObserveStatement(value.SessionID, &insight) } } @@ -338,9 +336,7 @@ func (s *StatsCollector) ObserveTransaction( LastErrorMsg: errorMsg, Status: status, } - if s.knobs != nil && s.knobs.InsightsWriterTxnInterceptor != nil { - s.knobs.InsightsWriterTxnInterceptor(ctx, value.SessionID, &insight) - } else if s.insightsWriter != nil { + if s.insightsWriter != nil { s.insightsWriter.ObserveTransaction(value.SessionID, &insight) } } diff --git a/pkg/sql/sqlstats/ssmemstorage/ss_mem_writer_test.go b/pkg/sql/sqlstats/ssmemstorage/ss_mem_writer_test.go index 0241065fd5c0..d16eca72f092 100644 --- a/pkg/sql/sqlstats/ssmemstorage/ss_mem_writer_test.go +++ b/pkg/sql/sqlstats/ssmemstorage/ss_mem_writer_test.go @@ -38,7 +38,7 @@ func TestRecordStatement(t *testing.T) { sqlstats.TxnStatsEnable.Override(ctx, &settings.SV, false) // Initialize knobs & mem container. 
numStmtInsights := 0 - knobs := &sqlstats.TestingKnobs{ + knobs := &insights.TestingKnobs{ InsightsWriterStmtInterceptor: func(sessionID clusterunique.ID, statement *insights.Statement) { numStmtInsights++ }, @@ -47,8 +47,8 @@ func TestRecordStatement(t *testing.T) { nil, /* uniqueServerCount */ testMonitor(ctx, "test-mon", settings), "test-app", - knobs, - insights.New(settings, insights.NewMetrics(), nil).Anomalies(), + nil, + insights.New(settings, insights.NewMetrics(), knobs).Anomalies(), ) // Record a statement, ensure no insights are generated. statsKey := appstatspb.StatementStatisticsKey{ @@ -72,8 +72,8 @@ func TestRecordTransaction(t *testing.T) { sqlstats.TxnStatsEnable.Override(ctx, &settings.SV, false) // Initialize knobs & mem container. numTxnInsights := 0 - knobs := &sqlstats.TestingKnobs{ - InsightsWriterTxnInterceptor: func(ctx context.Context, sessionID clusterunique.ID, transaction *insights.Transaction) { + knobs := &insights.TestingKnobs{ + InsightsWriterTxnInterceptor: func(sessionID clusterunique.ID, transaction *insights.Transaction) { numTxnInsights++ }, } @@ -81,8 +81,8 @@ func TestRecordTransaction(t *testing.T) { nil, /* uniqueServerCount */ testMonitor(ctx, "test-mon", settings), "test-app", - knobs, - insights.New(settings, insights.NewMetrics(), nil).Anomalies(), + nil, + insights.New(settings, insights.NewMetrics(), knobs).Anomalies(), ) // Record a transaction, ensure no insights are generated. 
require.NoError(t, memContainer.RecordTransaction(ctx, appstatspb.TransactionFingerprintID(123), sqlstats.RecordedTxnStats{})) diff --git a/pkg/sql/sqlstats/test_utils.go b/pkg/sql/sqlstats/test_utils.go index b5c23eddab74..7607cbc62d9f 100644 --- a/pkg/sql/sqlstats/test_utils.go +++ b/pkg/sql/sqlstats/test_utils.go @@ -10,13 +10,7 @@ package sqlstats -import ( - "context" - "time" - - "github.com/cockroachdb/cockroach/pkg/sql/clusterunique" - "github.com/cockroachdb/cockroach/pkg/sql/sqlstats/insights" -) +import "time" // TestingKnobs provides hooks and knobs for unit tests. type TestingKnobs struct { @@ -28,16 +22,6 @@ type TestingKnobs struct { // finishes flushing. OnTxnStatsFlushFinished func() - // InsightsWriterTxnInterceptor is a callback that's triggered when a txn insight - // is observed when recording txn stats. The callback is called instead of the legitimate - // insights.Writer. - InsightsWriterTxnInterceptor func(ctx context.Context, sessionID clusterunique.ID, transaction *insights.Transaction) - - // InsightsWriterStmtInterceptor is a callback that's triggered when a stmt insight - // is observed when recording stmt stats. The callback is called instead of the legitimate - // insights.Writer. - InsightsWriterStmtInterceptor func(sessionID clusterunique.ID, statement *insights.Statement) - // OnCleanupStartForShard is a callback that is triggered when background // cleanup job starts to delete data from a shard from the system table. OnCleanupStartForShard func(shardIdx int, existingCountInShard, shardLimit int64)