diff --git a/pkg/ccl/crosscluster/logical/logical_replication_writer_processor.go b/pkg/ccl/crosscluster/logical/logical_replication_writer_processor.go index e6183820d7f6..3d8bba21365d 100644 --- a/pkg/ccl/crosscluster/logical/logical_replication_writer_processor.go +++ b/pkg/ccl/crosscluster/logical/logical_replication_writer_processor.go @@ -154,6 +154,7 @@ func newLogicalReplicationWriterProcessor( } else { rp, err = makeSQLProcessor( ctx, flowCtx.Cfg.Settings, tableConfigs, + jobspb.JobID(spec.JobID), // Initialize the executor with a fresh session data - this will // avoid creating a new copy on each executor usage. flowCtx.Cfg.DB.Executor(isql.WithSessionData(sql.NewInternalSessionData(ctx, flowCtx.Cfg.Settings, "" /* opName */))), diff --git a/pkg/ccl/crosscluster/logical/lww_row_processor.go b/pkg/ccl/crosscluster/logical/lww_row_processor.go index 1aecdb97e576..61f73588be24 100644 --- a/pkg/ccl/crosscluster/logical/lww_row_processor.go +++ b/pkg/ccl/crosscluster/logical/lww_row_processor.go @@ -209,13 +209,14 @@ func makeSQLProcessor( ctx context.Context, settings *cluster.Settings, tableConfigs map[descpb.ID]sqlProcessorTableConfig, + jobID jobspb.JobID, ie isql.Executor, ) (*sqlRowProcessor, error) { switch defaultSQLProcessor { case lwwProcessor: - return makeSQLLastWriteWinsHandler(ctx, settings, tableConfigs, ie) + return makeSQLLastWriteWinsHandler(ctx, settings, tableConfigs, jobID, ie) case udfApplierProcessor: - return makeUDFApplierProcessor(ctx, settings, tableConfigs, ie) + return makeUDFApplierProcessor(ctx, settings, tableConfigs, jobID, ie) default: return nil, errors.AssertionFailedf("unknown SQL processor: %s", defaultSQLProcessor) } @@ -351,8 +352,6 @@ var ( DisablePlanGists: true, QualityOfService: &sessiondatapb.UserLowQoS, } - // Have a separate override for each of the replicated queries. - ieOverrideOptimisticInsert, ieOverrideInsert, ieOverrideDelete sessiondata.InternalExecutorOverride ) const ( @@ -362,21 +361,18 @@ const ( replicatedApplyUDFOpName = "replicated-apply-udf" ) -func getIEOverride(opName string) sessiondata.InternalExecutorOverride { +func getIEOverride(opName string, jobID jobspb.JobID) sessiondata.InternalExecutorOverride { o := ieOverrideBase // We want the ingestion queries to show up on the SQL Activity page // alongside with the foreground traffic by default. We can achieve this // by using the same naming scheme as AttributeToUser feature of the IE // override (effectively, we opt out of using the "external" metrics for // the ingestion queries). - o.ApplicationName = catconstants.AttributedToUserInternalAppNamePrefix + "-" + opName + o.ApplicationName = fmt.Sprintf("%s-%s-%d", catconstants.AttributedToUserInternalAppNamePrefix, opName, jobID) return o } func init() { - ieOverrideOptimisticInsert = getIEOverride(replicatedOptimisticInsertOpName) - ieOverrideInsert = getIEOverride(replicatedInsertOpName) - ieOverrideDelete = getIEOverride(replicatedDeleteOpName) } var tryOptimisticInsertEnabled = settings.RegisterBoolSetting( @@ -402,6 +398,7 @@ func makeSQLLastWriteWinsHandler( ctx context.Context, settings *cluster.Settings, tableConfigs map[descpb.ID]sqlProcessorTableConfig, + jobID jobspb.JobID, ie isql.Executor, ) (*sqlRowProcessor, error) { @@ -414,7 +411,7 @@ func makeSQLLastWriteWinsHandler( var fallbackQuerier querier if needFallback { - fallbackQuerier = makeApplierQuerier(ctx, settings, tableConfigs, ie) + fallbackQuerier = makeApplierQuerier(ctx, settings, tableConfigs, jobID, ie) } qb := queryBuffer{ @@ -423,10 +420,13 @@ func makeSQLLastWriteWinsHandler( } return makeSQLProcessorFromQuerier(ctx, settings, tableConfigs, ie, &lwwQuerier{ - settings: settings, - queryBuffer: qb, - shouldUseFallback: shouldUseFallback, - fallbackQuerier: fallbackQuerier, + settings: settings, + queryBuffer: qb, + shouldUseFallback: shouldUseFallback, + fallbackQuerier: fallbackQuerier, + ieOverrideOptimisticInsert: getIEOverride(replicatedOptimisticInsertOpName, jobID), + ieOverrideInsert: getIEOverride(replicatedInsertOpName, jobID), + ieOverrideDelete: getIEOverride(replicatedDeleteOpName, jobID), }) } @@ -452,6 +452,10 @@ type lwwQuerier struct { shouldUseFallback map[catid.DescID]bool fallbackQuerier querier + + ieOverrideOptimisticInsert sessiondata.InternalExecutorOverride + ieOverrideInsert sessiondata.InternalExecutorOverride + ieOverrideDelete sessiondata.InternalExecutorOverride } func (lww *lwwQuerier) AddTable(targetDescID int32, tc sqlProcessorTableConfig) error { @@ -517,7 +521,7 @@ func (lww *lwwQuerier) InsertRow( if err != nil { return batchStats{}, err } - if _, err = ie.ExecParsed(ctx, replicatedOptimisticInsertOpName, kvTxn, ieOverrideOptimisticInsert, stmt, datums...); err != nil { + if _, err = ie.ExecParsed(ctx, replicatedOptimisticInsertOpName, kvTxn, lww.ieOverrideOptimisticInsert, stmt, datums...); err != nil { // If the optimistic insert failed with unique violation, we have to // fall back to the pessimistic path. If we got a different error, // then we bail completely. @@ -545,7 +549,7 @@ func (lww *lwwQuerier) InsertRow( if err != nil { return batchStats{}, err } - if _, err = ie.ExecParsed(ctx, replicatedInsertOpName, kvTxn, ieOverrideInsert, stmt, datums...); err != nil { + if _, err = ie.ExecParsed(ctx, replicatedInsertOpName, kvTxn, lww.ieOverrideInsert, stmt, datums...); err != nil { log.Warningf(ctx, "replicated insert failed (query: %s): %s", stmt.SQL, err.Error()) return batchStats{}, err } @@ -573,7 +577,7 @@ func (lww *lwwQuerier) DeleteRow( return batchStats{}, err } - if _, err := ie.ExecParsed(ctx, replicatedDeleteOpName, kvTxn, ieOverrideDelete, stmt, datums...); err != nil { + if _, err := ie.ExecParsed(ctx, replicatedDeleteOpName, kvTxn, lww.ieOverrideDelete, stmt, datums...); err != nil { log.Warningf(ctx, "replicated delete failed (query: %s): %s", stmt.SQL, err.Error()) return batchStats{}, err } diff --git a/pkg/ccl/crosscluster/logical/lww_row_processor_test.go b/pkg/ccl/crosscluster/logical/lww_row_processor_test.go index 72153ef68896..1a2465996bd6 100644 --- a/pkg/ccl/crosscluster/logical/lww_row_processor_test.go +++ b/pkg/ccl/crosscluster/logical/lww_row_processor_test.go @@ -15,6 +15,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/ccl/crosscluster/replicationtestutils" + "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" @@ -84,7 +85,7 @@ func TestLWWInsertQueryGeneration(t *testing.T) { dstDesc.GetID(): { srcDesc: srcDesc, }, - }, s.InternalExecutor().(isql.Executor)) + }, jobspb.JobID(1), s.InternalExecutor().(isql.Executor)) require.NoError(t, err) return rp, func(datums ...interface{}) roachpb.KeyValue { kv := replicationtestutils.EncodeKV(t, s.Codec(), srcDesc, datums...) @@ -153,7 +154,7 @@ func BenchmarkLWWInsertBatch(b *testing.B) { desc.GetID(): { srcDesc: desc, }, - }, s.InternalDB().(isql.DB).Executor(isql.WithSessionData(sd))) + }, jobspb.JobID(1), s.InternalDB().(isql.DB).Executor(isql.WithSessionData(sd))) require.NoError(b, err) // In some configs, we'll be simulating processing the same INSERT over and diff --git a/pkg/ccl/crosscluster/logical/udf_row_processor.go b/pkg/ccl/crosscluster/logical/udf_row_processor.go index 9b57e406ab9a..e9d777eba8c9 100644 --- a/pkg/ccl/crosscluster/logical/udf_row_processor.go +++ b/pkg/ccl/crosscluster/logical/udf_row_processor.go @@ -14,6 +14,7 @@ import ( "strings" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcevent" + "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql/catalog" @@ -96,6 +97,7 @@ func makeApplierQuerier( ctx context.Context, settings *cluster.Settings, tableConfigs map[descpb.ID]sqlProcessorTableConfig, + jobID jobspb.JobID, ie isql.Executor, ) *applierQuerier { return &applierQuerier{ @@ -105,9 +107,9 @@ func makeApplierQuerier( applierQueries: make(map[catid.DescID]map[catid.FamilyID]queryBuilder, len(tableConfigs)), }, settings: settings, - ieoInsert: getIEOverride(replicatedInsertOpName), - ieoDelete: getIEOverride(replicatedDeleteOpName), - ieoApplyUDF: getIEOverride(replicatedApplyUDFOpName), + ieoInsert: getIEOverride(replicatedInsertOpName, jobID), + ieoDelete: getIEOverride(replicatedDeleteOpName, jobID), + ieoApplyUDF: getIEOverride(replicatedApplyUDFOpName, jobID), } } @@ -115,9 +117,10 @@ func makeUDFApplierProcessor( ctx context.Context, settings *cluster.Settings, tableDescs map[descpb.ID]sqlProcessorTableConfig, + jobID jobspb.JobID, ie isql.Executor, ) (*sqlRowProcessor, error) { - aq := makeApplierQuerier(ctx, settings, tableDescs, ie) + aq := makeApplierQuerier(ctx, settings, tableDescs, jobID, ie) return makeSQLProcessorFromQuerier(ctx, settings, tableDescs, ie, aq) }