diff --git a/packages/core/src/v3/apiClient/index.ts b/packages/core/src/v3/apiClient/index.ts index ff9ea3bf76..53892c36fd 100644 --- a/packages/core/src/v3/apiClient/index.ts +++ b/packages/core/src/v3/apiClient/index.ts @@ -50,6 +50,7 @@ import { RunSubscription, TaskRunShape, runShapeStream, + SSEStreamSubscriptionFactory, } from "./runStream.js"; import { CreateEnvironmentVariableParams, @@ -681,6 +682,23 @@ export class ApiClient { }); } + async fetchStream( + runId: string, + streamKey: string, + options?: { signal?: AbortSignal; baseUrl?: string } + ): Promise> { + const streamFactory = new SSEStreamSubscriptionFactory(options?.baseUrl ?? this.baseUrl, { + headers: this.getHeaders(), + signal: options?.signal, + }); + + const subscription = streamFactory.createSubscription(runId, streamKey); + + const stream = await subscription.subscribe(); + + return stream as AsyncIterableStream; + } + async generateJWTClaims(requestOptions?: ZodFetchOptions): Promise> { return zodfetch( z.record(z.any()), diff --git a/packages/core/src/v3/apiClient/runStream.ts b/packages/core/src/v3/apiClient/runStream.ts index 3291923177..ed68b91e35 100644 --- a/packages/core/src/v3/apiClient/runStream.ts +++ b/packages/core/src/v3/apiClient/runStream.ts @@ -89,15 +89,7 @@ export function runShapeStream( ): RunSubscription { const abortController = new AbortController(); - const version1 = new SSEStreamSubscriptionFactory( - getEnvVar("TRIGGER_STREAM_URL", getEnvVar("TRIGGER_API_URL")) ?? "https://api.trigger.dev", - { - headers: options?.headers, - signal: abortController.signal, - } - ); - - const version2 = new ElectricStreamSubscriptionFactory( + const streamFactory = new SSEStreamSubscriptionFactory( getEnvVar("TRIGGER_STREAM_URL", getEnvVar("TRIGGER_API_URL")) ?? "https://api.trigger.dev", { headers: options?.headers, @@ -124,7 +116,7 @@ export function runShapeStream( const $options: RunSubscriptionOptions = { runShapeStream: runStreamInstance.stream, stopRunShapeStream: () => runStreamInstance.stop(30 * 1000), - streamFactory: new VersionedStreamSubscriptionFactory(version1, version2), + streamFactory: streamFactory, abortController, ...options, }; @@ -138,12 +130,7 @@ export interface StreamSubscription { } export interface StreamSubscriptionFactory { - createSubscription( - metadata: Record, - runId: string, - streamKey: string, - baseUrl?: string - ): StreamSubscription; + createSubscription(runId: string, streamKey: string, baseUrl?: string): StreamSubscription; } // Real implementation for production @@ -194,12 +181,7 @@ export class SSEStreamSubscriptionFactory implements StreamSubscriptionFactory { private options: { headers?: Record; signal?: AbortSignal } ) {} - createSubscription( - metadata: Record, - runId: string, - streamKey: string, - baseUrl?: string - ): StreamSubscription { + createSubscription(runId: string, streamKey: string, baseUrl?: string): StreamSubscription { if (!runId || !streamKey) { throw new Error("runId and streamKey are required"); } @@ -238,63 +220,6 @@ export class ElectricStreamSubscription implements StreamSubscription { } } -export class ElectricStreamSubscriptionFactory implements StreamSubscriptionFactory { - constructor( - private baseUrl: string, - private options: { headers?: Record; signal?: AbortSignal } - ) {} - - createSubscription( - metadata: Record, - runId: string, - streamKey: string, - baseUrl?: string - ): StreamSubscription { - if (!runId || !streamKey) { - throw new Error("runId and streamKey are required"); - } - - return new ElectricStreamSubscription( - `${baseUrl ?? this.baseUrl}/realtime/v2/streams/${runId}/${streamKey}`, - this.options - ); - } -} - -export class VersionedStreamSubscriptionFactory implements StreamSubscriptionFactory { - constructor( - private version1: StreamSubscriptionFactory, - private version2: StreamSubscriptionFactory - ) {} - - createSubscription( - metadata: Record, - runId: string, - streamKey: string, - baseUrl?: string - ): StreamSubscription { - if (!runId || !streamKey) { - throw new Error("runId and streamKey are required"); - } - - const version = - typeof metadata.$$streamsVersion === "string" ? metadata.$$streamsVersion : "v1"; - - const $baseUrl = - typeof metadata.$$streamsBaseUrl === "string" ? metadata.$$streamsBaseUrl : baseUrl; - - if (version === "v1") { - return this.version1.createSubscription(metadata, runId, streamKey, $baseUrl); - } - - if (version === "v2") { - return this.version2.createSubscription(metadata, runId, streamKey, $baseUrl); - } - - throw new Error(`Unknown stream version: ${version}`); - } -} - export interface RunShapeProvider { onShape(callback: (shape: SubscribeRunRawShape) => Promise): Promise<() => void>; } @@ -385,7 +310,6 @@ export class RunSubscription { activeStreams.add(streamKey); const subscription = this.options.streamFactory.createSubscription( - run.metadata, run.id, streamKey, this.options.client?.baseUrl diff --git a/packages/core/src/v3/runMetadata/manager.ts b/packages/core/src/v3/runMetadata/manager.ts index 6f561c800a..f847d644c4 100644 --- a/packages/core/src/v3/runMetadata/manager.ts +++ b/packages/core/src/v3/runMetadata/manager.ts @@ -1,14 +1,12 @@ -import { JSONHeroPath } from "@jsonhero/path"; import { dequal } from "dequal/lite"; import { DeserializedJson } from "../../schemas/json.js"; -import { ApiRequestOptions } from "../zodfetch.js"; -import { RunMetadataManager, RunMetadataUpdater } from "./types.js"; -import { MetadataStream } from "./metadataStream.js"; import { ApiClient } from "../apiClient/index.js"; +import { AsyncIterableStream } from "../apiClient/stream.js"; import { FlushedRunMetadata, RunMetadataChangeOperation } from "../schemas/common.js"; +import { ApiRequestOptions } from "../zodfetch.js"; +import { MetadataStream } from "./metadataStream.js"; import { applyMetadataOperations } from "./operations.js"; -import { SSEStreamSubscriptionFactory } from "../apiClient/runStream.js"; -import { AsyncIterableStream } from "../apiClient/stream.js"; +import { RunMetadataManager, RunMetadataUpdater } from "./types.js"; const MAXIMUM_ACTIVE_STREAMS = 5; const MAXIMUM_TOTAL_STREAMS = 10; @@ -208,14 +206,7 @@ export class StandardMetadataManager implements RunMetadataManager { const $baseUrl = typeof baseUrl === "string" ? baseUrl : this.streamsBaseUrl; - const streamFactory = new SSEStreamSubscriptionFactory($baseUrl, { - headers: this.apiClient.getHeaders(), - signal, - }); - - const subscription = streamFactory.createSubscription(this.store ?? {}, this.runId, key); - - return (await subscription.subscribe()) as AsyncIterableStream; + return this.apiClient.fetchStream(this.runId, key, { baseUrl: $baseUrl, signal }); } private async doStream( diff --git a/packages/trigger-sdk/src/v3/runs.ts b/packages/trigger-sdk/src/v3/runs.ts index c527415e21..16e1384016 100644 --- a/packages/trigger-sdk/src/v3/runs.ts +++ b/packages/trigger-sdk/src/v3/runs.ts @@ -13,6 +13,7 @@ import type { RunSubscription, TaskRunShape, AnyBatchedRunHandle, + AsyncIterableStream, } from "@trigger.dev/core/v3"; import { ApiPromise, @@ -51,6 +52,7 @@ export const runs = { subscribeToRun, subscribeToRunsWithTag, subscribeToBatch: subscribeToRunsInBatch, + fetchStream, }; export type ListRunsItem = ListRunResponseItem; @@ -465,3 +467,12 @@ function subscribeToRunsInBatch( return apiClient.subscribeToBatch>(batchId); } + +/** + * Fetches a stream of data from a run's stream key. + */ +async function fetchStream(runId: string, streamKey: string): Promise> { + const apiClient = apiClientManager.clientOrThrow(); + + return await apiClient.fetchStream(runId, streamKey); +}