Merge #128777
128777: crosscluster/logical: replan with correct asOf time r=msbutler a=stevendanna

Previously, the replanner re-used the original planning request, so we always replanned with the original replicated time. That replicated time stops working for replanning once it falls outside the GC window on the source.

Fixes #128776
Release note: None

Co-authored-by: Steven Danna <[email protected]>
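
The gist of the fix, as it appears in generatePlanImpl below: every planning pass (the initial plan and each replan) now derives its plan request from the job's current progress rather than reusing the request built when the job started. Here is a minimal sketch of that timestamp selection, written under the assumption that it is pulled out into a standalone helper; planAsOf is an illustrative name, not part of the patch, which performs the same selection inline.

package logical

import (
	"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
	"github.com/cockroachdb/cockroach/pkg/util/hlc"
)

// planAsOf is an illustrative helper (sketch only): it picks the
// timestamp to plan against from the job's latest progress, falling
// back to the replication start time when nothing has been replicated
// yet. The commit performs the same selection inline in
// generatePlanImpl.
func planAsOf(
	progress jobspb.LogicalReplicationProgress, payload jobspb.LogicalReplicationDetails,
) hlc.Timestamp {
	asOf := progress.ReplicatedTime
	if asOf.IsEmpty() {
		// No data replicated yet: plan as of the replication start time.
		asOf = payload.ReplicationStartTime
	}
	return asOf
}

Because the selection reads progress at call time, a replan triggered long after job start plans as of the latest replicated timestamp rather than one that may already be outside the source's GC window.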
craig[bot] and stevendanna committed Aug 15, 2024
2 parents 575cdd4 + 8105594 commit 4a293c6
Showing 4 changed files with 173 additions and 90 deletions.
2 changes: 2 additions & 0 deletions pkg/ccl/crosscluster/logical/BUILD.bazel
@@ -107,6 +107,7 @@ go_test(
"//pkg/ccl/changefeedccl/changefeedbase",
"//pkg/ccl/crosscluster/replicationtestutils",
"//pkg/ccl/crosscluster/replicationutils",
"//pkg/ccl/crosscluster/streamclient",
"//pkg/ccl/crosscluster/streamclient/randclient",
"//pkg/ccl/storageccl",
"//pkg/jobs",
@@ -117,6 +118,7 @@ go_test(
"//pkg/roachpb",
"//pkg/security/securityassets",
"//pkg/security/securitytest",
"//pkg/security/username",
"//pkg/server",
"//pkg/sql",
"//pkg/sql/catalog",
163 changes: 80 additions & 83 deletions pkg/ccl/crosscluster/logical/logical_replication_job.go
@@ -123,7 +123,7 @@ func (r *logicalReplicationResumer) ingest(

client, err := streamclient.GetFirstActiveClient(ctx,
append([]string{payload.SourceClusterConnStr}, progress.StreamAddresses...),
jobExecCtx.ExecCfg().InternalDB,
execCfg.InternalDB,
streamclient.WithStreamID(streampb.StreamID(streamID)),
streamclient.WithLogical(),
)
@@ -143,16 +143,8 @@
req.TableIDs = append(req.TableIDs, pair.SrcDescriptorID)
}

planner := makeLogicalReplicationPlanner(
req,
jobExecCtx,
client,
progress,
payload,
jobID,
replicatedTimeAtStart)

initialPlan, initialPlanCtx, planInfo, err := planner.generateInitialPlanWithInfo(ctx, distSQLPlanner)
planner := makeLogicalReplicationPlanner(jobExecCtx, r.job, client)
initialPlan, initialPlanCtx, planInfo, err := planner.generateInitialPlan(ctx, distSQLPlanner)
if err != nil {
return err
}
@@ -165,10 +157,27 @@
return err
}

// TODO(azhu): add a flag to avoid recreating dlq tables during replanning
dlqClient := InitDeadLetterQueueClient(execCfg.InternalDB.Executor(), planInfo.tableIDsToNames)
if err := dlqClient.Create(ctx); err != nil {
return errors.Wrap(err, "failed to create dead letter queue")
}

frontier, err := span.MakeFrontierAt(replicatedTimeAtStart, planInfo.sourceSpans...)
if err != nil {
return err
}

for _, resolvedSpan := range progress.Checkpoint.ResolvedSpans {
if _, err := frontier.Forward(resolvedSpan.Span, resolvedSpan.Timestamp); err != nil {
return err
}
}

replanOracle := sql.ReplanOnCustomFunc(
getNodes,
func() float64 {
return crosscluster.LogicalReplanThreshold.Get(jobExecCtx.ExecCfg().SV())
return crosscluster.LogicalReplanThreshold.Get(execCfg.SV())
},
)

@@ -177,7 +186,7 @@
planner.generatePlan,
jobExecCtx,
replanOracle,
func() time.Duration { return crosscluster.LogicalReplanFrequency.Get(jobExecCtx.ExecCfg().SV()) },
func() time.Duration { return crosscluster.LogicalReplanFrequency.Get(execCfg.SV()) },
)
metrics := execCfg.JobRegistry.MetricsStruct().JobSpecificMetrics[jobspb.TypeLogicalReplication].(*Metrics)

@@ -205,7 +214,7 @@
}
rh := rowHandler{
replicatedTimeAtStart: replicatedTimeAtStart,
frontier: planInfo.frontier,
frontier: frontier,
metrics: metrics,
settings: &execCfg.Settings.SV,
job: r.job,
@@ -277,66 +286,80 @@ func getNodes(plan *sql.PhysicalPlan) (src, dst map[string]struct{}, nodeCount i
// generated plan differs significantly from the initial plan, the entire
// distSQL flow is shut down and a new initial plan will be created.
type logicalReplicationPlanner struct {
req streampb.LogicalReplicationPlanRequest
jobExecCtx sql.JobExecContext
client streamclient.Client
progress *jobspb.LogicalReplicationProgress
payload jobspb.LogicalReplicationDetails
jobID jobspb.JobID
replicatedTimeAtStart hlc.Timestamp
job *jobs.Job
jobExecCtx sql.JobExecContext
client streamclient.Client
}

type logicalReplicationPlanInfo struct {
sourceSpans []roachpb.Span
streamAddress []string
tableIDsToNames map[int32]fullyQualifiedTableName
}

func makeLogicalReplicationPlanner(
req streampb.LogicalReplicationPlanRequest,
jobExecCtx sql.JobExecContext,
client streamclient.Client,
progress *jobspb.LogicalReplicationProgress,
payload jobspb.LogicalReplicationDetails,
jobID jobspb.JobID,
replicatedTimeAtStart hlc.Timestamp,
jobExecCtx sql.JobExecContext, job *jobs.Job, client streamclient.Client,
) logicalReplicationPlanner {

return logicalReplicationPlanner{
req: req,
jobExecCtx: jobExecCtx,
client: client,
progress: progress,
payload: payload,
jobID: jobID,
replicatedTimeAtStart: replicatedTimeAtStart,
job: job,
jobExecCtx: jobExecCtx,
client: client,
}
}

type planInfo struct {
frontier span.Frontier
streamAddress []string
}

// generateInitialPlan generates a plan along with the information required to
// initialize the job.
func (p *logicalReplicationPlanner) generateInitialPlanWithInfo(
func (p *logicalReplicationPlanner) generateInitialPlan(
ctx context.Context, dsp *sql.DistSQLPlanner,
) (*sql.PhysicalPlan, *sql.PlanningCtx, logicalReplicationPlanInfo, error) {
return p.generatePlanImpl(ctx, dsp)
}

func (p *logicalReplicationPlanner) generatePlan(
ctx context.Context, dsp *sql.DistSQLPlanner,
) (*sql.PhysicalPlan, *sql.PlanningCtx, error) {
plan, planCtx, _, err := p.generatePlanImpl(ctx, dsp)
return plan, planCtx, err
}

func (p *logicalReplicationPlanner) generatePlanImpl(
ctx context.Context, dsp *sql.DistSQLPlanner,
) (*sql.PhysicalPlan, *sql.PlanningCtx, planInfo, error) {
) (*sql.PhysicalPlan, *sql.PlanningCtx, logicalReplicationPlanInfo, error) {
var (
execCfg = p.jobExecCtx.ExecCfg()
evalCtx = p.jobExecCtx.ExtendedEvalContext()
info = planInfo{}
execCfg = p.jobExecCtx.ExecCfg()
evalCtx = p.jobExecCtx.ExtendedEvalContext()
progress = p.job.Progress().Details.(*jobspb.Progress_LogicalReplication).LogicalReplication
payload = p.job.Payload().Details.(*jobspb.Payload_LogicalReplicationDetails).LogicalReplicationDetails
info = logicalReplicationPlanInfo{
tableIDsToNames: make(map[int32]fullyQualifiedTableName),
}
)
asOf := progress.ReplicatedTime
if asOf.IsEmpty() {
asOf = payload.ReplicationStartTime
}
req := streampb.LogicalReplicationPlanRequest{
PlanAsOf: asOf,
}
for _, pair := range payload.ReplicationPairs {
req.TableIDs = append(req.TableIDs, pair.SrcDescriptorID)
}

plan, err := p.client.PlanLogicalReplication(ctx, p.req)
plan, err := p.client.PlanLogicalReplication(ctx, req)
if err != nil {
return nil, nil, info, err
}
info.sourceSpans = plan.SourceSpans
info.streamAddress = plan.Topology.StreamAddresses()

var defaultFnOID oid.Oid
if defaultFnID := p.payload.DefaultConflictResolution.FunctionId; defaultFnID != 0 {
if defaultFnID := payload.DefaultConflictResolution.FunctionId; defaultFnID != 0 {
defaultFnOID = catid.FuncIDToOID(catid.DescID(defaultFnID))
}

tableIDToName := make(map[int32]fullyQualifiedTableName)
tablesMd := make(map[int32]execinfrapb.TableReplicationMetadata)
if err := sql.DescsTxn(ctx, execCfg, func(ctx context.Context, txn isql.Txn, descriptors *descs.Collection) error {
for _, pair := range p.payload.ReplicationPairs {
for _, pair := range payload.ReplicationPairs {
srcTableDesc := plan.DescriptorMap[pair.SrcDescriptorID]

// Look up fully qualified destination table name
@@ -367,7 +390,7 @@ func (p *logicalReplicationPlanner) generateInitialPlanWithInfo(
DestinationTableName: dstTableDesc.GetName(),
DestinationFunctionOID: uint32(fnOID),
}
tableIDToName[pair.DstDescriptorID] = fullyQualifiedTableName{
info.tableIDsToNames[pair.DstDescriptorID] = fullyQualifiedTableName{
database: dbDesc.GetName(),
schema: scDesc.GetName(),
table: dstTableDesc.GetName(),
@@ -378,25 +401,6 @@
return nil, nil, info, err
}

// TODO(azhu): add a flag to avoid recreating dlq tables during replanning
dlqClient := InitDeadLetterQueueClient(p.jobExecCtx.ExecCfg().InternalDB.Executor(), tableIDToName)
if err := dlqClient.Create(ctx); err != nil {
return nil, nil, info, errors.Wrap(err, "failed to create dead letter queue")
}

frontier, err := span.MakeFrontierAt(p.replicatedTimeAtStart, plan.SourceSpans...)
if err != nil {
return nil, nil, info, err
}
info.streamAddress = plan.Topology.StreamAddresses()
info.frontier = frontier

for _, resolvedSpan := range p.progress.Checkpoint.ResolvedSpans {
if _, err := frontier.Forward(resolvedSpan.Span, resolvedSpan.Timestamp); err != nil {
return nil, nil, info, err
}
}

planCtx, nodes, err := dsp.SetupAllNodesPlanning(ctx, evalCtx, execCfg)
if err != nil {
return nil, nil, info, err
Expand All @@ -407,15 +411,15 @@ func (p *logicalReplicationPlanner) generateInitialPlanWithInfo(
}

specs, err := constructLogicalReplicationWriterSpecs(ctx,
crosscluster.StreamAddress(p.payload.SourceClusterConnStr),
crosscluster.StreamAddress(payload.SourceClusterConnStr),
plan.Topology,
destNodeLocalities,
p.payload.ReplicationStartTime,
p.progress.ReplicatedTime,
p.progress.Checkpoint,
payload.ReplicationStartTime,
progress.ReplicatedTime,
progress.Checkpoint,
tablesMd,
p.jobID,
streampb.StreamID(p.payload.StreamID))
p.job.ID(),
streampb.StreamID(payload.StreamID))
if err != nil {
return nil, nil, info, err
}
@@ -456,13 +460,6 @@ func (p *logicalReplicationPlanner) generateInitialPlanWithInfo(
return physicalPlan, planCtx, info, nil
}

func (p *logicalReplicationPlanner) generatePlan(
ctx context.Context, dsp *sql.DistSQLPlanner,
) (*sql.PhysicalPlan, *sql.PlanningCtx, error) {
plan, planCtx, _, err := p.generateInitialPlanWithInfo(ctx, dsp)
return plan, planCtx, err
}

// rowHandler is responsible for handling checkpoints sent by logical
// replication writer processors.
type rowHandler struct {
80 changes: 80 additions & 0 deletions pkg/ccl/crosscluster/logical/logical_replication_job_test.go
@@ -21,17 +21,21 @@ import (
"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcevent"
"github.com/cockroachdb/cockroach/pkg/ccl/crosscluster/replicationutils"
"github.com/cockroachdb/cockroach/pkg/ccl/crosscluster/streamclient"
_ "github.com/cockroachdb/cockroach/pkg/ccl/crosscluster/streamclient/randclient"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/repstream/streampb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/security/username"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/desctestutils"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/isql"
"github.com/cockroachdb/cockroach/pkg/sql/randgen"
"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
@@ -1184,3 +1188,79 @@ func TestLogicalStreamIngestionJobWithFallbackUDF(t *testing.T) {
dbA.CheckQueryResults(t, "SELECT * from a.tab", expectedRows)
dbB.CheckQueryResults(t, "SELECT * from b.tab", expectedRows)
}

func TestLogicalReplicationPlanner(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
ctx := context.Background()

srv := serverutils.StartServerOnly(t, base.TestServerArgs{})
defer srv.Stopper().Stop(ctx)
s := srv.ApplicationLayer()

execCfg := s.ExecutorConfig().(sql.ExecutorConfig)
registry := s.JobRegistry().(*jobs.Registry)

jobExecCtx, cleanup := sql.MakeJobExecContext(ctx, "test", username.RootUserName(), &sql.MemoryMetrics{}, &execCfg)
defer cleanup()

replicationStartTime := hlc.Timestamp{WallTime: 42}

var sj *jobs.StartableJob
id := registry.MakeJobID()
require.NoError(t, s.InternalDB().(isql.DB).Txn(ctx, func(
ctx context.Context, txn isql.Txn,
) (err error) {
return registry.CreateStartableJobWithTxn(ctx, &sj, id, txn, jobs.Record{
Username: username.RootUserName(),
Details: jobspb.LogicalReplicationDetails{
ReplicationStartTime: replicationStartTime,
},
Progress: jobspb.LogicalReplicationProgress{},
})
}))
asOfChan := make(chan hlc.Timestamp, 1)
client := &streamclient.MockStreamClient{
OnPlanLogicalReplication: func(req streampb.LogicalReplicationPlanRequest) (streamclient.LogicalReplicationPlan, error) {
asOfChan <- req.PlanAsOf
return streamclient.LogicalReplicationPlan{
Topology: streamclient.Topology{
Partitions: []streamclient.PartitionInfo{
{
ID: "1",
SubscriptionToken: streamclient.SubscriptionToken("1"),
Spans: []roachpb.Span{s.Codec().TenantSpan()},
},
},
},
}, nil
},
}
requireAsOf := func(expected hlc.Timestamp) {
select {
case actual := <-asOfChan:
require.Equal(t, expected, actual)
case <-time.After(testutils.SucceedsSoonDuration()):
}
}
planner := logicalReplicationPlanner{
job: sj.Job,
jobExecCtx: jobExecCtx,
client: client,
}
t.Run("generatePlan uses the replicationStartTime for planning if replication is unset", func(t *testing.T) {
_, _, _ = planner.generatePlan(ctx, jobExecCtx.DistSQLPlanner())
requireAsOf(replicationStartTime)
})
t.Run("generatePlan uses the latest replicated time for planning", func(t *testing.T) {
replicatedTime := hlc.Timestamp{WallTime: 142}
require.NoError(t, sj.Job.NoTxn().Update(ctx, func(txn isql.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater) error {
prog := md.Progress.Details.(*jobspb.Progress_LogicalReplication).LogicalReplication
prog.ReplicatedTime = replicatedTime
ju.UpdateProgress(md.Progress)
return nil
}))
_, _, _ = planner.generatePlan(ctx, jobExecCtx.DistSQLPlanner())
requireAsOf(replicatedTime)
})
}
