diff --git a/internal/services/dispatcher/dispatcher.go b/internal/services/dispatcher/dispatcher.go index 29c0ece76..2128a3c22 100644 --- a/internal/services/dispatcher/dispatcher.go +++ b/internal/services/dispatcher/dispatcher.go @@ -15,6 +15,7 @@ import ( "github.com/hatchet-dev/hatchet/internal/datautils" "github.com/hatchet-dev/hatchet/internal/msgqueue" + "github.com/hatchet-dev/hatchet/internal/queueutils" "github.com/hatchet-dev/hatchet/internal/services/dispatcher/contracts" "github.com/hatchet-dev/hatchet/internal/services/shared/recoveryutils" "github.com/hatchet-dev/hatchet/internal/services/shared/tasktypes" @@ -358,8 +359,6 @@ func (d *DispatcherImpl) handleTask(ctx context.Context, task *msgqueue.Message) switch task.ID { case "group-key-action-assigned": err = d.a.WrapErr(d.handleGroupKeyActionAssignedTask(ctx, task), map[string]interface{}{}) - case "step-run-assigned": - err = d.a.WrapErr(d.handleStepRunAssignedTask(ctx, task), map[string]interface{}{}) case "step-run-assigned-bulk": err = d.a.WrapErr(d.handleStepRunBulkAssignedTask(ctx, task), map[string]interface{}{}) case "step-run-cancelled": @@ -438,118 +437,6 @@ func (d *DispatcherImpl) handleGroupKeyActionAssignedTask(ctx context.Context, t return multiErr } -func (d *DispatcherImpl) handleStepRunAssignedTask(ctx context.Context, task *msgqueue.Message) error { - ctx, span := telemetry.NewSpanWithCarrier(ctx, "step-run-assigned", task.OtelCarrier) - defer span.End() - - payload := tasktypes.StepRunAssignedTaskPayload{} - metadata := tasktypes.StepRunAssignedTaskMetadata{} - - err := d.dv.DecodeAndValidate(task.Payload, &payload) - - if err != nil { - return fmt.Errorf("could not decode dispatcher task payload: %w", err) - } - - err = d.dv.DecodeAndValidate(task.Metadata, &metadata) - - if err != nil { - return fmt.Errorf("could not decode dispatcher task metadata: %w", err) - } - - // load the step run from the database - stepRun, err := d.repo.StepRun().GetStepRunForEngine(ctx, metadata.TenantId, payload.StepRunId) - - if err != nil { - return fmt.Errorf("could not get step run: %w", err) - } - - // if the step run has a job run in a non-running state, we should not send it to the worker - if repository.IsFinalJobRunStatus(stepRun.JobRunStatus) { - d.l.Debug().Msgf("job run %s is in a final state %s, ignoring", sqlchelpers.UUIDToStr(stepRun.JobRunId), string(stepRun.JobRunStatus)) - - // release the semaphore - return d.repo.StepRun().ReleaseStepRunSemaphore(ctx, metadata.TenantId, payload.StepRunId, false) - } - - // if the step run is in a final state, we should not send it to the worker - if repository.IsFinalStepRunStatus(stepRun.SRStatus) { - d.l.Warn().Msgf("step run %s is in a final state %s, ignoring", payload.StepRunId, string(stepRun.SRStatus)) - - return d.repo.StepRun().ReleaseStepRunSemaphore(ctx, metadata.TenantId, payload.StepRunId, false) - } - - data, err := d.repo.StepRun().GetStepRunDataForEngine(ctx, metadata.TenantId, payload.StepRunId) - - if err != nil { - return fmt.Errorf("could not get step run data: %w", err) - } - - servertel.WithStepRunModel(span, stepRun) - - var multiErr error - var success bool - - // get the worker for this task - workers, err := d.workers.Get(payload.WorkerId) - - if err != nil && !errors.Is(err, ErrWorkerNotFound) { - return fmt.Errorf("could not get worker: %w", err) - } - - for i, w := range workers { - err = w.StartStepRun(ctx, metadata.TenantId, stepRun, data) - - if err != nil { - multiErr = multierror.Append(multiErr, fmt.Errorf("could not send step action to worker (%d): %w", i, err)) - } else { - success = true - break - } - } - - now := time.Now().UTC() - - if success { - defer d.repo.StepRun().DeferredStepRunEvent( - metadata.TenantId, - repository.CreateStepRunEventOpts{ - StepRunId: sqlchelpers.UUIDToStr(stepRun.SRID), - EventMessage: repository.StringPtr("Sent step run to the assigned worker"), - EventReason: repository.StepRunEventReasonPtr(dbsqlc.StepRunEventReasonSENTTOWORKER), - EventSeverity: repository.StepRunEventSeverityPtr(dbsqlc.StepRunEventSeverityINFO), - Timestamp: &now, - EventData: map[string]interface{}{"worker_id": payload.WorkerId}, - }, - ) - - return nil - } - - defer d.repo.StepRun().DeferredStepRunEvent( - metadata.TenantId, - repository.CreateStepRunEventOpts{ - StepRunId: sqlchelpers.UUIDToStr(stepRun.SRID), - EventMessage: repository.StringPtr("Could not send step run to assigned worker"), - EventReason: repository.StepRunEventReasonPtr(dbsqlc.StepRunEventReasonREASSIGNED), - EventSeverity: repository.StepRunEventSeverityPtr(dbsqlc.StepRunEventSeverityWARNING), - Timestamp: &now, - EventData: map[string]interface{}{"worker_id": payload.WorkerId}, - }, - ) - - // we were unable to send the step run to any worker, requeue the step run with an internal retry - _, err = d.repo.StepRun().QueueStepRun(ctx, metadata.TenantId, sqlchelpers.UUIDToStr(stepRun.SRID), &repository.QueueStepRunOpts{ - IsInternalRetry: true, - }) - - if err != nil && !errors.Is(err, repository.ErrAlreadyRunning) { - multiErr = multierror.Append(multiErr, fmt.Errorf("💥 could not requeue step run in dispatcher: %w", err)) - } - - return multiErr -} - func (d *DispatcherImpl) handleStepRunBulkAssignedTask(ctx context.Context, task *msgqueue.Message) error { ctx, span := telemetry.NewSpanWithCarrier(ctx, "step-run-assigned-bulk", task.OtelCarrier) defer span.End() @@ -611,28 +498,25 @@ func (d *DispatcherImpl) handleStepRunBulkAssignedTask(ctx context.Context, task innerEg := errgroup.Group{} + toRetry := []string{} + toRetryMu := sync.Mutex{} + for _, stepRunId := range stepRunIds { stepRunId := stepRunId innerEg.Go(func() error { stepRun := stepRunIdToData[stepRunId] - requeue := func() error { - // we were unable to send the step run to any worker, requeue the step run with an internal retry - _, err = d.repo.StepRun().QueueStepRun(ctx, metadata.TenantId, sqlchelpers.UUIDToStr(stepRun.SRID), &repository.QueueStepRunOpts{ - IsInternalRetry: true, - }) - - if err != nil && !errors.Is(err, repository.ErrAlreadyRunning) { - return fmt.Errorf("💥 could not requeue step run in dispatcher: %w", err) - } - - return nil + requeue := func() { + toRetryMu.Lock() + toRetry = append(toRetry, stepRunId) + toRetryMu.Unlock() } // if we've reached the context deadline, this should be requeued if ctx.Err() != nil { - return requeue() + requeue() + return nil } // if the step run has a job run in a non-running state, we should not send it to the worker @@ -657,6 +541,7 @@ func (d *DispatcherImpl) handleStepRunBulkAssignedTask(ctx context.Context, task err := w.StartStepRunFromBulk(ctx, metadata.TenantId, stepRun) if err != nil { + d.l.Err(err).Msgf("could not send step run to worker (%d)", i) multiErr = multierror.Append(multiErr, fmt.Errorf("could not send step action to worker (%d): %w", i, err)) } else { success = true @@ -694,15 +579,56 @@ func (d *DispatcherImpl) handleStepRunBulkAssignedTask(ctx context.Context, task }, ) - if err := requeue(); err != nil { - multiErr = multierror.Append(multiErr, err) - } + requeue() return multiErr }) } - return innerEg.Wait() + innerErr := innerEg.Wait() + + if len(toRetry) > 0 { + retryCtx, cancel := context.WithTimeout(context.Background(), 15*time.Second) + defer cancel() + + _, stepRunsToFail, err := d.repo.StepRun().InternalRetryStepRuns(retryCtx, metadata.TenantId, toRetry) + + if err != nil { + innerErr = multierror.Append(innerErr, fmt.Errorf("could not requeue step runs: %w", err)) + } + + if len(stepRunsToFail) > 0 { + now := time.Now() + + batchErr := queueutils.BatchConcurrent(50, stepRunsToFail, func(stepRuns []*dbsqlc.GetStepRunForEngineRow) error { + var innerBatchErr error + + for _, stepRun := range stepRuns { + err := d.mq.AddMessage( + retryCtx, + msgqueue.JOB_PROCESSING_QUEUE, + tasktypes.StepRunFailedToTask( + stepRun, + "Could not send step run to worker", + &now, + ), + ) + + if err != nil { + innerBatchErr = multierror.Append(innerBatchErr, err) + } + } + + return innerBatchErr + }) + + if batchErr != nil { + innerErr = multierror.Append(innerErr, fmt.Errorf("could not fail step runs: %w", batchErr)) + } + } + } + + return innerErr }) } diff --git a/pkg/client/client.go b/pkg/client/client.go index d70e8e21b..866dde6e8 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -13,6 +13,7 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/keepalive" "google.golang.org/grpc/status" "github.com/hatchet-dev/hatchet/pkg/client/loader" @@ -255,8 +256,15 @@ func newFromOpts(opts *ClientOpts) (Client, error) { transportCreds = credentials.NewTLS(opts.tls) } + keepAliveParams := keepalive.ClientParameters{ + Time: 10 * time.Second, // grpc.keepalive_time_ms: 10 * 1000 + Timeout: 60 * time.Second, // grpc.keepalive_timeout_ms: 60 * 1000 + PermitWithoutStream: true, // grpc.keepalive_permit_without_calls: 1 + } + grpcOpts := []grpc.DialOption{ grpc.WithTransportCredentials(transportCreds), + grpc.WithKeepaliveParams(keepAliveParams), } if !opts.noGrpcRetry { diff --git a/pkg/repository/prisma/dbsqlc/step_runs.sql b/pkg/repository/prisma/dbsqlc/step_runs.sql index 7187521f3..ac50d681d 100644 --- a/pkg/repository/prisma/dbsqlc/step_runs.sql +++ b/pkg/repository/prisma/dbsqlc/step_runs.sql @@ -757,6 +757,111 @@ SELECT FROM step_runs_to_fail srs2; +-- name: InternalRetryStepRuns :many +WITH step_runs AS ( + SELECT + sr."id", + sr."tenantId", + sr."scheduleTimeoutAt", + sr."retryCount", + sr."internalRetryCount", + s."actionId", + s."id" AS "stepId", + s."timeout" AS "stepTimeout", + s."scheduleTimeout" AS "scheduleTimeout" + FROM + "StepRun" sr + JOIN + "Step" s ON sr."stepId" = s."id" + WHERE + sr."tenantId" = @tenantId::uuid + AND sr."id" = ANY(@stepRunIds::uuid[]) +), +step_runs_to_reassign AS ( + SELECT + * + FROM + step_runs + WHERE + "internalRetryCount" < @maxInternalRetryCount::int +), +step_runs_to_fail AS ( + SELECT + * + FROM + step_runs + WHERE + "internalRetryCount" >= @maxInternalRetryCount::int +), +deleted_sqis AS ( + DELETE FROM + "SemaphoreQueueItem" sqi + USING + step_runs srs + WHERE + sqi."stepRunId" = srs."id" +), +deleted_tqis AS ( + DELETE FROM + "TimeoutQueueItem" tqi + -- delete when step run id AND retry count tuples match + USING + step_runs srs + WHERE + tqi."stepRunId" = srs."id" + AND tqi."retryCount" = srs."retryCount" +), +inserted_queue_items AS ( + INSERT INTO "QueueItem" ( + "stepRunId", + "stepId", + "actionId", + "scheduleTimeoutAt", + "stepTimeout", + "priority", + "isQueued", + "tenantId", + "queue" + ) + SELECT + srs."id", + srs."stepId", + srs."actionId", + CURRENT_TIMESTAMP + COALESCE(convert_duration_to_interval(srs."scheduleTimeout"), INTERVAL '5 minutes'), + srs."stepTimeout", + -- Queue with priority 4 so that reassignment gets highest priority + 4, + true, + srs."tenantId", + srs."actionId" + FROM + step_runs_to_reassign srs +), +updated_step_runs AS ( + UPDATE "StepRun" sr + SET + "status" = 'PENDING_ASSIGNMENT', + "scheduleTimeoutAt" = CURRENT_TIMESTAMP + COALESCE(convert_duration_to_interval(srs."scheduleTimeout"), INTERVAL '5 minutes'), + "updatedAt" = CURRENT_TIMESTAMP, + "internalRetryCount" = sr."internalRetryCount" + 1 + FROM step_runs_to_reassign srs + WHERE sr."id" = srs."id" + RETURNING sr."id" +) +SELECT + srs1."id", + srs1."retryCount", + 'REASSIGNED' AS "operation" +FROM + step_runs_to_reassign srs1 +UNION ALL +SELECT + srs2."id", + srs2."retryCount", + 'FAILED' AS "operation" +FROM + step_runs_to_fail srs2; + -- name: ListStepRunsToTimeout :many SELECT "id" FROM "StepRun" diff --git a/pkg/repository/prisma/dbsqlc/step_runs.sql.go b/pkg/repository/prisma/dbsqlc/step_runs.sql.go index 6e9046432..0a144c177 100644 --- a/pkg/repository/prisma/dbsqlc/step_runs.sql.go +++ b/pkg/repository/prisma/dbsqlc/step_runs.sql.go @@ -1504,6 +1504,144 @@ func (q *Queries) HasActiveWorkersForActionId(ctx context.Context, db DBTX, arg return total, err } +const internalRetryStepRuns = `-- name: InternalRetryStepRuns :many +WITH step_runs AS ( + SELECT + sr."id", + sr."tenantId", + sr."scheduleTimeoutAt", + sr."retryCount", + sr."internalRetryCount", + s."actionId", + s."id" AS "stepId", + s."timeout" AS "stepTimeout", + s."scheduleTimeout" AS "scheduleTimeout" + FROM + "StepRun" sr + JOIN + "Step" s ON sr."stepId" = s."id" + WHERE + sr."tenantId" = $1::uuid + AND sr."id" = ANY($2::uuid[]) +), +step_runs_to_reassign AS ( + SELECT + id, "tenantId", "scheduleTimeoutAt", "retryCount", "internalRetryCount", "actionId", "stepId", "stepTimeout", "scheduleTimeout" + FROM + step_runs + WHERE + "internalRetryCount" < $3::int +), +step_runs_to_fail AS ( + SELECT + id, "tenantId", "scheduleTimeoutAt", "retryCount", "internalRetryCount", "actionId", "stepId", "stepTimeout", "scheduleTimeout" + FROM + step_runs + WHERE + "internalRetryCount" >= $3::int +), +deleted_sqis AS ( + DELETE FROM + "SemaphoreQueueItem" sqi + USING + step_runs srs + WHERE + sqi."stepRunId" = srs."id" +), +deleted_tqis AS ( + DELETE FROM + "TimeoutQueueItem" tqi + -- delete when step run id AND retry count tuples match + USING + step_runs srs + WHERE + tqi."stepRunId" = srs."id" + AND tqi."retryCount" = srs."retryCount" +), +inserted_queue_items AS ( + INSERT INTO "QueueItem" ( + "stepRunId", + "stepId", + "actionId", + "scheduleTimeoutAt", + "stepTimeout", + "priority", + "isQueued", + "tenantId", + "queue" + ) + SELECT + srs."id", + srs."stepId", + srs."actionId", + CURRENT_TIMESTAMP + COALESCE(convert_duration_to_interval(srs."scheduleTimeout"), INTERVAL '5 minutes'), + srs."stepTimeout", + -- Queue with priority 4 so that reassignment gets highest priority + 4, + true, + srs."tenantId", + srs."actionId" + FROM + step_runs_to_reassign srs +), +updated_step_runs AS ( + UPDATE "StepRun" sr + SET + "status" = 'PENDING_ASSIGNMENT', + "scheduleTimeoutAt" = CURRENT_TIMESTAMP + COALESCE(convert_duration_to_interval(srs."scheduleTimeout"), INTERVAL '5 minutes'), + "updatedAt" = CURRENT_TIMESTAMP, + "internalRetryCount" = sr."internalRetryCount" + 1 + FROM step_runs_to_reassign srs + WHERE sr."id" = srs."id" + RETURNING sr."id" +) +SELECT + srs1."id", + srs1."retryCount", + 'REASSIGNED' AS "operation" +FROM + step_runs_to_reassign srs1 +UNION ALL +SELECT + srs2."id", + srs2."retryCount", + 'FAILED' AS "operation" +FROM + step_runs_to_fail srs2 +` + +type InternalRetryStepRunsParams struct { + Tenantid pgtype.UUID `json:"tenantid"` + Steprunids []pgtype.UUID `json:"steprunids"` + Maxinternalretrycount int32 `json:"maxinternalretrycount"` +} + +type InternalRetryStepRunsRow struct { + ID pgtype.UUID `json:"id"` + RetryCount int32 `json:"retryCount"` + Operation string `json:"operation"` +} + +func (q *Queries) InternalRetryStepRuns(ctx context.Context, db DBTX, arg InternalRetryStepRunsParams) ([]*InternalRetryStepRunsRow, error) { + rows, err := db.Query(ctx, internalRetryStepRuns, arg.Tenantid, arg.Steprunids, arg.Maxinternalretrycount) + if err != nil { + return nil, err + } + defer rows.Close() + var items []*InternalRetryStepRunsRow + for rows.Next() { + var i InternalRetryStepRunsRow + if err := rows.Scan(&i.ID, &i.RetryCount, &i.Operation); err != nil { + return nil, err + } + items = append(items, &i) + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + const listChildWorkflowRunIds = `-- name: ListChildWorkflowRunIds :many SELECT "id" diff --git a/pkg/repository/prisma/step_run.go b/pkg/repository/prisma/step_run.go index e46445695..e0d407511 100644 --- a/pkg/repository/prisma/step_run.go +++ b/pkg/repository/prisma/step_run.go @@ -502,6 +502,61 @@ func (s *stepRunEngineRepository) ListStepRunsToReassign(ctx context.Context, te return stepRunIdsStr, failedStepRunResults, nil } +func (s *stepRunEngineRepository) InternalRetryStepRuns(ctx context.Context, tenantId string, srIdsIn []string) ([]string, []*dbsqlc.GetStepRunForEngineRow, error) { + pgTenantId := sqlchelpers.UUIDFromStr(tenantId) + stepRuns := make([]pgtype.UUID, 0, len(srIdsIn)) + + for _, id := range srIdsIn { + stepRuns = append(stepRuns, sqlchelpers.UUIDFromStr(id)) + } + + tx, commit, rollback, err := sqlchelpers.PrepareTx(ctx, s.pool, s.l, 5000) + + if err != nil { + return nil, nil, err + } + + defer rollback() + + // get the step run and make sure it's still in pending + results, err := s.queries.InternalRetryStepRuns(ctx, tx, dbsqlc.InternalRetryStepRunsParams{ + Maxinternalretrycount: s.cf.MaxInternalRetryCount, + Tenantid: pgTenantId, + Steprunids: stepRuns, + }) + + if err != nil { + return nil, nil, err + } + + stepRunIdsStr := make([]string, 0, len(results)) + + failedStepRunIds := make([]pgtype.UUID, 0, len(results)) + + for _, sr := range results { + if sr.Operation == "FAILED" { + failedStepRunIds = append(failedStepRunIds, sr.ID) + } + } + + failedStepRunResults, err := s.queries.GetStepRunForEngine(ctx, tx, dbsqlc.GetStepRunForEngineParams{ + Ids: failedStepRunIds, + TenantId: pgTenantId, + }) + + if err != nil { + return nil, nil, err + } + + err = commit(ctx) + + if err != nil { + return nil, nil, err + } + + return stepRunIdsStr, failedStepRunResults, nil +} + func (s *stepRunEngineRepository) ListStepRunsToTimeout(ctx context.Context, tenantId string) (bool, []*dbsqlc.GetStepRunForEngineRow, error) { pgTenantId := sqlchelpers.UUIDFromStr(tenantId) diff --git a/pkg/repository/step_run.go b/pkg/repository/step_run.go index 1dd29f4ea..6e5af7aa8 100644 --- a/pkg/repository/step_run.go +++ b/pkg/repository/step_run.go @@ -172,6 +172,8 @@ type StepRunEngineRepository interface { // ListStepRunsToReassign returns a list of step runs which are in a reassignable state. ListStepRunsToReassign(ctx context.Context, tenantId string) (reassignedStepRunIds []string, failedStepRuns []*dbsqlc.GetStepRunForEngineRow, err error) + InternalRetryStepRuns(ctx context.Context, tenantId string, srIdsIn []string) (reassignedStepRunIds []string, failedStepRuns []*dbsqlc.GetStepRunForEngineRow, err error) + ListStepRunsToTimeout(ctx context.Context, tenantId string) (bool, []*dbsqlc.GetStepRunForEngineRow, error) StepRunAcked(ctx context.Context, tenantId, workflowRunId, stepRunId string, ackedAt time.Time) error