diff --git a/.changeset/giant-mice-cheer.md b/.changeset/giant-mice-cheer.md new file mode 100644 index 0000000000..9aeb358bee --- /dev/null +++ b/.changeset/giant-mice-cheer.md @@ -0,0 +1,5 @@ +--- +"trigger.dev": patch +--- + +Increase the number of active streams from 2 to 5, total streams from 5 to 10 diff --git a/.changeset/slow-deers-collect.md b/.changeset/slow-deers-collect.md new file mode 100644 index 0000000000..ca84278ff7 --- /dev/null +++ b/.changeset/slow-deers-collect.md @@ -0,0 +1,8 @@ +--- +"@trigger.dev/react-hooks": patch +"@trigger.dev/sdk": patch +"trigger.dev": patch +"@trigger.dev/core": patch +--- + +Adding ability to update parent run metadata from child runs/tasks diff --git a/apps/webapp/app/env.server.ts b/apps/webapp/app/env.server.ts index 99c810c76f..f7ab831b50 100644 --- a/apps/webapp/app/env.server.ts +++ b/apps/webapp/app/env.server.ts @@ -238,13 +238,14 @@ const EnvironmentSchema = z.object({ TASK_PAYLOAD_OFFLOAD_THRESHOLD: z.coerce.number().int().default(524_288), // 512KB TASK_PAYLOAD_MAXIMUM_SIZE: z.coerce.number().int().default(3_145_728), // 3MB BATCH_TASK_PAYLOAD_MAXIMUM_SIZE: z.coerce.number().int().default(1_000_000), // 1MB - TASK_RUN_METADATA_MAXIMUM_SIZE: z.coerce.number().int().default(4_096), // 4KB + TASK_RUN_METADATA_MAXIMUM_SIZE: z.coerce.number().int().default(262_144), // 256KB MAXIMUM_DEV_QUEUE_SIZE: z.coerce.number().int().optional(), MAXIMUM_DEPLOYED_QUEUE_SIZE: z.coerce.number().int().optional(), MAX_BATCH_V2_TRIGGER_ITEMS: z.coerce.number().int().default(500), REALTIME_STREAM_VERSION: z.enum(["v1", "v2"]).default("v1"), + BATCH_METADATA_OPERATIONS_FLUSH_INTERVAL_MS: z.coerce.number().int().default(1000), }); export type Environment = z.infer; diff --git a/apps/webapp/app/routes/api.v1.runs.$runId.metadata.ts b/apps/webapp/app/routes/api.v1.runs.$runId.metadata.ts index 0bf4c932b8..1ebf43bc3d 100644 --- a/apps/webapp/app/routes/api.v1.runs.$runId.metadata.ts +++ b/apps/webapp/app/routes/api.v1.runs.$runId.metadata.ts @@ -1,94 +1,29 @@ -import { type ActionFunctionArgs, json } from "@remix-run/server-runtime"; -import { parsePacket, UpdateMetadataRequestBody } from "@trigger.dev/core/v3"; +import { json } from "@remix-run/server-runtime"; +import { UpdateMetadataRequestBody } from "@trigger.dev/core/v3"; import { z } from "zod"; -import { prisma } from "~/db.server"; -import { authenticateApiRequest } from "~/services/apiAuth.server"; -import { handleMetadataPacket } from "~/utils/packets"; -import { ServiceValidationError } from "~/v3/services/baseService.server"; -import { isFinalRunStatus } from "~/v3/taskStatus"; +import { updateMetadataService } from "~/services/metadata/updateMetadata.server"; +import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server"; const ParamsSchema = z.object({ runId: z.string(), }); -export async function action({ request, params }: ActionFunctionArgs) { - // Ensure this is a PUT request - if (request.method.toUpperCase() !== "PUT") { - return json({ error: "Method not allowed" }, { status: 405, headers: { Allow: "PUT" } }); - } - - // Authenticate the request - const authenticationResult = await authenticateApiRequest(request); - if (!authenticationResult) { - return json({ error: "Invalid or Missing API Key" }, { status: 401 }); - } - - const parsedParams = ParamsSchema.safeParse(params); - if (!parsedParams.success) { - return json( - { error: "Invalid request parameters", issues: parsedParams.error.issues }, - { status: 400 } - ); - } - - try { - const anyBody = await request.json(); - - const body = UpdateMetadataRequestBody.safeParse(anyBody); - - if (!body.success) { - return json({ error: "Invalid request body", issues: body.error.issues }, { status: 400 }); - } - - const metadataPacket = handleMetadataPacket( - body.data.metadata, - body.data.metadataType ?? "application/json" - ); - - if (!metadataPacket) { - return json({ error: "Invalid metadata" }, { status: 400 }); - } - - const taskRun = await prisma.taskRun.findFirst({ - where: { - friendlyId: parsedParams.data.runId, - runtimeEnvironmentId: authenticationResult.environment.id, - }, - select: { - status: true, - }, - }); - - if (!taskRun) { +const { action } = createActionApiRoute( + { + params: ParamsSchema, + body: UpdateMetadataRequestBody, + maxContentLength: 1024 * 1024, // 1MB + method: "PUT", + }, + async ({ authentication, body, params }) => { + const result = await updateMetadataService.call(authentication.environment, params.runId, body); + + if (!result) { return json({ error: "Task Run not found" }, { status: 404 }); } - if (isFinalRunStatus(taskRun.status)) { - return json({ error: "Cannot update metadata for a completed run" }, { status: 400 }); - } - - await prisma.taskRun.update({ - where: { - friendlyId: parsedParams.data.runId, - runtimeEnvironmentId: authenticationResult.environment.id, - }, - data: { - metadata: metadataPacket?.data, - metadataType: metadataPacket?.dataType, - }, - }); - - const parsedPacket = await parsePacket(metadataPacket); - - return json({ metadata: parsedPacket }, { status: 200 }); - } catch (error) { - if (error instanceof ServiceValidationError) { - return json({ error: error.message }, { status: error.status ?? 422 }); - } else { - return json( - { error: error instanceof Error ? error.message : "Internal Server Error" }, - { status: 500 } - ); - } + return json(result, { status: 200 }); } -} +); + +export { action }; diff --git a/apps/webapp/app/routes/api.v1.tasks.batch.ts b/apps/webapp/app/routes/api.v1.tasks.batch.ts index 5b1b89d2d9..47f603bcc6 100644 --- a/apps/webapp/app/routes/api.v1.tasks.batch.ts +++ b/apps/webapp/app/routes/api.v1.tasks.batch.ts @@ -1,23 +1,21 @@ import { json } from "@remix-run/server-runtime"; import { - BatchTriggerTaskResponse, BatchTriggerTaskV2RequestBody, BatchTriggerTaskV2Response, generateJWT, } from "@trigger.dev/core/v3"; import { env } from "~/env.server"; +import { AuthenticatedEnvironment, getOneTimeUseToken } from "~/services/apiAuth.server"; +import { logger } from "~/services/logger.server"; import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server"; -import { HeadersSchema } from "./api.v1.tasks.$taskId.trigger"; import { resolveIdempotencyKeyTTL } from "~/utils/idempotencyKeys.server"; +import { ServiceValidationError } from "~/v3/services/baseService.server"; import { BatchProcessingStrategy, BatchTriggerV2Service, } from "~/v3/services/batchTriggerV2.server"; -import { ServiceValidationError } from "~/v3/services/baseService.server"; import { OutOfEntitlementError } from "~/v3/services/triggerTask.server"; -import { AuthenticatedEnvironment, getOneTimeUseToken } from "~/services/apiAuth.server"; -import { logger } from "~/services/logger.server"; -import { z } from "zod"; +import { HeadersSchema } from "./api.v1.tasks.$taskId.trigger"; const { action, loader } = createActionApiRoute( { diff --git a/apps/webapp/app/routes/realtime.v1.streams.$runId.$streamId.ts b/apps/webapp/app/routes/realtime.v1.streams.$runId.$streamId.ts index 99e9cdb8d7..e648225c55 100644 --- a/apps/webapp/app/routes/realtime.v1.streams.$runId.$streamId.ts +++ b/apps/webapp/app/routes/realtime.v1.streams.$runId.$streamId.ts @@ -2,7 +2,6 @@ import { ActionFunctionArgs } from "@remix-run/server-runtime"; import { z } from "zod"; import { $replica } from "~/db.server"; import { relayRealtimeStreams } from "~/services/realtime/relayRealtimeStreams.server"; -import { v1RealtimeStreams } from "~/services/realtime/v1StreamsGlobal.server"; import { createLoaderApiRoute } from "~/services/routeBuilders/apiBuilder.server"; const ParamsSchema = z.object({ diff --git a/apps/webapp/app/routes/realtime.v1.streams.$runId.$target.$streamId.ts b/apps/webapp/app/routes/realtime.v1.streams.$runId.$target.$streamId.ts new file mode 100644 index 0000000000..1735c556e1 --- /dev/null +++ b/apps/webapp/app/routes/realtime.v1.streams.$runId.$target.$streamId.ts @@ -0,0 +1,61 @@ +import { z } from "zod"; +import { $replica } from "~/db.server"; +import { relayRealtimeStreams } from "~/services/realtime/relayRealtimeStreams.server"; +import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server"; + +const ParamsSchema = z.object({ + runId: z.string(), + target: z.enum(["self", "parent", "root"]), + streamId: z.string(), +}); + +const { action } = createActionApiRoute( + { + params: ParamsSchema, + }, + async ({ request, params, authentication }) => { + if (!request.body) { + return new Response("No body provided", { status: 400 }); + } + + const run = await $replica.taskRun.findFirst({ + where: { + friendlyId: params.runId, + runtimeEnvironmentId: authentication.environment.id, + }, + select: { + id: true, + friendlyId: true, + parentTaskRun: { + select: { + friendlyId: true, + }, + }, + rootTaskRun: { + select: { + friendlyId: true, + }, + }, + }, + }); + + if (!run) { + return new Response("Run not found", { status: 404 }); + } + + const targetId = + params.target === "self" + ? run.friendlyId + : params.target === "parent" + ? run.parentTaskRun?.friendlyId + : run.rootTaskRun?.friendlyId; + + if (!targetId) { + return new Response("Target not found", { status: 404 }); + } + + return relayRealtimeStreams.ingestData(request.body, targetId, params.streamId); + } +); + +export { action }; diff --git a/apps/webapp/app/services/metadata/updateMetadata.server.ts b/apps/webapp/app/services/metadata/updateMetadata.server.ts new file mode 100644 index 0000000000..ea6aa36af8 --- /dev/null +++ b/apps/webapp/app/services/metadata/updateMetadata.server.ts @@ -0,0 +1,414 @@ +import { + applyMetadataOperations, + IOPacket, + parsePacket, + RunMetadataChangeOperation, + UpdateMetadataRequestBody, +} from "@trigger.dev/core/v3"; +import { prisma, PrismaClientOrTransaction } from "~/db.server"; +import { AuthenticatedEnvironment } from "~/services/apiAuth.server"; +import { handleMetadataPacket } from "~/utils/packets"; +import { BaseService, ServiceValidationError } from "~/v3/services/baseService.server"; +import { isFinalRunStatus } from "~/v3/taskStatus"; + +import { Effect, Schedule, Duration } from "effect"; +import { type RuntimeFiber } from "effect/Fiber"; +import { logger } from "../logger.server"; +import { singleton } from "~/utils/singleton"; +import { env } from "~/env.server"; +import { setTimeout } from "timers/promises"; + +type BufferedRunMetadataChangeOperation = { + runId: string; + timestamp: number; + operation: RunMetadataChangeOperation; +}; + +export class UpdateMetadataService extends BaseService { + private _bufferedOperations: Map = new Map(); + private _flushFiber: RuntimeFiber | null = null; + + constructor( + protected readonly _prisma: PrismaClientOrTransaction = prisma, + private readonly flushIntervalMs: number = 5000 + ) { + super(); + + this._startFlushing(); + } + + // Start a loop that periodically flushes buffered operations + private _startFlushing() { + // Create a program that sleeps, then processes buffered ops + const program = Effect.gen(this, function* (_) { + while (true) { + // Wait for flushIntervalMs before flushing again + yield* _(Effect.sleep(Duration.millis(this.flushIntervalMs))); + + // Atomically get and clear current operations + const currentOperations = new Map(this._bufferedOperations); + this._bufferedOperations.clear(); + + yield* Effect.sync(() => { + logger.debug(`[UpdateMetadataService] Flushing operations`, { + operations: Object.fromEntries(currentOperations), + }); + }); + + // If we have operations, process them + if (currentOperations.size > 0) { + yield* _(this._processBufferedOperations(currentOperations)); + } + } + }).pipe( + // Handle any unexpected errors, ensuring program does not fail + Effect.catchAll((error) => + Effect.sync(() => { + logger.error("Error in flushing program:", { error }); + }) + ) + ); + + // Fork the program so it runs in the background + this._flushFiber = Effect.runFork(program as Effect.Effect); + } + + private _processBufferedOperations = ( + operations: Map + ) => { + return Effect.gen(this, function* (_) { + for (const [runId, ops] of operations) { + // Process and cull operations + const processedOps = this._cullOperations(ops); + + // If there are no operations to process, skip + if (processedOps.length === 0) { + continue; + } + + yield* Effect.sync(() => { + logger.debug(`[UpdateMetadataService] Processing operations for run`, { + runId, + operationsCount: processedOps.length, + }); + }); + + // Update run with retry + yield* _( + this._updateRunWithOperations(runId, processedOps).pipe( + Effect.retry(Schedule.exponential(Duration.millis(100), 1.4)), + Effect.catchAll((error) => + Effect.sync(() => { + // On complete failure, return ops to buffer + const existingOps = this._bufferedOperations.get(runId) ?? []; + this._bufferedOperations.set(runId, [...existingOps, ...ops]); + console.error(`Failed to process run ${runId}:`, error); + }) + ) + ) + ); + } + }); + }; + + private _updateRunWithOperations = ( + runId: string, + operations: BufferedRunMetadataChangeOperation[] + ) => { + return Effect.gen(this, function* (_) { + // Fetch current run + const run = yield* _( + Effect.tryPromise(() => + this._prisma.taskRun.findFirst({ + where: { id: runId }, + select: { id: true, metadata: true, metadataType: true, metadataVersion: true }, + }) + ) + ); + + if (!run) { + return yield* _(Effect.fail(new Error(`Run ${runId} not found`))); + } + + const metadata = yield* _( + Effect.tryPromise(() => + run.metadata + ? parsePacket({ data: run.metadata, dataType: run.metadataType }) + : Promise.resolve({}) + ) + ); + + // Apply operations and update + const applyResult = applyMetadataOperations( + metadata, + operations.map((op) => op.operation) + ); + + if (applyResult.unappliedOperations.length === operations.length) { + logger.warn(`No operations applied for run ${runId}`); + // If no operations were applied, return + return; + } + + // Stringify the metadata + const newMetadataPacket = yield* _( + Effect.try(() => handleMetadataPacket(applyResult.newMetadata, run.metadataType)) + ); + + if (!newMetadataPacket) { + // Log and skip if metadata is invalid + logger.warn(`Invalid metadata after operations, skipping update`); + return; + } + + const result = yield* _( + Effect.tryPromise(() => + this._prisma.taskRun.updateMany({ + where: { + id: runId, + metadataVersion: run.metadataVersion, + }, + data: { + metadata: newMetadataPacket.data, + metadataVersion: { increment: 1 }, + }, + }) + ) + ); + + if (result.count === 0) { + yield* Effect.sync(() => { + logger.warn(`Optimistic lock failed for run ${runId}`, { + metadataVersion: run.metadataVersion, + }); + }); + + return yield* _(Effect.fail(new Error("Optimistic lock failed"))); + } + + return result; + }); + }; + + private _cullOperations( + operations: BufferedRunMetadataChangeOperation[] + ): BufferedRunMetadataChangeOperation[] { + // Sort by timestamp + const sortedOps = [...operations].sort((a, b) => a.timestamp - b.timestamp); + + // Track latest set operations by key + const latestSetOps = new Map(); + const resultOps: BufferedRunMetadataChangeOperation[] = []; + + for (const op of sortedOps) { + if (op.operation.type === "set") { + latestSetOps.set(op.operation.key, op); + } else { + resultOps.push(op); + } + } + + // Add winning set operations + resultOps.push(...latestSetOps.values()); + + return resultOps; + } + + public async call( + environment: AuthenticatedEnvironment, + runId: string, + body: UpdateMetadataRequestBody + ) { + const runIdType = runId.startsWith("run_") ? "friendly" : "internal"; + + const taskRun = await this._prisma.taskRun.findFirst({ + where: { + runtimeEnvironmentId: environment.id, + ...(runIdType === "internal" ? { id: runId } : { friendlyId: runId }), + }, + select: { + id: true, + status: true, + metadata: true, + metadataType: true, + metadataVersion: true, + parentTaskRun: { + select: { + id: true, + status: true, + }, + }, + rootTaskRun: { + select: { + id: true, + status: true, + }, + }, + }, + }); + + if (!taskRun) { + return; + } + + if (isFinalRunStatus(taskRun.status)) { + throw new ServiceValidationError("Cannot update metadata for a completed run"); + } + + if (body.parentOperations && body.parentOperations.length > 0 && taskRun.parentTaskRun) { + this.#ingestRunOperations(taskRun.parentTaskRun.id, body.parentOperations); + } + + if (body.rootOperations && body.rootOperations.length > 0 && taskRun.rootTaskRun) { + this.#ingestRunOperations(taskRun.rootTaskRun.id, body.rootOperations); + } + + const newMetadata = await this.#updateRunMetadata({ + runId: taskRun.id, + body, + existingMetadata: { + data: taskRun.metadata ?? undefined, + dataType: taskRun.metadataType, + }, + }); + + return { + metadata: newMetadata, + }; + } + + async #updateRunMetadata({ + runId, + body, + existingMetadata, + }: { + runId: string; + body: UpdateMetadataRequestBody; + existingMetadata: IOPacket; + }) { + if (Array.isArray(body.operations)) { + return this.#updateRunMetadataWithOperations(runId, body.operations); + } else { + return this.#updateRunMetadataDirectly(runId, body, existingMetadata); + } + } + + async #updateRunMetadataWithOperations(runId: string, operations: RunMetadataChangeOperation[]) { + const MAX_RETRIES = 3; + let attempts = 0; + + while (attempts <= MAX_RETRIES) { + // Fetch the latest run data + const run = await this._prisma.taskRun.findFirst({ + where: { id: runId }, + select: { metadata: true, metadataType: true, metadataVersion: true }, + }); + + if (!run) { + throw new Error(`Run ${runId} not found`); + } + + // Parse the current metadata + const currentMetadata = await (run.metadata + ? parsePacket({ data: run.metadata, dataType: run.metadataType }) + : Promise.resolve({})); + + // Apply operations to the current metadata + const applyResults = applyMetadataOperations(currentMetadata, operations); + + // If no operations were applied, return the current metadata + if (applyResults.unappliedOperations.length === operations.length) { + return currentMetadata; + } + + // Update with optimistic locking + const result = await this._prisma.taskRun.updateMany({ + where: { + id: runId, + metadataVersion: run.metadataVersion, + }, + data: { + metadata: JSON.stringify(applyResults.newMetadata), + metadataType: run.metadataType, + metadataVersion: { + increment: 1, + }, + }, + }); + + if (result.count === 0) { + // If this was our last attempt, buffer the operations and return optimistically + if (attempts === MAX_RETRIES) { + this.#ingestRunOperations(runId, operations); + return applyResults.newMetadata; + } + + // Otherwise sleep and try again + await setTimeout(100 * Math.pow(1.4, attempts)); + attempts++; + continue; + } + + // Success! Return the new metadata + return applyResults.newMetadata; + } + } + + async #updateRunMetadataDirectly( + runId: string, + body: UpdateMetadataRequestBody, + existingMetadata: IOPacket + ) { + const metadataPacket = handleMetadataPacket(body.metadata, "application/json"); + + if (!metadataPacket) { + throw new ServiceValidationError("Invalid metadata"); + } + + if ( + metadataPacket.data !== "{}" || + (existingMetadata.data && metadataPacket.data !== existingMetadata.data) + ) { + logger.debug(`Updating metadata directly for run`, { + metadata: metadataPacket.data, + runId, + }); + + // Update the metadata without version check + await this._prisma.taskRun.update({ + where: { + id: runId, + }, + data: { + metadata: metadataPacket?.data, + metadataType: metadataPacket?.dataType, + metadataVersion: { + increment: 1, + }, + }, + }); + } + + const newMetadata = await parsePacket(metadataPacket); + return newMetadata; + } + + #ingestRunOperations(runId: string, operations: RunMetadataChangeOperation[]) { + const bufferedOperations: BufferedRunMetadataChangeOperation[] = operations.map((operation) => { + return { + runId, + timestamp: Date.now(), + operation, + }; + }); + + const existingBufferedOperations = this._bufferedOperations.get(runId) ?? []; + + this._bufferedOperations.set(runId, [...existingBufferedOperations, ...bufferedOperations]); + } +} + +export const updateMetadataService = singleton( + "update-metadata-service", + () => new UpdateMetadataService(prisma, env.BATCH_METADATA_OPERATIONS_FLUSH_INTERVAL_MS) +); diff --git a/apps/webapp/app/services/routeBuilders/apiBuilder.server.ts b/apps/webapp/app/services/routeBuilders/apiBuilder.server.ts index dc35a0cd24..e505994034 100644 --- a/apps/webapp/app/services/routeBuilders/apiBuilder.server.ts +++ b/apps/webapp/app/services/routeBuilders/apiBuilder.server.ts @@ -384,6 +384,7 @@ type ApiKeyActionRouteBuilderOptions< headers?: THeadersSchema; allowJWT?: boolean; corsStrategy?: "all" | "none"; + method?: "POST" | "PUT" | "DELETE" | "PATCH"; authorization?: { action: AuthorizationAction; resource: ( @@ -455,6 +456,19 @@ export function createActionApiRoute< } async function action({ request, params }: ActionFunctionArgs) { + if (options.method) { + if (request.method.toUpperCase() !== options.method) { + return await wrapResponse( + request, + json( + { error: "Method not allowed" }, + { status: 405, headers: { Allow: options.method } } + ), + corsStrategy !== "none" + ); + } + } + try { const authenticationResult = await authenticateApiRequestWithFailure(request, { allowJWT }); @@ -613,6 +627,19 @@ export function createActionApiRoute< if (error instanceof Response) { return await wrapResponse(request, error, corsStrategy !== "none"); } + + logger.error("Error in action", { + error: + error instanceof Error + ? { + name: error.name, + message: error.message, + stack: error.stack, + } + : String(error), + url: request.url, + }); + return await wrapResponse( request, json({ error: "Internal Server Error" }, { status: 500 }), diff --git a/apps/webapp/app/v3/services/completeAttempt.server.ts b/apps/webapp/app/v3/services/completeAttempt.server.ts index b044a4d291..502fc71eea 100644 --- a/apps/webapp/app/v3/services/completeAttempt.server.ts +++ b/apps/webapp/app/v3/services/completeAttempt.server.ts @@ -12,22 +12,23 @@ import { shouldRetryError, taskRunErrorEnhancer, } from "@trigger.dev/core/v3"; -import { $transaction, PrismaClientOrTransaction } from "~/db.server"; +import { TaskRun } from "@trigger.dev/database"; +import { MAX_TASK_RUN_ATTEMPTS } from "~/consts"; +import { PrismaClientOrTransaction } from "~/db.server"; +import { env } from "~/env.server"; import { AuthenticatedEnvironment } from "~/services/apiAuth.server"; import { logger } from "~/services/logger.server"; import { safeJsonParse } from "~/utils/json"; -import { createExceptionPropertiesFromError, eventRepository } from "../eventRepository.server"; import { marqs } from "~/v3/marqs/index.server"; +import { createExceptionPropertiesFromError, eventRepository } from "../eventRepository.server"; +import { FailedTaskRunRetryHelper } from "../failedTaskRun.server"; +import { FAILED_RUN_STATUSES, isFinalAttemptStatus, isFinalRunStatus } from "../taskStatus"; import { BaseService } from "./baseService.server"; import { CancelAttemptService } from "./cancelAttempt.server"; -import { MAX_TASK_RUN_ATTEMPTS } from "~/consts"; import { CreateCheckpointService } from "./createCheckpoint.server"; -import { TaskRun } from "@trigger.dev/database"; -import { RetryAttemptService } from "./retryAttempt.server"; -import { FAILED_RUN_STATUSES, isFinalAttemptStatus, isFinalRunStatus } from "../taskStatus"; import { FinalizeTaskRunService } from "./finalizeTaskRun.server"; -import { env } from "~/env.server"; -import { FailedTaskRunRetryHelper } from "../failedTaskRun.server"; +import { RetryAttemptService } from "./retryAttempt.server"; +import { updateMetadataService } from "~/services/metadata/updateMetadata.server"; type FoundAttempt = Awaited>; @@ -94,6 +95,8 @@ export class CompleteAttemptService extends BaseService { code: TaskRunErrorCodes.TASK_EXECUTION_FAILED, message: "Tried to complete attempt but it doesn't exist", }, + metadata: completion.metadata, + env, }); // No attempt, so there's no message to ACK @@ -153,6 +156,8 @@ export class CompleteAttemptService extends BaseService { id: taskRunAttempt.taskRunId, status: "COMPLETED_SUCCESSFULLY", completedAt: new Date(), + metadata: completion.metadata, + env, }); // Now we need to "complete" the task run event/span @@ -306,6 +311,8 @@ export class CompleteAttemptService extends BaseService { id: taskRunAttempt.taskRunId, status, completedAt: failedAt, + metadata: completion.metadata, + env, }); if (status !== "CRASHED" && status !== "SYSTEM_FAILURE") { diff --git a/apps/webapp/app/v3/services/finalizeTaskRun.server.ts b/apps/webapp/app/v3/services/finalizeTaskRun.server.ts index d5b5018d11..c31486021b 100644 --- a/apps/webapp/app/v3/services/finalizeTaskRun.server.ts +++ b/apps/webapp/app/v3/services/finalizeTaskRun.server.ts @@ -1,4 +1,4 @@ -import { sanitizeError, TaskRunError } from "@trigger.dev/core/v3"; +import { FlushedRunMetadata, sanitizeError, TaskRunError } from "@trigger.dev/core/v3"; import { type Prisma, type TaskRun } from "@trigger.dev/database"; import { logger } from "~/services/logger.server"; import { marqs, sanitizeQueueName } from "~/v3/marqs/index.server"; @@ -15,6 +15,8 @@ import { ResumeDependentParentsService } from "./resumeDependentParents.server"; import { ExpireEnqueuedRunService } from "./expireEnqueuedRun.server"; import { socketIo } from "../handleSocketIo.server"; import { ResumeBatchRunService } from "./resumeBatchRun.server"; +import { AuthenticatedEnvironment } from "~/services/apiAuth.server"; +import { updateMetadataService } from "~/services/metadata/updateMetadata.server"; type BaseInput = { id: string; @@ -23,6 +25,8 @@ type BaseInput = { completedAt?: Date; attemptStatus?: FINAL_ATTEMPT_STATUSES; error?: TaskRunError; + metadata?: FlushedRunMetadata; + env?: AuthenticatedEnvironment; }; type InputWithInclude = BaseInput & { @@ -46,6 +50,8 @@ export class FinalizeTaskRunService extends BaseService { include, attemptStatus, error, + metadata, + env, }: T extends Prisma.TaskRunInclude ? InputWithInclude : InputWithoutInclude): Promise< Output > { @@ -64,6 +70,24 @@ export class FinalizeTaskRunService extends BaseService { completedAt, }); + if (env && metadata) { + try { + await updateMetadataService.call(env, id, metadata); + } catch (e) { + logger.error("[FinalizeTaskRunService] Failed to update metadata", { + taskRun: id, + error: + e instanceof Error + ? { + name: e.name, + message: e.message, + stack: e.stack, + } + : e, + }); + } + } + // I moved the error update here for two reasons: // - A single update is more efficient than two // - If the status updates to a final status, realtime will receive that status and then shut down the stream diff --git a/apps/webapp/app/v3/services/resumeDependentParents.server.ts b/apps/webapp/app/v3/services/resumeDependentParents.server.ts index 97a58a7bfa..4155b7b3c0 100644 --- a/apps/webapp/app/v3/services/resumeDependentParents.server.ts +++ b/apps/webapp/app/v3/services/resumeDependentParents.server.ts @@ -21,28 +21,43 @@ type Output = error: string; }; -type Dependency = Prisma.TaskRunDependencyGetPayload<{ - include: { - taskRun: true; - dependentAttempt: true; - dependentBatchRun: true; - }; -}>; +const taskRunDependencySelect = { + select: { + id: true, + taskRunId: true, + taskRun: { + select: { + id: true, + status: true, + friendlyId: true, + runtimeEnvironment: { + select: { + type: true, + }, + }, + }, + }, + dependentAttempt: { + select: { + id: true, + }, + }, + dependentBatchRun: { + select: { + id: true, + }, + }, + }, +} as const; + +type Dependency = Prisma.TaskRunDependencyGetPayload; /** This will resume a dependent (parent) run if there is one and it makes sense. */ export class ResumeDependentParentsService extends BaseService { public async call({ id }: { id: string }): Promise { try { const dependency = await this._prisma.taskRunDependency.findFirst({ - include: { - taskRun: { - include: { - runtimeEnvironment: true, - }, - }, - dependentAttempt: true, - dependentBatchRun: true, - }, + ...taskRunDependencySelect, where: { taskRunId: id, }, @@ -65,6 +80,13 @@ export class ResumeDependentParentsService extends BaseService { }; } + if (dependency.taskRun.runtimeEnvironment.type === "DEVELOPMENT") { + return { + success: true, + action: "dev", + }; + } + if (!isFinalRunStatus(dependency.taskRun.status)) { logger.debug( "ResumeDependentParentsService: run not finished yet, can't resume parent yet", @@ -81,18 +103,6 @@ export class ResumeDependentParentsService extends BaseService { }; } - if (dependency.taskRun.runtimeEnvironment.type === "DEVELOPMENT") { - logger.debug("ResumeDependentParentsService: runs are resumed on device for DEV runs.", { - runId: id, - dependency, - }); - - return { - success: true, - action: "dev", - }; - } - if (dependency.dependentAttempt) { return this.#singleRunDependency(dependency); } else if (dependency.dependentBatchRun) { diff --git a/apps/webapp/package.json b/apps/webapp/package.json index 9cb11c3d4d..6299d7709d 100644 --- a/apps/webapp/package.json +++ b/apps/webapp/package.json @@ -118,6 +118,7 @@ "cross-env": "^7.0.3", "cuid": "^2.1.8", "dotenv": "^16.4.5", + "effect": "^3.11.7", "emails": "workspace:*", "evt": "^2.4.13", "express": "^4.18.1", diff --git a/internal-packages/database/prisma/migrations/20241216212038_add_metadata_version/migration.sql b/internal-packages/database/prisma/migrations/20241216212038_add_metadata_version/migration.sql new file mode 100644 index 0000000000..9e9fce2023 --- /dev/null +++ b/internal-packages/database/prisma/migrations/20241216212038_add_metadata_version/migration.sql @@ -0,0 +1,2 @@ +-- AlterTable +ALTER TABLE "TaskRun" ADD COLUMN "metadataVersion" INTEGER NOT NULL DEFAULT 1; diff --git a/internal-packages/database/prisma/schema.prisma b/internal-packages/database/prisma/schema.prisma index ceaa349bb0..d8b53632ec 100644 --- a/internal-packages/database/prisma/schema.prisma +++ b/internal-packages/database/prisma/schema.prisma @@ -1778,8 +1778,9 @@ model TaskRun { seedMetadataType String @default("application/json") /// Run metadata - metadata String? - metadataType String @default("application/json") + metadata String? + metadataType String @default("application/json") + metadataVersion Int @default(1) /// Run output output String? diff --git a/packages/cli-v3/src/entryPoints/deploy-run-worker.ts b/packages/cli-v3/src/entryPoints/deploy-run-worker.ts index 59ac12bb62..a4bea9e393 100644 --- a/packages/cli-v3/src/entryPoints/deploy-run-worker.ts +++ b/packages/cli-v3/src/entryPoints/deploy-run-worker.ts @@ -363,6 +363,7 @@ const zodIpc = new ZodIpcConnection({ durationMs: usageSample.cpuTime, }, taskIdentifier: execution.task.id, + metadata: runMetadataManager.stopAndReturnLastFlush(), }, }); } @@ -387,6 +388,7 @@ const zodIpc = new ZodIpcConnection({ durationMs: usageSample.cpuTime, }, taskIdentifier: execution.task.id, + metadata: runMetadataManager.stopAndReturnLastFlush(), }, }); } @@ -410,6 +412,7 @@ const zodIpc = new ZodIpcConnection({ durationMs: 0, }, taskIdentifier: execution.task.id, + metadata: runMetadataManager.stopAndReturnLastFlush(), }, }); } diff --git a/packages/cli-v3/src/entryPoints/dev-run-worker.ts b/packages/cli-v3/src/entryPoints/dev-run-worker.ts index 5d3052de1e..8784b1dfe5 100644 --- a/packages/cli-v3/src/entryPoints/dev-run-worker.ts +++ b/packages/cli-v3/src/entryPoints/dev-run-worker.ts @@ -330,6 +330,7 @@ const zodIpc = new ZodIpcConnection({ durationMs: usageSample.cpuTime, }, taskIdentifier: execution.task.id, + metadata: runMetadataManager.stopAndReturnLastFlush(), }, }); } @@ -354,6 +355,7 @@ const zodIpc = new ZodIpcConnection({ durationMs: usageSample.cpuTime, }, taskIdentifier: execution.task.id, + metadata: runMetadataManager.stopAndReturnLastFlush(), }, }); } diff --git a/packages/core/src/v3/apiClient/index.ts b/packages/core/src/v3/apiClient/index.ts index 2b778a14d8..53892c36fd 100644 --- a/packages/core/src/v3/apiClient/index.ts +++ b/packages/core/src/v3/apiClient/index.ts @@ -50,6 +50,7 @@ import { RunSubscription, TaskRunShape, runShapeStream, + SSEStreamSubscriptionFactory, } from "./runStream.js"; import { CreateEnvironmentVariableParams, @@ -59,12 +60,14 @@ import { SubscribeToRunsQueryParams, UpdateEnvironmentVariableParams, } from "./types.js"; +import type { AsyncIterableStream } from "./stream.js"; export type { CreateEnvironmentVariableParams, ImportEnvironmentVariablesParams, SubscribeToRunsQueryParams, UpdateEnvironmentVariableParams, + AsyncIterableStream, }; export type ClientTriggerOptions = { @@ -623,9 +626,25 @@ export class ApiClient { ); } - subscribeToRun(runId: string, options?: { signal?: AbortSignal }) { + getRunMetadata(runId: string, requestOptions?: ZodFetchOptions) { + return zodfetch( + UpdateMetadataResponseBody, + `${this.baseUrl}/api/v1/runs/${runId}/metadata`, + { + method: "GET", + headers: this.#getHeaders(false), + }, + mergeRequestOptions(this.defaultRequestOptions, requestOptions) + ); + } + + subscribeToRun( + runId: string, + options?: { signal?: AbortSignal; closeOnComplete?: boolean } + ) { return runShapeStream(`${this.baseUrl}/realtime/v1/runs/${runId}`, { - closeOnComplete: true, + closeOnComplete: + typeof options?.closeOnComplete === "boolean" ? options.closeOnComplete : true, headers: this.#getRealtimeHeaders(), client: this, signal: options?.signal, @@ -663,6 +682,23 @@ export class ApiClient { }); } + async fetchStream( + runId: string, + streamKey: string, + options?: { signal?: AbortSignal; baseUrl?: string } + ): Promise> { + const streamFactory = new SSEStreamSubscriptionFactory(options?.baseUrl ?? this.baseUrl, { + headers: this.getHeaders(), + signal: options?.signal, + }); + + const subscription = streamFactory.createSubscription(runId, streamKey); + + const stream = await subscription.subscribe(); + + return stream as AsyncIterableStream; + } + async generateJWTClaims(requestOptions?: ZodFetchOptions): Promise> { return zodfetch( z.record(z.any()), diff --git a/packages/core/src/v3/apiClient/runStream.ts b/packages/core/src/v3/apiClient/runStream.ts index c1bb7c93ce..ed68b91e35 100644 --- a/packages/core/src/v3/apiClient/runStream.ts +++ b/packages/core/src/v3/apiClient/runStream.ts @@ -89,15 +89,7 @@ export function runShapeStream( ): RunSubscription { const abortController = new AbortController(); - const version1 = new SSEStreamSubscriptionFactory( - getEnvVar("TRIGGER_STREAM_URL", getEnvVar("TRIGGER_API_URL")) ?? "https://api.trigger.dev", - { - headers: options?.headers, - signal: abortController.signal, - } - ); - - const version2 = new ElectricStreamSubscriptionFactory( + const streamFactory = new SSEStreamSubscriptionFactory( getEnvVar("TRIGGER_STREAM_URL", getEnvVar("TRIGGER_API_URL")) ?? "https://api.trigger.dev", { headers: options?.headers, @@ -123,8 +115,8 @@ export function runShapeStream( const $options: RunSubscriptionOptions = { runShapeStream: runStreamInstance.stream, - stopRunShapeStream: runStreamInstance.stop, - streamFactory: new VersionedStreamSubscriptionFactory(version1, version2), + stopRunShapeStream: () => runStreamInstance.stop(30 * 1000), + streamFactory: streamFactory, abortController, ...options, }; @@ -138,12 +130,7 @@ export interface StreamSubscription { } export interface StreamSubscriptionFactory { - createSubscription( - metadata: Record, - runId: string, - streamKey: string, - baseUrl?: string - ): StreamSubscription; + createSubscription(runId: string, streamKey: string, baseUrl?: string): StreamSubscription; } // Real implementation for production @@ -194,12 +181,7 @@ export class SSEStreamSubscriptionFactory implements StreamSubscriptionFactory { private options: { headers?: Record; signal?: AbortSignal } ) {} - createSubscription( - metadata: Record, - runId: string, - streamKey: string, - baseUrl?: string - ): StreamSubscription { + createSubscription(runId: string, streamKey: string, baseUrl?: string): StreamSubscription { if (!runId || !streamKey) { throw new Error("runId and streamKey are required"); } @@ -238,63 +220,6 @@ export class ElectricStreamSubscription implements StreamSubscription { } } -export class ElectricStreamSubscriptionFactory implements StreamSubscriptionFactory { - constructor( - private baseUrl: string, - private options: { headers?: Record; signal?: AbortSignal } - ) {} - - createSubscription( - metadata: Record, - runId: string, - streamKey: string, - baseUrl?: string - ): StreamSubscription { - if (!runId || !streamKey) { - throw new Error("runId and streamKey are required"); - } - - return new ElectricStreamSubscription( - `${baseUrl ?? this.baseUrl}/realtime/v2/streams/${runId}/${streamKey}`, - this.options - ); - } -} - -export class VersionedStreamSubscriptionFactory implements StreamSubscriptionFactory { - constructor( - private version1: StreamSubscriptionFactory, - private version2: StreamSubscriptionFactory - ) {} - - createSubscription( - metadata: Record, - runId: string, - streamKey: string, - baseUrl?: string - ): StreamSubscription { - if (!runId || !streamKey) { - throw new Error("runId and streamKey are required"); - } - - const version = - typeof metadata.$$streamsVersion === "string" ? metadata.$$streamsVersion : "v1"; - - const $baseUrl = - typeof metadata.$$streamsBaseUrl === "string" ? metadata.$$streamsBaseUrl : baseUrl; - - if (version === "v1") { - return this.version1.createSubscription(metadata, runId, streamKey, $baseUrl); - } - - if (version === "v2") { - return this.version2.createSubscription(metadata, runId, streamKey, $baseUrl); - } - - throw new Error(`Unknown stream version: ${version}`); - } -} - export interface RunShapeProvider { onShape(callback: (shape: SubscribeRunRawShape) => Promise): Promise<() => void>; } @@ -324,6 +249,7 @@ export class RunSubscription { controller.enqueue(run); + // only set the run to complete when finishedAt is set this._isRunComplete = !!run.finishedAt; if ( @@ -384,36 +310,40 @@ export class RunSubscription { activeStreams.add(streamKey); const subscription = this.options.streamFactory.createSubscription( - run.metadata, run.id, streamKey, this.options.client?.baseUrl ); - const stream = await subscription.subscribe(); - - // Create the pipeline and start it - stream - .pipeThrough( - new TransformStream({ - transform(chunk, controller) { - controller.enqueue({ - type: streamKey, - chunk: chunk as TStreams[typeof streamKey], - run, - } as StreamPartResult, TStreams>); - }, - }) - ) - .pipeTo( - new WritableStream({ - write(chunk) { - controller.enqueue(chunk); - }, - }) - ) + // Start stream processing in the background + subscription + .subscribe() + .then((stream) => { + stream + .pipeThrough( + new TransformStream({ + transform(chunk, controller) { + controller.enqueue({ + type: streamKey, + chunk: chunk as TStreams[typeof streamKey], + run, + }); + }, + }) + ) + .pipeTo( + new WritableStream({ + write(chunk) { + controller.enqueue(chunk); + }, + }) + ) + .catch((error) => { + console.error(`Error in stream ${streamKey}:`, error); + }); + }) .catch((error) => { - console.error(`Error in stream ${streamKey}:`, error); + console.error(`Error subscribing to stream ${streamKey}:`, error); }); } } diff --git a/packages/core/src/v3/apiClient/stream.ts b/packages/core/src/v3/apiClient/stream.ts index 396f242186..d9124cb380 100644 --- a/packages/core/src/v3/apiClient/stream.ts +++ b/packages/core/src/v3/apiClient/stream.ts @@ -18,7 +18,7 @@ export type ZodShapeStreamOptions = { export type ZodShapeStreamInstance = { stream: AsyncIterableStream>; - stop: () => void; + stop: (delay?: number) => void; }; export function zodShapeStream( @@ -64,8 +64,16 @@ export function zodShapeStream( return { stream: stream as AsyncIterableStream>, - stop: () => { - abortController.abort(); + stop: (delay?: number) => { + if (delay) { + setTimeout(() => { + if (abortController.signal.aborted) return; + + abortController.abort(); + }, delay); + } else { + abortController.abort(); + } }, }; } diff --git a/packages/core/src/v3/run-metadata-api.ts b/packages/core/src/v3/run-metadata-api.ts index 12bc83ca39..34532356b5 100644 --- a/packages/core/src/v3/run-metadata-api.ts +++ b/packages/core/src/v3/run-metadata-api.ts @@ -3,3 +3,6 @@ import { RunMetadataAPI } from "./runMetadata/index.js"; export const runMetadata = RunMetadataAPI.getInstance(); + +export * from "./runMetadata/types.js"; +export * from "./runMetadata/operations.js"; diff --git a/packages/core/src/v3/runMetadata/index.ts b/packages/core/src/v3/runMetadata/index.ts index 16e5cf752e..e39213cd62 100644 --- a/packages/core/src/v3/runMetadata/index.ts +++ b/packages/core/src/v3/runMetadata/index.ts @@ -1,8 +1,9 @@ import { DeserializedJson } from "../../schemas/json.js"; +import { AsyncIterableStream } from "../apiClient/stream.js"; import { getGlobal, registerGlobal } from "../utils/globals.js"; import { ApiRequestOptions } from "../zodfetch.js"; import { NoopRunMetadataManager } from "./noopManager.js"; -import { RunMetadataManager } from "./types.js"; +import { RunMetadataManager, RunMetadataUpdater } from "./types.js"; const API_NAME = "run-metadata"; @@ -41,32 +42,39 @@ export class RunMetadataAPI implements RunMetadataManager { return this.#getManager().getKey(key); } - public setKey(key: string, value: DeserializedJson) { - return this.#getManager().setKey(key, value); + public set(key: string, value: DeserializedJson) { + this.#getManager().set(key, value); + return this; } - public deleteKey(key: string) { - return this.#getManager().deleteKey(key); + public del(key: string) { + this.#getManager().del(key); + return this; } - public incrementKey(key: string, value: number): void { - return this.#getManager().incrementKey(key, value); + public increment(key: string, value: number) { + this.#getManager().increment(key, value); + return this; } - decrementKey(key: string, value: number): void { - return this.#getManager().decrementKey(key, value); + decrement(key: string, value: number) { + this.#getManager().decrement(key, value); + return this; } - appendKey(key: string, value: DeserializedJson): void { - this.#getManager().appendKey(key, value); + append(key: string, value: DeserializedJson) { + this.#getManager().append(key, value); + return this; } - removeFromKey(key: string, value: DeserializedJson): void { - this.#getManager().removeFromKey(key, value); + remove(key: string, value: DeserializedJson) { + this.#getManager().remove(key, value); + return this; } - public update(metadata: Record): void { - return this.#getManager().update(metadata); + public update(metadata: Record) { + this.#getManager().update(metadata); + return this; } public stream( @@ -77,7 +85,23 @@ export class RunMetadataAPI implements RunMetadataManager { return this.#getManager().stream(key, value, signal); } + public fetchStream(key: string, signal?: AbortSignal): Promise> { + return this.#getManager().fetchStream(key, signal); + } + flush(requestOptions?: ApiRequestOptions): Promise { return this.#getManager().flush(requestOptions); } + + refresh(requestOptions?: ApiRequestOptions): Promise { + return this.#getManager().refresh(requestOptions); + } + + get parent(): RunMetadataUpdater { + return this.#getManager().parent; + } + + get root(): RunMetadataUpdater { + return this.#getManager().root; + } } diff --git a/packages/core/src/v3/runMetadata/manager.ts b/packages/core/src/v3/runMetadata/manager.ts index 470640fbed..f847d644c4 100644 --- a/packages/core/src/v3/runMetadata/manager.ts +++ b/packages/core/src/v3/runMetadata/manager.ts @@ -1,21 +1,27 @@ -import { JSONHeroPath } from "@jsonhero/path"; import { dequal } from "dequal/lite"; import { DeserializedJson } from "../../schemas/json.js"; +import { ApiClient } from "../apiClient/index.js"; +import { AsyncIterableStream } from "../apiClient/stream.js"; +import { FlushedRunMetadata, RunMetadataChangeOperation } from "../schemas/common.js"; import { ApiRequestOptions } from "../zodfetch.js"; -import { RunMetadataManager } from "./types.js"; import { MetadataStream } from "./metadataStream.js"; -import { ApiClient } from "../apiClient/index.js"; +import { applyMetadataOperations } from "./operations.js"; +import { RunMetadataManager, RunMetadataUpdater } from "./types.js"; -const MAXIMUM_ACTIVE_STREAMS = 2; -const MAXIMUM_TOTAL_STREAMS = 5; +const MAXIMUM_ACTIVE_STREAMS = 5; +const MAXIMUM_TOTAL_STREAMS = 10; export class StandardMetadataManager implements RunMetadataManager { private flushTimeoutId: NodeJS.Timeout | null = null; - private hasChanges: boolean = false; + private isFlushing: boolean = false; private store: Record | undefined; // Add a Map to track active streams private activeStreams = new Map>(); + private queuedOperations: Set = new Set(); + private queuedParentOperations: Set = new Set(); + private queuedRootOperations: Set = new Set(); + public runId: string | undefined; constructor( @@ -24,6 +30,74 @@ export class StandardMetadataManager implements RunMetadataManager { private streamsVersion: "v1" | "v2" = "v1" ) {} + get parent(): RunMetadataUpdater { + return { + set: (key, value) => { + this.queuedParentOperations.add({ type: "set", key, value }); + return this.parent; + }, + del: (key) => { + this.queuedParentOperations.add({ type: "delete", key }); + return this.parent; + }, + append: (key, value) => { + this.queuedParentOperations.add({ type: "append", key, value }); + return this.parent; + }, + remove: (key, value) => { + this.queuedParentOperations.add({ type: "remove", key, value }); + return this.parent; + }, + increment: (key, value) => { + this.queuedParentOperations.add({ type: "increment", key, value }); + return this.parent; + }, + decrement: (key, value) => { + this.queuedParentOperations.add({ type: "increment", key, value: -Math.abs(value) }); + return this.parent; + }, + update: (value) => { + this.queuedParentOperations.add({ type: "update", value }); + return this.parent; + }, + stream: (key, value, signal) => this.doStream(key, value, "parent", this.parent, signal), + }; + } + + get root(): RunMetadataUpdater { + return { + set: (key, value) => { + this.queuedRootOperations.add({ type: "set", key, value }); + return this.root; + }, + del: (key) => { + this.queuedRootOperations.add({ type: "delete", key }); + return this.root; + }, + append: (key, value) => { + this.queuedRootOperations.add({ type: "append", key, value }); + return this.root; + }, + remove: (key, value) => { + this.queuedRootOperations.add({ type: "remove", key, value }); + return this.root; + }, + increment: (key, value) => { + this.queuedRootOperations.add({ type: "increment", key, value }); + return this.root; + }, + decrement: (key, value) => { + this.queuedRootOperations.add({ type: "increment", key, value: -Math.abs(value) }); + return this.root; + }, + update: (value) => { + this.queuedRootOperations.add({ type: "update", value }); + return this.root; + }, + stream: (key, value, signal) => this.doStream(key, value, "root", this.root, signal), + }; + } + public enterWithMetadata(metadata: Record): void { this.store = metadata ?? {}; } @@ -36,173 +110,110 @@ export class StandardMetadataManager implements RunMetadataManager { return this.store?.[key]; } - public setKey(key: string, value: DeserializedJson) { - if (!this.runId) { - return; - } + private enqueueOperation(operation: RunMetadataChangeOperation) { + const applyResults = applyMetadataOperations(this.store ?? {}, operation); - let nextStore: Record | undefined = this.store - ? structuredClone(this.store) - : undefined; - - if (key.startsWith("$.")) { - const path = new JSONHeroPath(key); - path.set(nextStore, value); - } else { - nextStore = { - ...(nextStore ?? {}), - [key]: value, - }; - } - - if (!nextStore) { + if (applyResults.unappliedOperations.length > 0) { return; } - if (!dequal(this.store, nextStore)) { - this.hasChanges = true; + if (dequal(this.store, applyResults.newMetadata)) { + return; } - this.store = nextStore; + this.queuedOperations.add(operation); + this.store = applyResults.newMetadata as Record; } - public deleteKey(key: string) { + public set(key: string, value: DeserializedJson) { if (!this.runId) { - return; + return this; } - const nextStore = { ...(this.store ?? {}) }; - delete nextStore[key]; - - if (!dequal(this.store, nextStore)) { - this.hasChanges = true; - } + this.enqueueOperation({ type: "set", key, value }); - this.store = nextStore; + return this; } - public appendKey(key: string, value: DeserializedJson) { + public del(key: string) { if (!this.runId) { - return; + return this; } - let nextStore: Record | undefined = this.store - ? structuredClone(this.store) - : {}; - - if (key.startsWith("$.")) { - const path = new JSONHeroPath(key); - const currentValue = path.first(nextStore); - - if (currentValue === undefined) { - // Initialize as array with single item - path.set(nextStore, [value]); - } else if (Array.isArray(currentValue)) { - // Append to existing array - path.set(nextStore, [...currentValue, value]); - } else { - // Convert to array if not already - path.set(nextStore, [currentValue, value]); - } - } else { - const currentValue = nextStore[key]; - - if (currentValue === undefined) { - // Initialize as array with single item - nextStore[key] = [value]; - } else if (Array.isArray(currentValue)) { - // Append to existing array - nextStore[key] = [...currentValue, value]; - } else { - // Convert to array if not already - nextStore[key] = [currentValue, value]; - } - } - - if (!dequal(this.store, nextStore)) { - this.hasChanges = true; - } + this.enqueueOperation({ type: "delete", key }); - this.store = nextStore; + return this; } - public removeFromKey(key: string, value: DeserializedJson) { + public append(key: string, value: DeserializedJson) { if (!this.runId) { - return; + return this; } - let nextStore: Record | undefined = this.store - ? structuredClone(this.store) - : {}; - - if (key.startsWith("$.")) { - const path = new JSONHeroPath(key); - const currentValue = path.first(nextStore); + this.enqueueOperation({ type: "append", key, value }); - if (Array.isArray(currentValue)) { - // Remove the value from array using deep equality check - const newArray = currentValue.filter((item) => !dequal(item, value)); - path.set(nextStore, newArray); - } - } else { - const currentValue = nextStore[key]; + return this; + } - if (Array.isArray(currentValue)) { - // Remove the value from array using deep equality check - nextStore[key] = currentValue.filter((item) => !dequal(item, value)); - } + public remove(key: string, value: DeserializedJson) { + if (!this.runId) { + return this; } - if (!dequal(this.store, nextStore)) { - this.hasChanges = true; - } + this.enqueueOperation({ type: "remove", key, value }); - this.store = nextStore; + return this; } - public incrementKey(key: string, increment: number = 1) { + public increment(key: string, increment: number = 1) { if (!this.runId) { - return; + return this; } - let nextStore = this.store ? structuredClone(this.store) : {}; - let currentValue = key.startsWith("$.") - ? new JSONHeroPath(key).first(nextStore) - : nextStore[key]; + this.enqueueOperation({ type: "increment", key, value: increment }); - const newValue = (typeof currentValue === "number" ? currentValue : 0) + increment; + return this; + } - if (key.startsWith("$.")) { - new JSONHeroPath(key).set(nextStore, newValue); - } else { - nextStore[key] = newValue; - } + public decrement(key: string, decrement: number = 1) { + return this.increment(key, -decrement); + } - if (!dequal(this.store, nextStore)) { - this.hasChanges = true; - this.store = nextStore; + public update(metadata: Record) { + if (!this.runId) { + return this; } + + this.enqueueOperation({ type: "update", value: metadata }); + + return this; } - public decrementKey(key: string, decrement: number = 1) { - this.incrementKey(key, -decrement); + public async stream( + key: string, + value: AsyncIterable | ReadableStream, + signal?: AbortSignal + ): Promise> { + return this.doStream(key, value, "self", this, signal); } - public update(metadata: Record): void { + public async fetchStream(key: string, signal?: AbortSignal): Promise> { if (!this.runId) { - return; + throw new Error("Run ID is required to fetch metadata streams."); } - if (!dequal(this.store, metadata)) { - this.hasChanges = true; - } + const baseUrl = this.getKey("$$streamsBaseUrl"); + + const $baseUrl = typeof baseUrl === "string" ? baseUrl : this.streamsBaseUrl; - this.store = metadata; + return this.apiClient.fetchStream(this.runId, key, { baseUrl: $baseUrl, signal }); } - public async stream( + private async doStream( key: string, value: AsyncIterable | ReadableStream, + target: "self" | "parent" | "root", + updater: RunMetadataUpdater = this, signal?: AbortSignal ): Promise> { const $value = value as AsyncIterable; @@ -238,6 +249,7 @@ export class StandardMetadataManager implements RunMetadataManager { headers: this.apiClient.getHeaders(), signal, version: this.streamsVersion, + target, }); this.activeStreams.set(key, streamInstance); @@ -246,16 +258,17 @@ export class StandardMetadataManager implements RunMetadataManager { streamInstance.wait().finally(() => this.activeStreams.delete(key)); // Add the key to the special stream metadata object - this.appendKey(`$$streams`, key); - this.setKey("$$streamsVersion", this.streamsVersion); - this.setKey("$$streamsBaseUrl", this.streamsBaseUrl); + updater + .append(`$$streams`, key) + .set("$$streamsVersion", this.streamsVersion) + .set("$$streamsBaseUrl", this.streamsBaseUrl); await this.flush(); return streamInstance; } catch (error) { // Clean up metadata key if stream creation fails - this.removeFromKey(`$$streams`, key); + updater.remove(`$$streams`, key); throw error; } } @@ -289,36 +302,72 @@ export class StandardMetadataManager implements RunMetadataManager { } } + public async refresh(requestOptions?: ApiRequestOptions): Promise { + if (!this.runId) { + return; + } + + try { + const metadata = await this.apiClient.getRunMetadata(this.runId, requestOptions); + this.store = metadata.metadata; + } catch (error) { + console.error("Failed to refresh metadata", error); + throw error; + } + } + public async flush(requestOptions?: ApiRequestOptions): Promise { if (!this.runId) { return; } - if (!this.store) { + if (!this.#needsFlush()) { return; } - if (!this.hasChanges) { + if (this.isFlushing) { return; } + this.isFlushing = true; + + const operations = Array.from(this.queuedOperations); + this.queuedOperations.clear(); + + const parentOperations = Array.from(this.queuedParentOperations); + this.queuedParentOperations.clear(); + + const rootOperations = Array.from(this.queuedRootOperations); + this.queuedRootOperations.clear(); + try { - this.hasChanges = false; - await this.apiClient.updateRunMetadata(this.runId, { metadata: this.store }, requestOptions); + const response = await this.apiClient.updateRunMetadata( + this.runId, + { operations, parentOperations, rootOperations }, + requestOptions + ); + + this.store = response.metadata; } catch (error) { - this.hasChanges = true; - throw error; + console.error("Failed to flush metadata", error); + } finally { + this.isFlushing = false; } } public startPeriodicFlush(intervalMs: number = 1000) { const periodicFlush = async (intervalMs: number) => { + if (this.isFlushing) { + return; + } + try { await this.flush(); } catch (error) { console.error("Failed to flush metadata", error); throw error; } finally { + this.isFlushing = false; scheduleNext(); } }; @@ -336,4 +385,27 @@ export class StandardMetadataManager implements RunMetadataManager { this.flushTimeoutId = null; } } + + stopAndReturnLastFlush(): FlushedRunMetadata | undefined { + this.stopPeriodicFlush(); + this.isFlushing = true; + + if (!this.#needsFlush()) { + return; + } + + return { + operations: Array.from(this.queuedOperations), + parentOperations: Array.from(this.queuedParentOperations), + rootOperations: Array.from(this.queuedRootOperations), + }; + } + + #needsFlush(): boolean { + return ( + this.queuedOperations.size > 0 || + this.queuedParentOperations.size > 0 || + this.queuedRootOperations.size > 0 + ); + } } diff --git a/packages/core/src/v3/runMetadata/metadataStream.ts b/packages/core/src/v3/runMetadata/metadataStream.ts index 1d6143f5cf..0ec263666b 100644 --- a/packages/core/src/v3/runMetadata/metadataStream.ts +++ b/packages/core/src/v3/runMetadata/metadataStream.ts @@ -6,6 +6,7 @@ export type MetadataOptions = { headers?: Record; signal?: AbortSignal; version?: "v1" | "v2"; + target?: "self" | "parent" | "root"; }; export class MetadataStream { @@ -45,19 +46,14 @@ export class MetadataStream { }) ); - return fetch( - `${this.options.baseUrl}/realtime/${this.options.version ?? "v1"}/streams/${ - this.options.runId - }/${this.options.key}`, - { - method: "POST", - headers: this.options.headers ?? {}, - body: serverStream, - // @ts-expect-error - duplex: "half", - signal: this.controller.signal, - } - ); + return fetch(this.buildUrl(), { + method: "POST", + headers: this.options.headers ?? {}, + body: serverStream, + signal: this.controller.signal, + // @ts-expect-error + duplex: "half", + }); } public async wait(): Promise { @@ -67,6 +63,19 @@ export class MetadataStream { public [Symbol.asyncIterator]() { return streamToAsyncIterator(this.consumerStream); } + + private buildUrl(): string { + switch (this.options.version ?? "v1") { + case "v1": { + return `${this.options.baseUrl}/realtime/v1/streams/${this.options.runId}/${ + this.options.target ?? "self" + }/${this.options.key}`; + } + case "v2": { + return `${this.options.baseUrl}/realtime/v2/streams/${this.options.runId}/${this.options.key}`; + } + } + } } async function* streamToAsyncIterator(stream: ReadableStream): AsyncIterableIterator { diff --git a/packages/core/src/v3/runMetadata/noopManager.ts b/packages/core/src/v3/runMetadata/noopManager.ts index eb7937f886..03758d9032 100644 --- a/packages/core/src/v3/runMetadata/noopManager.ts +++ b/packages/core/src/v3/runMetadata/noopManager.ts @@ -1,26 +1,33 @@ import { DeserializedJson } from "../../schemas/json.js"; +import { AsyncIterableStream } from "../apiClient/stream.js"; import { ApiRequestOptions } from "../zodfetch.js"; -import type { RunMetadataManager } from "./types.js"; +import type { RunMetadataManager, RunMetadataUpdater } from "./types.js"; export class NoopRunMetadataManager implements RunMetadataManager { - appendKey(key: string, value: DeserializedJson): void { + append(key: string, value: DeserializedJson): this { throw new Error("Method not implemented."); } - removeFromKey(key: string, value: DeserializedJson): void { + remove(key: string, value: DeserializedJson): this { throw new Error("Method not implemented."); } - incrementKey(key: string, value: number): void { + increment(key: string, value: number): this { throw new Error("Method not implemented."); } - decrementKey(key: string, value: number): void { + decrement(key: string, value: number): this { throw new Error("Method not implemented."); } stream(key: string, value: AsyncIterable): Promise> { throw new Error("Method not implemented."); } + fetchStream(key: string, signal?: AbortSignal): Promise> { + throw new Error("Method not implemented."); + } flush(requestOptions?: ApiRequestOptions): Promise { throw new Error("Method not implemented."); } + refresh(requestOptions?: ApiRequestOptions): Promise { + throw new Error("Method not implemented."); + } enterWithMetadata(metadata: Record): void {} current(): Record | undefined { throw new Error("Method not implemented."); @@ -28,13 +35,49 @@ export class NoopRunMetadataManager implements RunMetadataManager { getKey(key: string): DeserializedJson | undefined { throw new Error("Method not implemented."); } - setKey(key: string, value: DeserializedJson): void { + set(key: string, value: DeserializedJson): this { throw new Error("Method not implemented."); } - deleteKey(key: string): void { + del(key: string): this { throw new Error("Method not implemented."); } - update(metadata: Record): void { + update(metadata: Record): this { throw new Error("Method not implemented."); } + + get parent(): RunMetadataUpdater { + return { + append: () => this.parent, + set: () => this.parent, + del: () => this.parent, + increment: () => this.parent, + decrement: () => this.parent, + remove: () => this.parent, + stream: () => + Promise.resolve({ + [Symbol.asyncIterator]: () => ({ + next: () => Promise.resolve({ done: true, value: undefined }), + }), + }), + update: () => this.parent, + }; + } + + get root(): RunMetadataUpdater { + return { + append: () => this.root, + set: () => this.root, + del: () => this.root, + increment: () => this.root, + decrement: () => this.root, + remove: () => this.root, + stream: () => + Promise.resolve({ + [Symbol.asyncIterator]: () => ({ + next: () => Promise.resolve({ done: true, value: undefined }), + }), + }), + update: () => this.root, + }; + } } diff --git a/packages/core/src/v3/runMetadata/operations.ts b/packages/core/src/v3/runMetadata/operations.ts new file mode 100644 index 0000000000..f4a0f000d9 --- /dev/null +++ b/packages/core/src/v3/runMetadata/operations.ts @@ -0,0 +1,130 @@ +import { JSONHeroPath } from "@jsonhero/path"; +import { RunMetadataChangeOperation } from "../schemas/common.js"; +import { dequal } from "dequal"; + +export type ApplyOperationResult = { + newMetadata: Record; + unappliedOperations: RunMetadataChangeOperation[]; +}; + +export function applyMetadataOperations( + currentMetadata: Record, + operations: RunMetadataChangeOperation | RunMetadataChangeOperation[] +): ApplyOperationResult { + const unappliedOperations: RunMetadataChangeOperation[] = []; + // Start with a mutable copy of the current metadata + let newMetadata: Record = structuredClone(currentMetadata); + + for (const operation of Array.isArray(operations) ? operations : [operations]) { + switch (operation.type) { + case "set": { + if (operation.key.startsWith("$.")) { + const path = new JSONHeroPath(operation.key); + path.set(newMetadata, operation.value); + } else { + // Set the value directly + newMetadata[operation.key] = operation.value; + } + + break; + } + + case "delete": { + // Safely delete the key if it exists + if (operation.key in newMetadata) { + delete newMetadata[operation.key]; + } + break; + } + + case "append": { + if (operation.key.startsWith("$.")) { + const path = new JSONHeroPath(operation.key); + const currentValue = path.first(newMetadata); + + if (currentValue === undefined) { + // Initialize as array with single item + path.set(newMetadata, [operation.value]); + } else if (Array.isArray(currentValue)) { + // Append to existing array + path.set(newMetadata, [...currentValue, operation.value]); + } else { + // Convert to array if not already + path.set(newMetadata, [currentValue, operation.value]); + } + } else { + // Ensure the value at key is an array or initialize as an array + const existingValue = newMetadata[operation.key]; + if (Array.isArray(existingValue)) { + existingValue.push(operation.value); + } else if (existingValue === undefined) { + newMetadata[operation.key] = [operation.value]; + } else { + // Convert to array if not already + newMetadata[operation.key] = [existingValue, operation.value]; + } + } + + break; + } + + case "remove": { + if (operation.key.startsWith("$.")) { + const path = new JSONHeroPath(operation.key); + const currentValue = path.first(newMetadata); + + if (Array.isArray(currentValue)) { + // Remove the value from array using deep equality check + const newArray = currentValue.filter((item) => !dequal(item, operation.value)); + path.set(newMetadata, newArray); + } else { + unappliedOperations.push(operation); + } + } else { + // Remove matching values if the key points to an array + const existingValue = newMetadata[operation.key]; + + if (Array.isArray(existingValue)) { + newMetadata[operation.key] = existingValue.filter( + (item) => !dequal(item, operation.value) + ); + } else { + unappliedOperations.push(operation); + } + } + + break; + } + + case "increment": { + let currentValue = operation.key.startsWith("$.") + ? new JSONHeroPath(operation.key).first(newMetadata) + : newMetadata[operation.key]; + + const newValue = (typeof currentValue === "number" ? currentValue : 0) + operation.value; + + if (operation.key.startsWith("$.")) { + new JSONHeroPath(operation.key).set(newMetadata, newValue); + } else { + newMetadata[operation.key] = newValue; + } + + break; + } + + case "update": { + // Update the metadata object with the new object + newMetadata = operation.value; + break; + } + + default: { + // Log unsupported operation type + unappliedOperations.push(operation); + break; + } + } + } + + return { newMetadata, unappliedOperations }; +} diff --git a/packages/core/src/v3/runMetadata/types.ts b/packages/core/src/v3/runMetadata/types.ts index 94133313da..10cb506ec1 100644 --- a/packages/core/src/v3/runMetadata/types.ts +++ b/packages/core/src/v3/runMetadata/types.ts @@ -1,22 +1,31 @@ import { DeserializedJson } from "../../schemas/json.js"; +import { AsyncIterableStream } from "../apiClient/stream.js"; import { ApiRequestOptions } from "../zodfetch.js"; -export interface RunMetadataManager { - // Instance Methods - enterWithMetadata(metadata: Record): void; - current(): Record | undefined; - getKey(key: string): DeserializedJson | undefined; - setKey(key: string, value: DeserializedJson): void; - deleteKey(key: string): void; - appendKey(key: string, value: DeserializedJson): void; - removeFromKey(key: string, value: DeserializedJson): void; - incrementKey(key: string, value: number): void; - decrementKey(key: string, value: number): void; - update(metadata: Record): void; - flush(requestOptions?: ApiRequestOptions): Promise; +export interface RunMetadataUpdater { + set(key: string, value: DeserializedJson): this; + del(key: string): this; + append(key: string, value: DeserializedJson): this; + remove(key: string, value: DeserializedJson): this; + increment(key: string, value: number): this; + decrement(key: string, value: number): this; + update(metadata: Record): this; stream( key: string, value: AsyncIterable | ReadableStream, signal?: AbortSignal ): Promise>; } + +export interface RunMetadataManager extends RunMetadataUpdater { + // Instance Methods + enterWithMetadata(metadata: Record): void; + current(): Record | undefined; + getKey(key: string): DeserializedJson | undefined; + flush(requestOptions?: ApiRequestOptions): Promise; + refresh(requestOptions?: ApiRequestOptions): Promise; + fetchStream(key: string, signal?: AbortSignal): Promise>; + + get parent(): RunMetadataUpdater; + get root(): RunMetadataUpdater; +} diff --git a/packages/core/src/v3/runtime/devRuntimeManager.ts b/packages/core/src/v3/runtime/devRuntimeManager.ts index 07e933edf4..acad2c3d0f 100644 --- a/packages/core/src/v3/runtime/devRuntimeManager.ts +++ b/packages/core/src/v3/runtime/devRuntimeManager.ts @@ -1,3 +1,4 @@ +import { runMetadata } from "../run-metadata-api.js"; import { BatchTaskRunExecutionResult, TaskRunContext, @@ -41,6 +42,8 @@ export class DevRuntimeManager implements RuntimeManager { this._taskWaits.set(params.id, { resolve }); }); + await this.#tryFlushMetadata(); + return await promise; } @@ -71,6 +74,8 @@ export class DevRuntimeManager implements RuntimeManager { }) ); + await this.#tryFlushMetadata(); + const results = await promise; return { @@ -93,4 +98,10 @@ export class DevRuntimeManager implements RuntimeManager { this._taskWaits.delete(runId); } + + async #tryFlushMetadata() { + try { + await runMetadata.flush(); + } catch (err) {} + } } diff --git a/packages/core/src/v3/schemas/api.ts b/packages/core/src/v3/schemas/api.ts index 55189d8140..492b4539e7 100644 --- a/packages/core/src/v3/schemas/api.ts +++ b/packages/core/src/v3/schemas/api.ts @@ -1,6 +1,11 @@ import { z } from "zod"; import { DeserializedJsonSchema } from "../../schemas/json.js"; -import { SerializedError, TaskRunError } from "./common.js"; +import { + FlushedRunMetadata, + RunMetadataChangeOperation, + SerializedError, + TaskRunError, +} from "./common.js"; import { BackgroundWorkerMetadata } from "./resources.js"; import { QueueOptions } from "./schemas.js"; @@ -669,10 +674,7 @@ export const EnvironmentVariables = z.array(EnvironmentVariable); export type EnvironmentVariables = z.infer; -export const UpdateMetadataRequestBody = z.object({ - metadata: z.record(DeserializedJsonSchema), - metadataType: z.string().optional(), -}); +export const UpdateMetadataRequestBody = FlushedRunMetadata; export type UpdateMetadataRequestBody = z.infer; diff --git a/packages/core/src/v3/schemas/common.ts b/packages/core/src/v3/schemas/common.ts index 270d87e6df..ca61333684 100644 --- a/packages/core/src/v3/schemas/common.ts +++ b/packages/core/src/v3/schemas/common.ts @@ -1,6 +1,72 @@ import { z } from "zod"; import { DeserializedJsonSchema } from "../../schemas/json.js"; +export const RunMetadataUpdateOperation = z.object({ + type: z.literal("update"), + value: z.record(z.unknown()), +}); + +export type RunMetadataUpdateOperation = z.infer; + +export const RunMetadataSetKeyOperation = z.object({ + type: z.literal("set"), + key: z.string(), + value: DeserializedJsonSchema, +}); + +export type RunMetadataSetKeyOperation = z.infer; + +export const RunMetadataDeleteKeyOperation = z.object({ + type: z.literal("delete"), + key: z.string(), +}); + +export type RunMetadataDeleteKeyOperation = z.infer; + +export const RunMetadataAppendKeyOperation = z.object({ + type: z.literal("append"), + key: z.string(), + value: DeserializedJsonSchema, +}); + +export type RunMetadataAppendKeyOperation = z.infer; + +export const RunMetadataRemoveFromKeyOperation = z.object({ + type: z.literal("remove"), + key: z.string(), + value: DeserializedJsonSchema, +}); + +export type RunMetadataRemoveFromKeyOperation = z.infer; + +export const RunMetadataIncrementKeyOperation = z.object({ + type: z.literal("increment"), + key: z.string(), + value: z.number(), +}); + +export type RunMetadataIncrementKeyOperation = z.infer; + +export const RunMetadataChangeOperation = z.discriminatedUnion("type", [ + RunMetadataUpdateOperation, + RunMetadataSetKeyOperation, + RunMetadataDeleteKeyOperation, + RunMetadataAppendKeyOperation, + RunMetadataRemoveFromKeyOperation, + RunMetadataIncrementKeyOperation, +]); + +export type RunMetadataChangeOperation = z.infer; + +export const FlushedRunMetadata = z.object({ + metadata: z.record(DeserializedJsonSchema).optional(), + operations: z.array(RunMetadataChangeOperation).optional(), + parentOperations: z.array(RunMetadataChangeOperation).optional(), + rootOperations: z.array(RunMetadataChangeOperation).optional(), +}); + +export type FlushedRunMetadata = z.infer; + // Defaults to 0.5 export const MachineCpu = z.union([ z.literal(0.25), @@ -253,6 +319,7 @@ export const TaskRunFailedExecutionResult = z.object({ usage: TaskRunExecutionUsage.optional(), // Optional for now for backwards compatibility taskIdentifier: z.string().optional(), + metadata: FlushedRunMetadata.optional(), }); export type TaskRunFailedExecutionResult = z.infer; @@ -265,6 +332,7 @@ export const TaskRunSuccessfulExecutionResult = z.object({ usage: TaskRunExecutionUsage.optional(), // Optional for now for backwards compatibility taskIdentifier: z.string().optional(), + metadata: FlushedRunMetadata.optional(), }); export type TaskRunSuccessfulExecutionResult = z.infer; diff --git a/packages/core/test/runStream.test.ts b/packages/core/test/runStream.test.ts index 6924d07681..c8b15a7d4d 100644 --- a/packages/core/test/runStream.test.ts +++ b/packages/core/test/runStream.test.ts @@ -31,11 +31,7 @@ class TestStreamSubscriptionFactory implements StreamSubscriptionFactory { this.streams.set(`${runId}:${streamKey}`, chunks); } - createSubscription( - metadata: Record, - runId: string, - streamKey: string - ): StreamSubscription { + createSubscription(runId: string, streamKey: string): StreamSubscription { const chunks = this.streams.get(`${runId}:${streamKey}`) ?? []; return new TestStreamSubscription(chunks); } @@ -285,13 +281,9 @@ describe("RunSubscription", () => { // Override createSubscription to count calls const originalCreate = streamFactory.createSubscription.bind(streamFactory); - streamFactory.createSubscription = ( - metadata: Record, - runId: string, - streamKey: string - ) => { + streamFactory.createSubscription = (runId: string, streamKey: string) => { streamCreationCount++; - return originalCreate(metadata, runId, streamKey); + return originalCreate(runId, streamKey); }; // Set up test chunks diff --git a/packages/core/test/standardMetadataManager.test.ts b/packages/core/test/standardMetadataManager.test.ts index b88bc7fc1a..bd1592739d 100644 --- a/packages/core/test/standardMetadataManager.test.ts +++ b/packages/core/test/standardMetadataManager.test.ts @@ -2,22 +2,30 @@ import { describe, test, expect, beforeEach, afterEach } from "vitest"; import { createTestHttpServer } from "@epic-web/test-server/http"; import { StandardMetadataManager } from "../src/v3/runMetadata/manager.js"; import { ApiClient } from "../src/v3/apiClient/index.js"; +import { UpdateMetadataRequestBody } from "../src/v3/schemas/index.js"; +import { applyMetadataOperations } from "../src/v3/runMetadata/operations.js"; describe("StandardMetadataManager", () => { const runId = "test-run-id"; let server: Awaited>; - let metadataUpdates: Array> = []; + let metadataUpdates: Array = []; let manager: StandardMetadataManager; beforeEach(async () => { metadataUpdates = []; + const store = {}; server = await createTestHttpServer({ defineRoutes(router) { router.put("/api/v1/runs/:runId/metadata", async ({ req }) => { const body = await req.json(); - metadataUpdates.push(body); - return Response.json({ metadata: body.metadata }); + const parsedBody = UpdateMetadataRequestBody.parse(body); + + metadataUpdates.push(parsedBody); + + const { newMetadata } = applyMetadataOperations(store, parsedBody.operations ?? []); + + return Response.json({ metadata: newMetadata }); }); }, }); @@ -37,13 +45,13 @@ describe("StandardMetadataManager", () => { }); test("should set and get simple keys", () => { - manager.setKey("test", "value"); + manager.set("test", "value"); expect(manager.getKey("test")).toBe("value"); }); test("should handle JSON path keys", () => { - manager.setKey("nested", { foo: "bar" }); - manager.setKey("$.nested.path", "value"); + manager.set("nested", { foo: "bar" }); + manager.set("$.nested.path", "value"); expect(manager.current()).toEqual({ nested: { foo: "bar", @@ -53,31 +61,26 @@ describe("StandardMetadataManager", () => { }); test("should flush changes to server", async () => { - manager.setKey("test", "value"); + manager.set("test", "value"); await manager.flush(); expect(metadataUpdates).toHaveLength(1); - expect(metadataUpdates[0]).toEqual({ - metadata: { - test: "value", - }, - }); }); test("should only flush to server when data has actually changed", async () => { // Initial set and flush - manager.setKey("test", "value"); + manager.set("test", "value"); await manager.flush(); expect(metadataUpdates).toHaveLength(1); // Same value set again - manager.setKey("test", "value"); + manager.set("test", "value"); await manager.flush(); // Should not trigger another update since value hasn't changed expect(metadataUpdates).toHaveLength(1); // Different value set - manager.setKey("test", "new value"); + manager.set("test", "new value"); await manager.flush(); // Should trigger new update expect(metadataUpdates).toHaveLength(2); @@ -85,18 +88,18 @@ describe("StandardMetadataManager", () => { test("should only flush to server when nested data has actually changed", async () => { // Initial nested object - manager.setKey("nested", { foo: "bar" }); + manager.set("nested", { foo: "bar" }); await manager.flush(); expect(metadataUpdates).toHaveLength(1); // Same nested value - manager.setKey("nested", { foo: "bar" }); + manager.set("nested", { foo: "bar" }); await manager.flush(); // Should not trigger another update expect(metadataUpdates).toHaveLength(1); // Different nested value - manager.setKey("nested", { foo: "baz" }); + manager.set("nested", { foo: "baz" }); await manager.flush(); // Should trigger new update expect(metadataUpdates).toHaveLength(2); @@ -104,20 +107,20 @@ describe("StandardMetadataManager", () => { test("should append to list with simple key", () => { // First append creates the array - manager.appendKey("myList", "first"); + manager.append("myList", "first"); expect(manager.getKey("myList")).toEqual(["first"]); // Second append adds to existing array - manager.appendKey("myList", "second"); + manager.append("myList", "second"); expect(manager.getKey("myList")).toEqual(["first", "second"]); }); test("should append to list with JSON path", () => { // First create nested structure - manager.setKey("nested", { items: [] }); + manager.set("nested", { items: [] }); // Append to empty array - manager.appendKey("$.nested.items", "first"); + manager.append("$.nested.items", "first"); expect(manager.current()).toEqual({ nested: { items: ["first"], @@ -125,7 +128,7 @@ describe("StandardMetadataManager", () => { }); // Append another item - manager.appendKey("$.nested.items", "second"); + manager.append("$.nested.items", "second"); expect(manager.current()).toEqual({ nested: { items: ["first", "second"], @@ -135,19 +138,19 @@ describe("StandardMetadataManager", () => { test("should convert non-array values to array when appending", () => { // Set initial non-array value - manager.setKey("value", "initial"); + manager.set("value", "initial"); // Append should convert to array - manager.appendKey("value", "second"); + manager.append("value", "second"); expect(manager.getKey("value")).toEqual(["initial", "second"]); }); test("should convert non-array values to array when appending with JSON path", () => { // Set initial nested non-array value - manager.setKey("nested", { value: "initial" }); + manager.set("nested", { value: "initial" }); // Append should convert to array - manager.appendKey("$.nested.value", "second"); + manager.append("$.nested.value", "second"); expect(manager.current()).toEqual({ nested: { value: ["initial", "second"], @@ -156,35 +159,25 @@ describe("StandardMetadataManager", () => { }); test("should trigger server update when appending to list", async () => { - manager.appendKey("myList", "first"); + manager.append("myList", "first"); await manager.flush(); expect(metadataUpdates).toHaveLength(1); - expect(metadataUpdates[0]).toEqual({ - metadata: { - myList: ["first"], - }, - }); - manager.appendKey("myList", "second"); + manager.append("myList", "second"); await manager.flush(); expect(metadataUpdates).toHaveLength(2); - expect(metadataUpdates[1]).toEqual({ - metadata: { - myList: ["first", "second"], - }, - }); }); test("should not trigger server update when appending same value", async () => { - manager.appendKey("myList", "first"); + manager.append("myList", "first"); await manager.flush(); expect(metadataUpdates).toHaveLength(1); // Append same value - manager.appendKey("myList", "first"); + manager.append("myList", "first"); await manager.flush(); // Should still be only one update @@ -192,34 +185,34 @@ describe("StandardMetadataManager", () => { }); test("should increment and decrement keys", () => { - manager.incrementKey("counter"); + manager.increment("counter"); expect(manager.getKey("counter")).toBe(1); - manager.incrementKey("counter", 5); + manager.increment("counter", 5); expect(manager.getKey("counter")).toBe(6); - manager.decrementKey("counter"); + manager.decrement("counter"); expect(manager.getKey("counter")).toBe(5); - manager.decrementKey("counter", 3); + manager.decrement("counter", 3); expect(manager.getKey("counter")).toBe(2); }); test("should remove value from array with simple key", () => { // Setup initial array - manager.setKey("myList", ["first", "second", "third"]); + manager.set("myList", ["first", "second", "third"]); // Remove a value - manager.removeFromKey("myList", "second"); + manager.remove("myList", "second"); expect(manager.getKey("myList")).toEqual(["first", "third"]); }); test("should remove value from array with JSON path", () => { // Setup initial nested array - manager.setKey("nested", { items: ["first", "second", "third"] }); + manager.set("nested", { items: ["first", "second", "third"] }); // Remove a value - manager.removeFromKey("$.nested.items", "second"); + manager.remove("$.nested.items", "second"); expect(manager.current()).toEqual({ nested: { items: ["first", "third"], @@ -229,32 +222,32 @@ describe("StandardMetadataManager", () => { test("should handle removing non-existent value", () => { // Setup initial array - manager.setKey("myList", ["first", "second"]); + manager.set("myList", ["first", "second"]); // Try to remove non-existent value - manager.removeFromKey("myList", "third"); + manager.remove("myList", "third"); expect(manager.getKey("myList")).toEqual(["first", "second"]); }); test("should handle removing from non-array values", () => { // Setup non-array value - manager.setKey("value", "string"); + manager.set("value", "string"); // Try to remove from non-array - manager.removeFromKey("value", "something"); + manager.remove("value", "something"); expect(manager.getKey("value")).toBe("string"); }); test("should remove object from array using deep equality", () => { // Setup array with objects - manager.setKey("objects", [ + manager.set("objects", [ { id: 1, name: "first" }, { id: 2, name: "second" }, { id: 3, name: "third" }, ]); // Remove object - manager.removeFromKey("objects", { id: 2, name: "second" }); + manager.remove("objects", { id: 2, name: "second" }); expect(manager.getKey("objects")).toEqual([ { id: 1, name: "first" }, { id: 3, name: "third" }, @@ -263,30 +256,25 @@ describe("StandardMetadataManager", () => { test("should trigger server update when removing from array", async () => { // Setup initial array - manager.setKey("myList", ["first", "second", "third"]); + manager.set("myList", ["first", "second", "third"]); await manager.flush(); expect(metadataUpdates).toHaveLength(1); // Remove value - manager.removeFromKey("myList", "second"); + manager.remove("myList", "second"); await manager.flush(); expect(metadataUpdates).toHaveLength(2); - expect(metadataUpdates[1]).toEqual({ - metadata: { - myList: ["first", "third"], - }, - }); }); test("should not trigger server update when removing non-existent value", async () => { // Setup initial array - manager.setKey("myList", ["first", "second"]); + manager.set("myList", ["first", "second"]); await manager.flush(); expect(metadataUpdates).toHaveLength(1); // Try to remove non-existent value - manager.removeFromKey("myList", "third"); + manager.remove("myList", "third"); await manager.flush(); // Should not trigger new update since nothing changed diff --git a/packages/react-hooks/package.json b/packages/react-hooks/package.json index 94a8c3bffd..303aa94701 100644 --- a/packages/react-hooks/package.json +++ b/packages/react-hooks/package.json @@ -74,4 +74,4 @@ "main": "./dist/commonjs/index.js", "types": "./dist/commonjs/index.d.ts", "module": "./dist/esm/index.js" -} \ No newline at end of file +} diff --git a/packages/react-hooks/src/hooks/useRealtime.ts b/packages/react-hooks/src/hooks/useRealtime.ts index 8c370666a5..f018df6ecc 100644 --- a/packages/react-hooks/src/hooks/useRealtime.ts +++ b/packages/react-hooks/src/hooks/useRealtime.ts @@ -20,6 +20,15 @@ export type UseRealtimeSingleRunOptions = UseRe * @param {Error} [err] - The error that occurred */ onComplete?: (run: RealtimeRun, err?: Error) => void; + + /** + * Whether to stop the subscription when the run completes + * + * @default true + * + * Set this to false if you are making updates to the run metadata after completion through child runs + */ + stopOnCompletion?: boolean; }; export type UseRealtimeRunInstance = { @@ -90,7 +99,13 @@ export function useRealtimeRun( const abortController = new AbortController(); abortControllerRef.current = abortController; - await processRealtimeRun(runId, apiClient, mutateRun, abortControllerRef); + await processRealtimeRun( + runId, + apiClient, + mutateRun, + abortControllerRef, + typeof options?.stopOnCompletion === "boolean" ? options.stopOnCompletion : true + ); } catch (err) { // Ignore abort errors as they are expected. if ((err as any).name === "AbortError") { @@ -244,6 +259,7 @@ export function useRealtimeRunWithStreams< mutateStreams, streamsRef, abortControllerRef, + typeof options?.stopOnCompletion === "boolean" ? options.stopOnCompletion : true, options?.experimental_throttleInMs ); } catch (err) { @@ -567,10 +583,12 @@ async function processRealtimeRunWithStreams< mutateStreamData: KeyedMutator>, existingDataRef: React.MutableRefObject>, abortControllerRef: React.MutableRefObject, + stopOnCompletion: boolean = true, throttleInMs?: number ) { const subscription = apiClient.subscribeToRun>(runId, { signal: abortControllerRef.current?.signal, + closeOnComplete: stopOnCompletion, }); type StreamUpdate = { @@ -619,10 +637,12 @@ async function processRealtimeRun( runId: string, apiClient: ApiClient, mutateRunData: KeyedMutator>, - abortControllerRef: React.MutableRefObject + abortControllerRef: React.MutableRefObject, + stopOnCompletion: boolean = true ) { const subscription = apiClient.subscribeToRun>(runId, { signal: abortControllerRef.current?.signal, + closeOnComplete: stopOnCompletion, }); for await (const part of subscription) { diff --git a/packages/trigger-sdk/src/v3/metadata.ts b/packages/trigger-sdk/src/v3/metadata.ts index 7f9e10f669..b0c321d81d 100644 --- a/packages/trigger-sdk/src/v3/metadata.ts +++ b/packages/trigger-sdk/src/v3/metadata.ts @@ -1,12 +1,16 @@ import { DeserializedJson } from "@trigger.dev/core"; import { ApiRequestOptions, - flattenAttributes, mergeRequestOptions, runMetadata, + type RunMetadataUpdater, + type AsyncIterableStream, } from "@trigger.dev/core/v3"; import { tracer } from "./tracer.js"; +const parentMetadataUpdater: RunMetadataUpdater = runMetadata.parent; +const rootMetadataUpdater: RunMetadataUpdater = runMetadata.root; + /** * Provides access to run metadata operations. * @namespace @@ -17,19 +21,27 @@ import { tracer } from "./tracer.js"; * @property {Function} save - Update the entire metadata object for the current run. */ -export const metadata = { - current: currentMetadata, - get: getMetadataKey, +const metadataUpdater = { set: setMetadataKey, del: deleteMetadataKey, - save: saveMetadata, - replace: replaceMetadata, - flush: flushMetadata, - stream: stream, append: appendMetadataKey, remove: removeMetadataKey, increment: incrementMetadataKey, decrement: decrementMetadataKey, + flush: flushMetadata, +}; + +export const metadata = { + current: currentMetadata, + get: getMetadataKey, + save: saveMetadata, + replace: replaceMetadata, + stream: stream, + fetchStream: fetchStream, + parent: parentMetadataUpdater, + root: rootMetadataUpdater, + refresh: refreshMetadata, + ...metadataUpdater, }; export type RunMetadata = Record; @@ -74,7 +86,9 @@ function getMetadataKey(key: string): DeserializedJson | undefined { * metadata.set("progress", 0.5); */ function setMetadataKey(key: string, value: DeserializedJson) { - runMetadata.setKey(key, value); + runMetadata.set(key, value); + + return metadataUpdater; } /** @@ -86,7 +100,8 @@ function setMetadataKey(key: string, value: DeserializedJson) { * metadata.del("progress"); */ function deleteMetadataKey(key: string) { - runMetadata.deleteKey(key); + runMetadata.del(key); + return metadataUpdater; } /** @@ -99,14 +114,14 @@ function deleteMetadataKey(key: string) { * @example * metadata.replace({ progress: 0.6, user: { name: "Alice", id: "user_5678" } }); */ -function replaceMetadata(metadata: RunMetadata): void { +function replaceMetadata(metadata: RunMetadata) { runMetadata.update(metadata); } /** * @deprecated Use `metadata.replace()` instead. */ -function saveMetadata(metadata: RunMetadata): void { +function saveMetadata(metadata: RunMetadata) { runMetadata.update(metadata); } @@ -122,7 +137,8 @@ function saveMetadata(metadata: RunMetadata): void { * metadata.increment("score", 10); // Increments score by 10 */ function incrementMetadataKey(key: string, value: number = 1) { - runMetadata.incrementKey(key, value); + runMetadata.increment(key, value); + return metadataUpdater; } /** @@ -137,7 +153,8 @@ function incrementMetadataKey(key: string, value: number = 1) { * metadata.decrement("score", 5); // Decrements score by 5 */ function decrementMetadataKey(key: string, value: number = 1) { - runMetadata.decrementKey(key, value); + runMetadata.decrement(key, value); + return metadataUpdater; } /** @@ -153,7 +170,8 @@ function decrementMetadataKey(key: string, value: number = 1) { * metadata.append("events", { type: "click", timestamp: Date.now() }); */ function appendMetadataKey(key: string, value: DeserializedJson) { - runMetadata.appendKey(key, value); + runMetadata.append(key, value); + return metadataUpdater; } /** @@ -168,7 +186,8 @@ function appendMetadataKey(key: string, value: DeserializedJson) { * metadata.remove("events", { type: "click", timestamp: Date.now() }); */ function removeMetadataKey(key: string, value: DeserializedJson) { - runMetadata.removeFromKey(key, value); + runMetadata.remove(key, value); + return metadataUpdater; } /** @@ -190,6 +209,25 @@ async function flushMetadata(requestOptions?: ApiRequestOptions): Promise await runMetadata.flush($requestOptions); } +/** + * Refreshes metadata from the Trigger.dev instance + * + * @param {ApiRequestOptions} [requestOptions] - Optional request options to customize the API request. + * @returns {Promise} A promise that resolves when the metadata refresh operation is complete. + */ +async function refreshMetadata(requestOptions?: ApiRequestOptions): Promise { + const $requestOptions = mergeRequestOptions( + { + tracer, + name: "metadata.refresh()", + icon: "code-plus", + }, + requestOptions + ); + + await runMetadata.refresh($requestOptions); +} + async function stream( key: string, value: AsyncIterable | ReadableStream, @@ -197,3 +235,7 @@ async function stream( ): Promise> { return runMetadata.stream(key, value, signal); } + +async function fetchStream(key: string, signal?: AbortSignal): Promise> { + return runMetadata.fetchStream(key, signal); +} diff --git a/packages/trigger-sdk/src/v3/runs.ts b/packages/trigger-sdk/src/v3/runs.ts index 3ae9333a4f..16e1384016 100644 --- a/packages/trigger-sdk/src/v3/runs.ts +++ b/packages/trigger-sdk/src/v3/runs.ts @@ -13,6 +13,7 @@ import type { RunSubscription, TaskRunShape, AnyBatchedRunHandle, + AsyncIterableStream, } from "@trigger.dev/core/v3"; import { ApiPromise, @@ -51,6 +52,7 @@ export const runs = { subscribeToRun, subscribeToRunsWithTag, subscribeToBatch: subscribeToRunsInBatch, + fetchStream, }; export type ListRunsItem = ListRunResponseItem; @@ -335,6 +337,17 @@ async function poll( ); } +export type SubscribeToRunOptions = { + /** + * Whether to close the subscription when the run completes + * + * @default true + * + * Set this to false if you are making updates to the run metadata after completion through child runs + */ + stopOnCompletion?: boolean; +}; + /** * Subscribes to real-time updates for a specific run. * @@ -345,6 +358,8 @@ async function poll( * * @template TRunId - The type parameter extending AnyRunHandle, AnyTask, or string * @param {RunId} runId - The ID of the run to subscribe to. Can be a string ID, RunHandle, or Task + * @param {SubscribeToRunOptions} [options] - Optional configuration for the subscription + * @param {boolean} [options.stopOnCompletion=true] - Whether to close the subscription when the run completes * @returns {RunSubscription>} An async iterator that yields updated run objects * * @example @@ -365,13 +380,17 @@ async function poll( * ``` */ function subscribeToRun( - runId: RunId + runId: RunId, + options?: SubscribeToRunOptions ): RunSubscription> { const $runId = typeof runId === "string" ? runId : runId.id; const apiClient = apiClientManager.clientOrThrow(); - return apiClient.subscribeToRun($runId); + return apiClient.subscribeToRun($runId, { + closeOnComplete: + typeof options?.stopOnCompletion === "boolean" ? options.stopOnCompletion : true, + }); } /** @@ -448,3 +467,12 @@ function subscribeToRunsInBatch( return apiClient.subscribeToBatch>(batchId); } + +/** + * Fetches a stream of data from a run's stream key. + */ +async function fetchStream(runId: string, streamKey: string): Promise> { + const apiClient = apiClientManager.clientOrThrow(); + + return await apiClient.fetchStream(runId, streamKey); +} diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 17bc72bddc..e547256a7f 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -447,6 +447,9 @@ importers: dotenv: specifier: ^16.4.5 version: 16.4.5 + effect: + specifier: ^3.11.7 + version: 3.11.7 emails: specifier: workspace:* version: link:../../internal-packages/emails @@ -1607,12 +1610,18 @@ importers: '@fal-ai/serverless-client': specifier: ^0.15.0 version: 0.15.0 + '@fast-csv/parse': + specifier: ^5.0.2 + version: 5.0.2 '@radix-ui/react-dialog': specifier: ^1.0.3 version: 1.0.4(@types/react-dom@18.2.7)(@types/react@18.3.1)(react-dom@18.2.0)(react@18.3.1) '@radix-ui/react-icons': specifier: ^1.3.0 version: 1.3.0(react@18.3.1) + '@radix-ui/react-progress': + specifier: ^1.1.1 + version: 1.1.1(@types/react-dom@18.2.7)(@types/react@18.3.1)(react-dom@18.2.0)(react@18.3.1) '@radix-ui/react-scroll-area': specifier: ^1.2.0 version: 1.2.0(@types/react-dom@18.2.7)(@types/react@18.3.1)(react-dom@18.2.0)(react@18.3.1) @@ -1703,7 +1712,7 @@ importers: dependencies: '@effect/schema': specifier: ^0.75.5 - version: 0.75.5(effect@3.9.2) + version: 0.75.5(effect@3.11.7) '@infisical/sdk': specifier: ^2.1.9 version: 2.3.5 @@ -5103,12 +5112,12 @@ packages: fast-check: 3.22.0 dev: false - /@effect/schema@0.75.5(effect@3.9.2): + /@effect/schema@0.75.5(effect@3.11.7): resolution: {integrity: sha512-TQInulTVCuF+9EIbJpyLP6dvxbQJMphrnRqgexm/Ze39rSjfhJuufF7XvU3SxTgg3HnL7B/kpORTJbHhlE6thw==} peerDependencies: effect: ^3.9.2 dependencies: - effect: 3.9.2 + effect: 3.11.7 fast-check: 3.22.0 dev: false @@ -6647,6 +6656,17 @@ packages: robot3: 0.4.1 dev: false + /@fast-csv/parse@5.0.2: + resolution: {integrity: sha512-gMu1Btmm99TP+wc0tZnlH30E/F1Gw1Tah3oMDBHNPe9W8S68ixVHjt89Wg5lh7d9RuQMtwN+sGl5kxR891+fzw==} + dependencies: + lodash.escaperegexp: 4.1.2 + lodash.groupby: 4.6.0 + lodash.isfunction: 3.0.9 + lodash.isnil: 4.0.0 + lodash.isundefined: 3.0.1 + lodash.uniq: 4.5.0 + dev: false + /@fastify/busboy@2.0.0: resolution: {integrity: sha512-JUFJad5lv7jxj926GPgymrWQxxjPYuJNiNjNMzqT+HiuP6Vl3dk5xzG+8sTX96np0ZAluvaMzPsjhHZ5rNuNQQ==} engines: {node: '>=14'} @@ -9126,6 +9146,19 @@ packages: react: 18.3.1 dev: false + /@radix-ui/react-compose-refs@1.1.1(@types/react@18.3.1)(react@18.3.1): + resolution: {integrity: sha512-Y9VzoRDSJtgFMUCoiZBDVo084VQ5hfpXxVE+NgkdNsjiDBByiImMZKKhxMwCbdHvhlENG6a833CbFkOQvTricw==} + peerDependencies: + '@types/react': '*' + react: ^16.8 || ^17.0 || ^18.0 || ^19.0 || ^19.0.0-rc + peerDependenciesMeta: + '@types/react': + optional: true + dependencies: + '@types/react': 18.3.1 + react: 18.3.1 + dev: false + /@radix-ui/react-context@1.0.0(react@18.2.0): resolution: {integrity: sha512-1pVM9RfOQ+n/N5PJK33kRSKsr1glNxomxONs5c49MliinBY6Yw2Q995qfBUUo0/Mbg05B/sGA0gkgPI7kmSHBg==} peerDependencies: @@ -10018,6 +10051,47 @@ packages: react-dom: 18.2.0(react@18.3.1) dev: false + /@radix-ui/react-primitive@2.0.1(@types/react-dom@18.2.7)(@types/react@18.3.1)(react-dom@18.2.0)(react@18.3.1): + resolution: {integrity: sha512-sHCWTtxwNn3L3fH8qAfnF3WbUZycW93SM1j3NFDzXBiz8D6F5UTTy8G1+WFEaiCdvCVRJWj6N2R4Xq6HdiHmDg==} + peerDependencies: + '@types/react': '*' + '@types/react-dom': '*' + react: ^16.8 || ^17.0 || ^18.0 || ^19.0 || ^19.0.0-rc + react-dom: ^16.8 || ^17.0 || ^18.0 || ^19.0 || ^19.0.0-rc + peerDependenciesMeta: + '@types/react': + optional: true + '@types/react-dom': + optional: true + dependencies: + '@radix-ui/react-slot': 1.1.1(@types/react@18.3.1)(react@18.3.1) + '@types/react': 18.3.1 + '@types/react-dom': 18.2.7 + react: 18.3.1 + react-dom: 18.2.0(react@18.3.1) + dev: false + + /@radix-ui/react-progress@1.1.1(@types/react-dom@18.2.7)(@types/react@18.3.1)(react-dom@18.2.0)(react@18.3.1): + resolution: {integrity: sha512-6diOawA84f/eMxFHcWut0aE1C2kyE9dOyCTQOMRR2C/qPiXz/X0SaiA/RLbapQaXUCmy0/hLMf9meSccD1N0pA==} + peerDependencies: + '@types/react': '*' + '@types/react-dom': '*' + react: ^16.8 || ^17.0 || ^18.0 || ^19.0 || ^19.0.0-rc + react-dom: ^16.8 || ^17.0 || ^18.0 || ^19.0 || ^19.0.0-rc + peerDependenciesMeta: + '@types/react': + optional: true + '@types/react-dom': + optional: true + dependencies: + '@radix-ui/react-context': 1.1.1(@types/react@18.3.1)(react@18.3.1) + '@radix-ui/react-primitive': 2.0.1(@types/react-dom@18.2.7)(@types/react@18.3.1)(react-dom@18.2.0)(react@18.3.1) + '@types/react': 18.3.1 + '@types/react-dom': 18.2.7 + react: 18.3.1 + react-dom: 18.2.0(react@18.3.1) + dev: false + /@radix-ui/react-radio-group@1.1.3(@types/react-dom@18.2.7)(@types/react@18.2.69)(react-dom@18.2.0)(react@18.2.0): resolution: {integrity: sha512-x+yELayyefNeKeTx4fjK6j99Fs6c4qKm3aY38G3swQVTN6xMpsrbigC0uHs2L//g8q4qR7qOcww8430jJmi2ag==} peerDependencies: @@ -10318,6 +10392,20 @@ packages: react: 18.3.1 dev: false + /@radix-ui/react-slot@1.1.1(@types/react@18.3.1)(react@18.3.1): + resolution: {integrity: sha512-RApLLOcINYJA+dMVbOju7MYv1Mb2EBp2nH4HdDzXTSyaR5optlm6Otrz1euW3HbdOR8UmmFK06TD+A9frYWv+g==} + peerDependencies: + '@types/react': '*' + react: ^16.8 || ^17.0 || ^18.0 || ^19.0 || ^19.0.0-rc + peerDependenciesMeta: + '@types/react': + optional: true + dependencies: + '@radix-ui/react-compose-refs': 1.1.1(@types/react@18.3.1)(react@18.3.1) + '@types/react': 18.3.1 + react: 18.3.1 + dev: false + /@radix-ui/react-switch@1.0.3(@types/react-dom@18.2.7)(@types/react@18.2.69)(react-dom@18.2.0)(react@18.2.0): resolution: {integrity: sha512-mxm87F88HyHztsI7N+ZUmEoARGkC22YVW5CaC+Byc+HRpuvCrOBPTAnXgf+tZ/7i0Sg/eOePGdMhUKhPaQEqow==} peerDependencies: @@ -19511,12 +19599,14 @@ packages: /ee-first@1.1.1: resolution: {integrity: sha512-WMwm9LhRUo+WUaRN+vRuETqG89IgZphVSNkdFgeb6sS/E4OrDIN7t48CAewSHXc6C8lefD8KKfr5vY61brQlow==} - /effect@3.7.2: - resolution: {integrity: sha512-pV7l1+LSZFvVObj4zuy4nYiBaC7qZOfrKV6s/Ef4p3KueiQwZFgamazklwyZ+x7Nyj2etRDFvHE/xkThTfQD1w==} + /effect@3.11.7: + resolution: {integrity: sha512-laj+TCxWGn0eOv6jNmS9vavMO01Z4vvRr7v5airaOUfE7Zr5PrHiECpiI5HRvOewxa1im/4EcOvRodOZ1S2Y7Q==} + dependencies: + fast-check: 3.22.0 dev: false - /effect@3.9.2: - resolution: {integrity: sha512-1sx/v1HTWHTodXfzWxAFg+SCF+ACgpJVruaAMIh/NmDVvrUsf0x9PzpXvkgJUbQ1fMdmKYK//FqxeHSQ+Zxv/Q==} + /effect@3.7.2: + resolution: {integrity: sha512-pV7l1+LSZFvVObj4zuy4nYiBaC7qZOfrKV6s/Ef4p3KueiQwZFgamazklwyZ+x7Nyj2etRDFvHE/xkThTfQD1w==} dev: false /electron-to-chromium@1.4.433: @@ -23165,10 +23255,18 @@ packages: resolution: {integrity: sha512-dS2j+W26TQ7taQBGN8Lbbq04ssV3emRw4NY58WErlTO29pIqS0HmoT5aJ9+TUQ1N3G+JOZSji4eugsWwGp9yPA==} dev: true + /lodash.escaperegexp@4.1.2: + resolution: {integrity: sha512-TM9YBvyC84ZxE3rgfefxUWiQKLilstD6k7PTGt6wfbtXF8ixIJLOL3VYyV/z+ZiPLsVxAsKAFVwWlWeb2Y8Yyw==} + dev: false + /lodash.flatten@4.4.0: resolution: {integrity: sha512-C5N2Z3DgnnKr0LOpv/hKCgKdb7ZZwafIrsesve6lmzvZIRZRGaZ/l6Q8+2W7NaT+ZwO3fFlSCzCzrDCFdJfZ4g==} dev: true + /lodash.groupby@4.6.0: + resolution: {integrity: sha512-5dcWxm23+VAoz+awKmBaiBvzox8+RqMgFhi7UvX9DHZr2HdxHXM/Wrf8cfKpsW37RNrvtPn6hSwNqurSILbmJw==} + dev: false + /lodash.isarguments@3.1.0: resolution: {integrity: sha512-chi4NHZlZqZD18a0imDHnZPrDeBbTtVN7GXMwuGdRH9qotxAjYs3aVLKc7zNOG9eddR5Ksd8rvFEBc9SsggPpg==} dev: false @@ -23177,10 +23275,22 @@ packages: resolution: {integrity: sha512-pDo3lu8Jhfjqls6GkMgpahsF9kCyayhgykjyLMNFTKWrpVdAQtYyB4muAMWozBB4ig/dtWAmsMxLEI8wuz+DYQ==} dev: false + /lodash.isfunction@3.0.9: + resolution: {integrity: sha512-AirXNj15uRIMMPihnkInB4i3NHeb4iBtNg9WRWuK2o31S+ePwwNmDPaTL3o7dTJ+VXNZim7rFs4rxN4YU1oUJw==} + dev: false + + /lodash.isnil@4.0.0: + resolution: {integrity: sha512-up2Mzq3545mwVnMhTDMdfoG1OurpA/s5t88JmQX809eH3C8491iu2sfKhTfhQtKY78oPNhiaHJUpT/dUDAAtng==} + dev: false + /lodash.isplainobject@4.0.6: resolution: {integrity: sha512-oSXzaWypCMHkPC3NvBEaPHf0KsA5mvPrOPgQWDsbg8n7orZ290M0BmC/jgRZ4vcJ6DTAhjrsSYgdsW/F+MFOBA==} dev: true + /lodash.isundefined@3.0.1: + resolution: {integrity: sha512-MXB1is3s899/cD8jheYYE2V9qTHwKvt+npCwpD+1Sxm3Q3cECXCiYHjeHWXNwr6Q0SOBPrYUDxendrO6goVTEA==} + dev: false + /lodash.merge@4.6.2: resolution: {integrity: sha512-0KpjqXRVvrYyCsX1swR/XTK0va6VQkQM6MNo7PqW77ByjAhoARA8EfrP1N4+KlKj8YS0ZUCtRT/YUuhyYDujIQ==} @@ -23196,6 +23306,10 @@ packages: resolution: {integrity: sha512-c4pB2CdGrGdjMKYLA+XiRDO7Y0PRQbm/Gzg8qMj+QH+pFVAoTp5sBpO0odL3FjoPCGjK96p6qsP+yQoiLoOBcw==} dev: true + /lodash.uniq@4.5.0: + resolution: {integrity: sha512-xfBaXQd9ryd9dlSDvnvI0lvxfLJlYAZzXomUYzLKtUeOQvOP5piqAWuGtrhWeqaXK9hhoM/iyJc5AV+XfsX3HQ==} + dev: false + /lodash@4.17.21: resolution: {integrity: sha512-v2kDEe57lecTulaDIuNTPy3Ry4gLGJ6Z1O3vE1krgXZNrsQ+LFTGHVxVjcXPs17LhbZVGedAJv8XZ1tvj5FvSg==} diff --git a/references/nextjs-realtime/package.json b/references/nextjs-realtime/package.json index 8985fc2a32..9a00ea0b51 100644 --- a/references/nextjs-realtime/package.json +++ b/references/nextjs-realtime/package.json @@ -13,8 +13,10 @@ "dependencies": { "@ai-sdk/openai": "^1.0.1", "@fal-ai/serverless-client": "^0.15.0", + "@fast-csv/parse": "^5.0.2", "@radix-ui/react-dialog": "^1.0.3", "@radix-ui/react-icons": "^1.3.0", + "@radix-ui/react-progress": "^1.1.1", "@radix-ui/react-scroll-area": "^1.2.0", "@radix-ui/react-slot": "^1.1.0", "@radix-ui/react-tabs": "^1.0.3", diff --git a/references/nextjs-realtime/src/app/api/uploadthing/core.ts b/references/nextjs-realtime/src/app/api/uploadthing/core.ts index 7f09693f8b..8aec3e6506 100644 --- a/references/nextjs-realtime/src/app/api/uploadthing/core.ts +++ b/references/nextjs-realtime/src/app/api/uploadthing/core.ts @@ -2,6 +2,7 @@ import { randomUUID } from "crypto"; import { createUploadthing, type FileRouter } from "uploadthing/next"; import { UploadThingError } from "uploadthing/server"; import type { handleUpload } from "@/trigger/images"; +import type { handleCSVUpload } from "@/trigger/csv"; import { auth, tasks } from "@trigger.dev/sdk/v3"; const f = createUploadthing(); @@ -46,6 +47,13 @@ export const ourFileRouter = { // !!! Whatever is returned here is sent to the clientside `onClientUploadComplete` callback return { uploadedBy: metadata.userId, publicAccessToken, fileId: file.key }; }), + csvUploader: f({ blob: { maxFileSize: "4MB" } }).onUploadComplete(async ({ metadata, file }) => { + console.log("file", file); + + const handle = await tasks.trigger("handle-csv-upload", file); + + return handle; + }), } satisfies FileRouter; export type OurFileRouter = typeof ourFileRouter; diff --git a/references/nextjs-realtime/src/app/csv/[id]/RealtimeCSVRun.tsx b/references/nextjs-realtime/src/app/csv/[id]/RealtimeCSVRun.tsx new file mode 100644 index 0000000000..8e70a51c1f --- /dev/null +++ b/references/nextjs-realtime/src/app/csv/[id]/RealtimeCSVRun.tsx @@ -0,0 +1,176 @@ +"use client"; + +import { Badge } from "@/components/ui/badge"; +import { Card, CardContent, CardDescription, CardHeader, CardTitle } from "@/components/ui/card"; +import { Progress } from "@/components/ui/progress"; +import { handleCSVUpload } from "@/trigger/csv"; +import { CSVUploadMetadataSchema } from "@/trigger/schemas"; +import { useRealtimeRun } from "@trigger.dev/react-hooks"; +import { Terminal } from "lucide-react"; + +type UseCSVUploadInstance = { + status: "loading" | "queued" | "fetching" | "parsing" | "processing" | "complete" | "error"; + filename?: string; + progress: number; + message: string; + totalRows?: number; + inProgressRows?: number; + processedRows?: number; +}; + +function useCSVUpload(runId: string, accessToken: string): UseCSVUploadInstance { + const instance = useRealtimeRun(runId, { + accessToken, + baseURL: process.env.NEXT_PUBLIC_TRIGGER_API_URL, + onComplete: (run) => { + console.log("CSV Upload complete", run); + }, + stopOnCompletion: false, + }); + + if (!instance.run) { + return { status: "loading", progress: 0, message: "Loading..." }; + } + + console.log("CSV Upload", instance.run); + + if (!instance.run.metadata) { + return { + status: "queued", + progress: 0.05, + message: "Queued...", + filename: instance.run.payload.name, + }; + } + + const parsedMetadata = CSVUploadMetadataSchema.safeParse(instance.run.metadata); + + if (!parsedMetadata.success) { + return { + status: "error", + progress: 0, + message: "Failed to parse metadata", + filename: instance.run.payload.name, + }; + } + + switch (parsedMetadata.data.status) { + case "fetching": { + return { + status: "fetching", + progress: 0.1, + message: "Fetching CSV file...", + filename: instance.run.payload.name, + }; + } + case "parsing": { + return { + status: "parsing", + progress: 0.2, + message: "Parsing CSV file...", + filename: instance.run.payload.name, + }; + } + case "processing": { + // progress will be some number between 0.3 and 0.95 + // depending on the totalRows and processedRows + + const progress = + typeof parsedMetadata.data.processedRows === "number" && + typeof parsedMetadata.data.totalRows === "number" + ? 0.3 + (parsedMetadata.data.processedRows / parsedMetadata.data.totalRows) * 0.65 + : 0.3; + + return { + status: "processing", + progress: progress, + message: "Processing CSV file...", + totalRows: parsedMetadata.data.totalRows, + inProgressRows: parsedMetadata.data.inProgressRows, + processedRows: parsedMetadata.data.processedRows, + filename: instance.run.payload.name, + }; + } + case "complete": { + return { + status: "complete", + progress: 1, + message: "CSV processing complete", + totalRows: parsedMetadata.data.totalRows, + inProgressRows: parsedMetadata.data.inProgressRows, + processedRows: parsedMetadata.data.processedRows, + filename: instance.run.payload.name, + }; + } + } +} + +export default function RealtimeCSVRun({ + runId, + accessToken, +}: { + runId: string; + accessToken: string; +}) { + const csvRun = useCSVUpload(runId, accessToken); + + const progress = Math.round(csvRun.progress * 100); + const isComplete = csvRun.status === "complete"; + + return ( +
+
+
+ +

CSV Email Validation

+
+ + + +
+
+ {csvRun.filename ?? "n/a"} + + {csvRun.totalRows ? `Processing ${csvRun.totalRows} rows` : "Processing CSV file"} + +
+ + {isComplete ? "Completed" : "Running"} + +
+
+ +
+
+ Overall Progress + {progress}% +
+ +
+ +
+ + +
+ {typeof csvRun.processedRows === "number" ? csvRun.processedRows : "N/A"} +
+
Emails Processed
+
+
+ + +
+ {typeof csvRun.totalRows === "number" + ? csvRun.totalRows - (csvRun.processedRows ?? 0) + : "N/A"} +
+
Remaining
+
+
+
+
+
+
+
+ ); +} diff --git a/references/nextjs-realtime/src/app/csv/[id]/page.tsx b/references/nextjs-realtime/src/app/csv/[id]/page.tsx new file mode 100644 index 0000000000..0d4c22f347 --- /dev/null +++ b/references/nextjs-realtime/src/app/csv/[id]/page.tsx @@ -0,0 +1,16 @@ +import { notFound } from "next/navigation"; +import RealtimeCSVRun from "./RealtimeCSVRun"; + +export default function CSVProcessor({ + params, + searchParams, +}: { + params: { id: string }; + searchParams: { [key: string]: string | string[] | undefined }; +}) { + if (typeof searchParams.publicAccessToken !== "string") { + notFound(); + } + + return ; +} diff --git a/references/nextjs-realtime/src/app/csv/page.tsx b/references/nextjs-realtime/src/app/csv/page.tsx new file mode 100644 index 0000000000..00df941594 --- /dev/null +++ b/references/nextjs-realtime/src/app/csv/page.tsx @@ -0,0 +1,14 @@ +import { CSVUploadDropzone } from "@/components/ImageUploadButton"; + +export default async function CSVPage() { + return ( +
+
+

+ Trigger.dev Realtime + UploadThing + CSV Import +

+ +
+
+ ); +} diff --git a/references/nextjs-realtime/src/components/ImageUploadButton.tsx b/references/nextjs-realtime/src/components/ImageUploadButton.tsx index df06858e0c..ff435c633c 100644 --- a/references/nextjs-realtime/src/components/ImageUploadButton.tsx +++ b/references/nextjs-realtime/src/components/ImageUploadButton.tsx @@ -51,3 +51,28 @@ export function ImageUploadDropzone() { /> ); } + +export function CSVUploadDropzone() { + const router = useRouter(); + + return ( + { + // Do something with the response + console.log("Files: ", res); + + const firstFile = res[0]; + + router.push( + `/csv/${firstFile.serverData.id}?publicAccessToken=${firstFile.serverData.publicAccessToken}` + ); + }} + onUploadError={(error: Error) => { + // Do something with the error. + console.error(`ERROR! ${error.message}`); + }} + className="border-gray-600" + /> + ); +} diff --git a/references/nextjs-realtime/src/components/ui/progress.tsx b/references/nextjs-realtime/src/components/ui/progress.tsx new file mode 100644 index 0000000000..4fc3b473e6 --- /dev/null +++ b/references/nextjs-realtime/src/components/ui/progress.tsx @@ -0,0 +1,28 @@ +"use client" + +import * as React from "react" +import * as ProgressPrimitive from "@radix-ui/react-progress" + +import { cn } from "@/lib/utils" + +const Progress = React.forwardRef< + React.ElementRef, + React.ComponentPropsWithoutRef +>(({ className, value, ...props }, ref) => ( + + + +)) +Progress.displayName = ProgressPrimitive.Root.displayName + +export { Progress } diff --git a/references/nextjs-realtime/src/trigger/csv.ts b/references/nextjs-realtime/src/trigger/csv.ts new file mode 100644 index 0000000000..3acdddd4d4 --- /dev/null +++ b/references/nextjs-realtime/src/trigger/csv.ts @@ -0,0 +1,106 @@ +import { UploadedFileData } from "@/utils/schemas"; +import { parse } from "@fast-csv/parse"; +import { batch, logger, metadata, schemaTask } from "@trigger.dev/sdk/v3"; +import { setTimeout } from "timers/promises"; +import { CSVRow } from "./schemas"; + +export const handleCSVUpload = schemaTask({ + id: "handle-csv-upload", + schema: UploadedFileData, + run: async (file, { ctx }) => { + logger.info("Handling uploaded file", { file }); + + metadata.set("status", "fetching"); + + const response = await fetch(file.url); + + if (!response.ok) { + throw new Error(`Failed to fetch file: ${response.statusText}`); + } + + const body = await response.text(); + + metadata.set("status", "parsing"); + + const rows = await new Promise>((resolve, reject) => { + const rows: Array = []; + + const parser = parse({ headers: true }); + + parser.on("data", (row) => { + logger.info("Row", { row }); + + const parsedRow = CSVRow.safeParse(row); + + if (parsedRow.success) { + rows.push(parsedRow.data); + } else { + logger.warn("Failed to parse row", { row, errors: parsedRow.error }); + } + }); + + parser.on("end", () => { + logger.info("CSV parsing complete"); + + resolve(rows); + }); + + parser.on("error", reject); + + parser.write(body); + parser.end(); + }); + + metadata.set("status", "processing").set("totalRows", rows.length); + + const results = await batch.triggerAndWait( + rows.map((row) => ({ id: "handle-csv-row", payload: row })) + ); + + metadata.set("status", "complete"); + + const successfulRows = results.runs.filter((r) => r.ok); + const failedRows = results.runs.filter((r) => !r.ok); + + const firstSuccessfulRow = successfulRows[0]; + + if (firstSuccessfulRow) { + const stream = await metadata.fetchStream(firstSuccessfulRow.id); + + for await (const value of stream) { + logger.info(`Stream value from ${firstSuccessfulRow.id}`, { value }); + } + } + + return { + file, + rows, + rowCount: rows.length, + successCount: successfulRows.length, + failedCount: failedRows.length, + }; + }, +}); + +export const handleCSVRow = schemaTask({ + id: "handle-csv-row", + schema: CSVRow, + run: async (row, { ctx }) => { + logger.info("Handling CSV row", { row }); + + // Simulate processing time + await setTimeout(200 + Math.random() * 1000); // 200ms - 1.2s + + metadata.parent.increment("processedRows", 1).append("rowRuns", ctx.run.id); + + await metadata.parent.stream( + ctx.run.id, + (async function* () { + yield "hello"; + yield "world"; + })() + ); + + return row; + }, +}); diff --git a/references/nextjs-realtime/src/trigger/schemas.ts b/references/nextjs-realtime/src/trigger/schemas.ts new file mode 100644 index 0000000000..b99eb8640d --- /dev/null +++ b/references/nextjs-realtime/src/trigger/schemas.ts @@ -0,0 +1,35 @@ +import { z } from "zod"; + +export const CSVRow = z.object({ + Date: z.string(), + Impressions: z.coerce.number(), + Likes: z.coerce.number(), + Engagements: z.coerce.number(), + Bookmarks: z.coerce.number(), + Shares: z.coerce.number(), + "New follows": z.coerce.number(), + Unfollows: z.coerce.number(), + Replies: z.coerce.number(), + Reposts: z.coerce.number(), + "Profile visits": z.coerce.number(), + "Create Post": z.coerce.number(), + "Video views": z.coerce.number(), + "Media views": z.coerce.number(), +}); + +export type CSVRow = z.infer; + +// Status schema for progress updates +export const CSVStatus = z.enum(["fetching", "parsing", "processing", "complete"]); + +export type CSVStatus = z.infer; + +// The full metadata schema that encompasses all possible metadata fields +export const CSVUploadMetadataSchema = z.object({ + status: CSVStatus, + totalRows: z.number().int().nonnegative().optional(), + inProgressRows: z.number().int().nonnegative().optional(), + processedRows: z.number().int().nonnegative().optional(), +}); + +export type CSVUploadMetadata = z.infer; diff --git a/references/v3-catalog/src/trigger/runMetadata.ts b/references/v3-catalog/src/trigger/runMetadata.ts index 4a12c6009c..40ccbe50c5 100644 --- a/references/v3-catalog/src/trigger/runMetadata.ts +++ b/references/v3-catalog/src/trigger/runMetadata.ts @@ -3,6 +3,8 @@ import { logger, task, metadata, AbortTaskRunError } from "@trigger.dev/sdk/v3"; export const runMetadataTask = task({ id: "run-metadata-task", run: async (payload: any) => { + metadata.set("numberOfChildren", 2); + await runMetadataChildTask.triggerAndWait(payload, { metadata: { hello: "world", @@ -19,6 +21,9 @@ export const runMetadataTask = task({ export const runMetadataChildTask = task({ id: "run-metadata-child-task", run: async (payload: any, { ctx }) => { + metadata.parent.increment("numberOfChildren", 1); + metadata.root.increment("numberOfChildren", 1); + logger.info("metadata", { metadata: metadata.current() }); metadata.set("child", "task"); @@ -58,7 +63,7 @@ export const runMetadataChildTask = task({ export const runMetadataChildTask2 = task({ id: "run-metadata-child-task-2", run: async (payload: any, { ctx }) => { - throw new AbortTaskRunError("aborting"); + metadata.root.increment("numberOfChildren", 1); }, });