Skip to content

Commit

Permalink
Merge #128378
Browse files Browse the repository at this point in the history
128378: crosscluster/logical: add job ID to application name r=rafiss a=stevendanna

The goal of this change is to make it a bit easier to identify the queries for a particular job on the SQL activity page.

Fixes #127746

Release note: None

Co-authored-by: Steven Danna <[email protected]>
  • Loading branch information
craig[bot] and stevendanna committed Aug 15, 2024
2 parents 4a293c6 + fd1b0b2 commit 67b2617
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ func newLogicalReplicationWriterProcessor(
} else {
rp, err = makeSQLProcessor(
ctx, flowCtx.Cfg.Settings, tableConfigs,
jobspb.JobID(spec.JobID),
// Initialize the executor with a fresh session data - this will
// avoid creating a new copy on each executor usage.
flowCtx.Cfg.DB.Executor(isql.WithSessionData(sql.NewInternalSessionData(ctx, flowCtx.Cfg.Settings, "" /* opName */))),
Expand Down
38 changes: 21 additions & 17 deletions pkg/ccl/crosscluster/logical/lww_row_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,13 +209,14 @@ func makeSQLProcessor(
ctx context.Context,
settings *cluster.Settings,
tableConfigs map[descpb.ID]sqlProcessorTableConfig,
jobID jobspb.JobID,
ie isql.Executor,
) (*sqlRowProcessor, error) {
switch defaultSQLProcessor {
case lwwProcessor:
return makeSQLLastWriteWinsHandler(ctx, settings, tableConfigs, ie)
return makeSQLLastWriteWinsHandler(ctx, settings, tableConfigs, jobID, ie)
case udfApplierProcessor:
return makeUDFApplierProcessor(ctx, settings, tableConfigs, ie)
return makeUDFApplierProcessor(ctx, settings, tableConfigs, jobID, ie)
default:
return nil, errors.AssertionFailedf("unknown SQL processor: %s", defaultSQLProcessor)
}
Expand Down Expand Up @@ -351,8 +352,6 @@ var (
DisablePlanGists: true,
QualityOfService: &sessiondatapb.UserLowQoS,
}
// Have a separate override for each of the replicated queries.
ieOverrideOptimisticInsert, ieOverrideInsert, ieOverrideDelete sessiondata.InternalExecutorOverride
)

const (
Expand All @@ -362,21 +361,18 @@ const (
replicatedApplyUDFOpName = "replicated-apply-udf"
)

func getIEOverride(opName string) sessiondata.InternalExecutorOverride {
func getIEOverride(opName string, jobID jobspb.JobID) sessiondata.InternalExecutorOverride {
o := ieOverrideBase
// We want the ingestion queries to show up on the SQL Activity page
// alongside with the foreground traffic by default. We can achieve this
// by using the same naming scheme as AttributeToUser feature of the IE
// override (effectively, we opt out of using the "external" metrics for
// the ingestion queries).
o.ApplicationName = catconstants.AttributedToUserInternalAppNamePrefix + "-" + opName
o.ApplicationName = fmt.Sprintf("%s-%s-%d", catconstants.AttributedToUserInternalAppNamePrefix, opName, jobID)
return o
}

func init() {
ieOverrideOptimisticInsert = getIEOverride(replicatedOptimisticInsertOpName)
ieOverrideInsert = getIEOverride(replicatedInsertOpName)
ieOverrideDelete = getIEOverride(replicatedDeleteOpName)
}

var tryOptimisticInsertEnabled = settings.RegisterBoolSetting(
Expand All @@ -402,6 +398,7 @@ func makeSQLLastWriteWinsHandler(
ctx context.Context,
settings *cluster.Settings,
tableConfigs map[descpb.ID]sqlProcessorTableConfig,
jobID jobspb.JobID,
ie isql.Executor,
) (*sqlRowProcessor, error) {

Expand All @@ -414,7 +411,7 @@ func makeSQLLastWriteWinsHandler(

var fallbackQuerier querier
if needFallback {
fallbackQuerier = makeApplierQuerier(ctx, settings, tableConfigs, ie)
fallbackQuerier = makeApplierQuerier(ctx, settings, tableConfigs, jobID, ie)
}

qb := queryBuffer{
Expand All @@ -423,10 +420,13 @@ func makeSQLLastWriteWinsHandler(
}
return makeSQLProcessorFromQuerier(ctx, settings, tableConfigs, ie,
&lwwQuerier{
settings: settings,
queryBuffer: qb,
shouldUseFallback: shouldUseFallback,
fallbackQuerier: fallbackQuerier,
settings: settings,
queryBuffer: qb,
shouldUseFallback: shouldUseFallback,
fallbackQuerier: fallbackQuerier,
ieOverrideOptimisticInsert: getIEOverride(replicatedOptimisticInsertOpName, jobID),
ieOverrideInsert: getIEOverride(replicatedInsertOpName, jobID),
ieOverrideDelete: getIEOverride(replicatedDeleteOpName, jobID),
})
}

Expand All @@ -452,6 +452,10 @@ type lwwQuerier struct {

shouldUseFallback map[catid.DescID]bool
fallbackQuerier querier

ieOverrideOptimisticInsert sessiondata.InternalExecutorOverride
ieOverrideInsert sessiondata.InternalExecutorOverride
ieOverrideDelete sessiondata.InternalExecutorOverride
}

func (lww *lwwQuerier) AddTable(targetDescID int32, tc sqlProcessorTableConfig) error {
Expand Down Expand Up @@ -517,7 +521,7 @@ func (lww *lwwQuerier) InsertRow(
if err != nil {
return batchStats{}, err
}
if _, err = ie.ExecParsed(ctx, replicatedOptimisticInsertOpName, kvTxn, ieOverrideOptimisticInsert, stmt, datums...); err != nil {
if _, err = ie.ExecParsed(ctx, replicatedOptimisticInsertOpName, kvTxn, lww.ieOverrideOptimisticInsert, stmt, datums...); err != nil {
// If the optimistic insert failed with unique violation, we have to
// fall back to the pessimistic path. If we got a different error,
// then we bail completely.
Expand Down Expand Up @@ -545,7 +549,7 @@ func (lww *lwwQuerier) InsertRow(
if err != nil {
return batchStats{}, err
}
if _, err = ie.ExecParsed(ctx, replicatedInsertOpName, kvTxn, ieOverrideInsert, stmt, datums...); err != nil {
if _, err = ie.ExecParsed(ctx, replicatedInsertOpName, kvTxn, lww.ieOverrideInsert, stmt, datums...); err != nil {
log.Warningf(ctx, "replicated insert failed (query: %s): %s", stmt.SQL, err.Error())
return batchStats{}, err
}
Expand Down Expand Up @@ -573,7 +577,7 @@ func (lww *lwwQuerier) DeleteRow(
return batchStats{}, err
}

if _, err := ie.ExecParsed(ctx, replicatedDeleteOpName, kvTxn, ieOverrideDelete, stmt, datums...); err != nil {
if _, err := ie.ExecParsed(ctx, replicatedDeleteOpName, kvTxn, lww.ieOverrideDelete, stmt, datums...); err != nil {
log.Warningf(ctx, "replicated delete failed (query: %s): %s", stmt.SQL, err.Error())
return batchStats{}, err
}
Expand Down
5 changes: 3 additions & 2 deletions pkg/ccl/crosscluster/logical/lww_row_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/ccl/crosscluster/replicationtestutils"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
Expand Down Expand Up @@ -84,7 +85,7 @@ func TestLWWInsertQueryGeneration(t *testing.T) {
dstDesc.GetID(): {
srcDesc: srcDesc,
},
}, s.InternalExecutor().(isql.Executor))
}, jobspb.JobID(1), s.InternalExecutor().(isql.Executor))
require.NoError(t, err)
return rp, func(datums ...interface{}) roachpb.KeyValue {
kv := replicationtestutils.EncodeKV(t, s.Codec(), srcDesc, datums...)
Expand Down Expand Up @@ -153,7 +154,7 @@ func BenchmarkLWWInsertBatch(b *testing.B) {
desc.GetID(): {
srcDesc: desc,
},
}, s.InternalDB().(isql.DB).Executor(isql.WithSessionData(sd)))
}, jobspb.JobID(1), s.InternalDB().(isql.DB).Executor(isql.WithSessionData(sd)))
require.NoError(b, err)

// In some configs, we'll be simulating processing the same INSERT over and
Expand Down
11 changes: 7 additions & 4 deletions pkg/ccl/crosscluster/logical/udf_row_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"strings"

"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcevent"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
Expand Down Expand Up @@ -96,6 +97,7 @@ func makeApplierQuerier(
ctx context.Context,
settings *cluster.Settings,
tableConfigs map[descpb.ID]sqlProcessorTableConfig,
jobID jobspb.JobID,
ie isql.Executor,
) *applierQuerier {
return &applierQuerier{
Expand All @@ -105,19 +107,20 @@ func makeApplierQuerier(
applierQueries: make(map[catid.DescID]map[catid.FamilyID]queryBuilder, len(tableConfigs)),
},
settings: settings,
ieoInsert: getIEOverride(replicatedInsertOpName),
ieoDelete: getIEOverride(replicatedDeleteOpName),
ieoApplyUDF: getIEOverride(replicatedApplyUDFOpName),
ieoInsert: getIEOverride(replicatedInsertOpName, jobID),
ieoDelete: getIEOverride(replicatedDeleteOpName, jobID),
ieoApplyUDF: getIEOverride(replicatedApplyUDFOpName, jobID),
}
}

func makeUDFApplierProcessor(
ctx context.Context,
settings *cluster.Settings,
tableDescs map[descpb.ID]sqlProcessorTableConfig,
jobID jobspb.JobID,
ie isql.Executor,
) (*sqlRowProcessor, error) {
aq := makeApplierQuerier(ctx, settings, tableDescs, ie)
aq := makeApplierQuerier(ctx, settings, tableDescs, jobID, ie)
return makeSQLProcessorFromQuerier(ctx, settings, tableDescs, ie, aq)
}

Expand Down

0 comments on commit 67b2617

Please sign in to comment.