Add ability to update parent and root run metadata from children #1563
base: main
Conversation
🦋 Changeset detected. Latest commit: 203d7a0. The changes in this PR will be included in the next version bump. This PR includes changesets to release 12 packages.
Warning: There were issues while running some tools. Please review the errors and either fix the tool's configuration or disable the tool if it's a critical failure.

🔧 eslint (packages/core/test/runStream.test.ts)

Oops! Something went wrong! :( ESLint: 8.45.0. ESLint couldn't find the config "custom" to extend from. Please check that the name of the config is correct. The config "custom" was referenced from the config file in "/.eslintrc.js". If you still have problems, please stop by https://eslint.org/chat/help to chat with the team.

Walkthrough

This pull request updates the stream configuration in the "trigger.dev" patch, increasing the number of active streams from 2 to 5 and the total number of streams from 5 to 10. Additionally, it enhances the …
Actionable comments posted: 0
🧹 Outside diff range and nitpick comments (1)
packages/core/src/v3/runMetadata/manager.ts (1)

Line range hint 246-254: Enhance warning messages with more actionable information

The warning messages could be more helpful by suggesting what users should do when they hit these limits. Consider updating the warning messages to be more actionable:

- console.warn(
-   `Exceeded the maximum number of active streams (${MAXIMUM_ACTIVE_STREAMS}). The "${key}" stream will be ignored.`
- );
+ console.warn(
+   `Exceeded the maximum number of active streams (${MAXIMUM_ACTIVE_STREAMS}). The "${key}" stream will be ignored. ` +
+   `Wait for some existing streams to complete before creating new ones.`
+ );

- console.warn(
-   `Exceeded the maximum number of total streams (${MAXIMUM_TOTAL_STREAMS}). The "${key}" stream will be ignored.`
- );
+ console.warn(
+   `Exceeded the maximum number of total streams (${MAXIMUM_TOTAL_STREAMS}). The "${key}" stream will be ignored. ` +
+   `Consider batching or combining related streams to stay within limits.`
+ );

Also applies to: 259-267
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
- .changeset/giant-mice-cheer.md (1 hunks)
- packages/core/src/v3/runMetadata/manager.ts (1 hunks)
🔇 Additional comments (2)
.changeset/giant-mice-cheer.md (1)

1-5: LGTM! Changeset correctly documents the stream limit increases

The patch version bump and change description accurately reflect both the increase in active streams (2→5) and total streams (5→10).
packages/core/src/v3/runMetadata/manager.ts (1)

9-10: LGTM! Constants updated to match PR objectives

The constant values have been correctly updated:
- MAXIMUM_ACTIVE_STREAMS increased from 2 to 5 (matches PR objective)
- MAXIMUM_TOTAL_STREAMS increased from 5 to 10 (maintains the 2:1 ratio)
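For context, a minimal sketch of how limits like these are typically enforced. The constant values match the PR, but the field names (activeStreams, streamKeys) are illustrative assumptions, not the manager's actual implementation:

const MAXIMUM_ACTIVE_STREAMS = 5; // was 2
const MAXIMUM_TOTAL_STREAMS = 10; // was 5

class StreamLimits {
  // Illustrative state: currently running streams, plus every stream key ever registered.
  private activeStreams = new Map<string, unknown>();
  private streamKeys: string[] = [];

  canOpenStream(key: string): boolean {
    if (this.activeStreams.size >= MAXIMUM_ACTIVE_STREAMS) {
      console.warn(
        `Exceeded the maximum number of active streams (${MAXIMUM_ACTIVE_STREAMS}). The "${key}" stream will be ignored.`
      );
      return false;
    }
    if (this.streamKeys.length >= MAXIMUM_TOTAL_STREAMS) {
      console.warn(
        `Exceeded the maximum number of total streams (${MAXIMUM_TOTAL_STREAMS}). The "${key}" stream will be ignored.`
      );
      return false;
    }
    return true;
  }
}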
@trigger.dev/core
@trigger.dev/build
@trigger.dev/rsc
@trigger.dev/sdk
trigger.dev
@trigger.dev/react-hooks
Force-pushed from 8f1d8e1 to 64ce918 (Compare)
Actionable comments posted: 1
🔭 Outside diff range comments (2)

apps/webapp/app/services/routeBuilders/apiBuilder.server.ts (1)

Line range hint 474-479: Validate actual request body size instead of relying on the Content-Length header

Relying solely on the Content-Length header for enforcing maxContentLength may not be reliable, as this header can be missing or manipulated. Consider reading the request body in chunks and enforcing the size limit based on the actual data received to ensure proper handling of oversized requests.

apps/webapp/app/routes/api.v1.tasks.batch.ts (1)
Line range hint 91-98: Consider sanitizing error logs.

The error logging includes the full error stack trace, which might expose sensitive information in production logs. Consider sanitizing the error object:

- error: {
-   message: (error as Error).message,
-   stack: (error as Error).stack,
- },
+ error: error instanceof Error ? error.message : String(error),
+ code: error instanceof ServiceValidationError ? 'VALIDATION_ERROR' : 'UNKNOWN_ERROR'
🧹 Nitpick comments (4)

apps/webapp/app/services/metadata/updateMetadata.server.ts (1)

45-47: Consider throwing a ServiceValidationError when the task run is not found

Currently, when a task run is not found, the method returns without any indication of the issue. Throwing a ServiceValidationError with a message like "Task Run not found" would provide clearer feedback and allow consistent error handling upstream. Apply this diff to implement the change:

  if (!taskRun) {
-   return;
+   throw new ServiceValidationError("Task Run not found");
  }

packages/core/src/v3/runMetadata/types.ts (1)
13-26: Consider documenting the stream method's behavior.

The interface looks good, but the stream method could benefit from documentation explaining:
- The relationship between streams and metadata
- The purpose of the AbortSignal
- Expected behavior when maximum stream limits are reached

Consider adding JSDoc comments:

/**
 * Streams data while updating metadata. Limited by MAXIMUM_ACTIVE_STREAMS.
 * @param key - Metadata key to update
 * @param value - AsyncIterable or ReadableStream to process
 * @param signal - Optional AbortSignal to cancel the stream
 * @returns AsyncIterable of processed values
 * @throws When maximum active streams limit is reached
 */
stream<T>(
  key: string,
  value: AsyncIterable<T> | ReadableStream<T>,
  signal?: AbortSignal
): Promise<AsyncIterable<T>>;

packages/core/src/v3/runMetadata/noopManager.ts (1)
41-61: LGTM! Consider adding documentation.

The no-op implementations for the parent and root getters correctly follow the null object pattern. Consider adding JSDoc comments to explain the purpose of these getters for better maintainability.

apps/webapp/app/routes/api.v1.tasks.batch.ts (1)
apps/webapp/app/routes/api.v1.tasks.batch.ts (1)
Line range hint
76-77
: Extract magic number into a constant.The 30-day TTL default is hardcoded. Consider extracting it into a named constant for better maintainability.
+const DEFAULT_IDEMPOTENCY_KEY_TTL_DAYS = 30; + const idempotencyKeyExpiresAt = resolveIdempotencyKeyTTL(idempotencyKeyTTL) ?? - new Date(Date.now() + 24 * 60 * 60 * 1000 * 30); + new Date(Date.now() + 24 * 60 * 60 * 1000 * DEFAULT_IDEMPOTENCY_KEY_TTL_DAYS);
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (14)
- .changeset/giant-mice-cheer.md (1 hunks)
- apps/webapp/app/routes/api.v1.runs.$runId.metadata.ts (1 hunks)
- apps/webapp/app/routes/api.v1.tasks.batch.ts (1 hunks)
- apps/webapp/app/services/metadata/updateMetadata.server.ts (1 hunks)
- apps/webapp/app/services/routeBuilders/apiBuilder.server.ts (2 hunks)
- packages/core/src/v3/index.ts (1 hunks)
- packages/core/src/v3/runMetadata/index.ts (2 hunks)
- packages/core/src/v3/runMetadata/manager.ts (5 hunks)
- packages/core/src/v3/runMetadata/noopManager.ts (2 hunks)
- packages/core/src/v3/runMetadata/types.ts (1 hunks)
- packages/core/src/v3/schemas/api.ts (1 hunks)
- packages/react-hooks/package.json (1 hunks)
- packages/trigger-sdk/src/v3/metadata.ts (2 hunks)
- references/v3-catalog/src/trigger/runMetadata.ts (3 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- .changeset/giant-mice-cheer.md
🔇 Additional comments (17)
apps/webapp/app/routes/api.v1.runs.$runId.metadata.ts (1)

1-29: Refactoring simplifies the API route and improves maintainability

The use of createActionApiRoute streamlines the route handling by integrating parameter validation, authentication, and method enforcement. This reduces code duplication and enhances readability.
apps/webapp/app/services/routeBuilders/apiBuilder.server.ts (1)

459-471: HTTP method validation enhances API security

The introduction of method validation ensures that only the specified HTTP methods are accepted, preventing unintended access and enhancing security.
packages/core/src/v3/schemas/api.ts (1)

672-729: New metadata operation schemas are well-structured and comprehensive

The addition of various RunMetadata*Operation schemas provides a flexible approach to handling metadata changes. The use of a discriminated union in RunMetadataChangeOperation ensures type safety and clear validation of operations.
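To illustrate the pattern, a hedged sketch of a zod discriminated union of this shape. Only the set, increment, and append variants named elsewhere in this review are shown, and the exact field names are assumptions rather than the PR's actual schema:

import { z } from "zod";

const RunMetadataSetOperation = z.object({
  type: z.literal("set"),
  key: z.string(),
  value: z.unknown(), // DeserializedJson in the real schemas
});

const RunMetadataIncrementOperation = z.object({
  type: z.literal("increment"),
  key: z.string(),
  value: z.number(),
});

const RunMetadataAppendOperation = z.object({
  type: z.literal("append"),
  key: z.string(),
  value: z.unknown(),
});

// Discriminating on `type` lets zod validate against the right branch and
// lets TypeScript narrow the operation in a switch statement.
const RunMetadataChangeOperation = z.discriminatedUnion("type", [
  RunMetadataSetOperation,
  RunMetadataIncrementOperation,
  RunMetadataAppendOperation,
]);

type RunMetadataChangeOperation = z.infer<typeof RunMetadataChangeOperation>;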
packages/core/src/v3/runMetadata/types.ts (1)

4-11: LGTM! Well-structured metadata updater interface.

The RunMetadataUpdater interface provides a comprehensive set of methods for metadata manipulation, with clear, consistent naming and proper typing.
references/v3-catalog/src/trigger/runMetadata.ts (2)

24-25: LGTM! Proper parent metadata update.

The increment operation correctly updates the parent's count of child tasks.

6-7: Verify the initial numberOfChildren value.

Setting numberOfChildren to 2 seems arbitrary. Consider:
- Starting from 0 and letting child tasks increment it
- Documenting why 2 is the initial value
✅ Verification successful

Initial numberOfChildren value is correct and intentional

The initial value of 2 is correct, as it represents the expected number of child tasks that will be triggered in the execution flow:
- runMetadataChildTask is triggered directly from runMetadataTask
- runMetadataChildTask2 is triggered from within runMetadataChildTask

The subsequent increment operations in child tasks are used for tracking additional dynamic children, not for counting these two statically known child tasks.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Search for other occurrences of numberOfChildren to understand the expected initial value
rg "numberOfChildren" --type ts
Length of output: 331
Script:
#!/bin/bash
# Let's look at the full context of runMetadata.ts to understand the metadata structure and child task relationships
cat references/v3-catalog/src/trigger/runMetadata.ts
Length of output: 1789
packages/react-hooks/package.json (2)

77-77: LGTM! Added newline at EOF.

Following the best practice of ending files with a newline character.

Line range hint 1-1: Verify stream limit changes across the codebase.

The PR aims to increase active streams from 2 to 5, but the core changes to the MAXIMUM_ACTIVE_STREAMS and MAXIMUM_TOTAL_STREAMS constants aren't visible in the reviewed files.
✅ Verification successful

Stream limit changes are correctly implemented in manager.ts

The verification confirms that the PR's objective to increase active streams from 2 to 5 is properly implemented in packages/core/src/v3/runMetadata/manager.ts. The constants are defined as:
- MAXIMUM_ACTIVE_STREAMS = 5 (increased from 2)
- MAXIMUM_TOTAL_STREAMS = 10

These limits are consistently enforced, with appropriate warning messages when they are exceeded.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Search for stream limit constants and their usage
rg -A 2 "MAXIMUM_ACTIVE_STREAMS|MAXIMUM_TOTAL_STREAMS" --type ts
# Search for any hardcoded stream limits
rg -A 2 "maximum.*stream|stream.*limit" --type ts -i
Length of output: 2549
packages/core/src/v3/index.ts (1)

15-15: LGTM! Export addition is well-placed.

The new export for runMetadata types is appropriately positioned near related exports.
packages/core/src/v3/runMetadata/index.ts (2)

84-90: LGTM! Implementation correctly delegates to the manager.

The parent and root getters maintain a consistent delegation pattern with the other methods.

Line range hint 1-90: Verify stream limit changes.

The PR objective mentions increasing active streams from 2 to 5, but I don't see the stream limit configuration in these changes. Please ensure the stream limit changes are included in this PR.
✅ Verification successful

Found stream limit configuration: PR objective verified

The stream limit configuration is defined in packages/core/src/v3/runMetadata/manager.ts:
- MAXIMUM_ACTIVE_STREAMS = 5 (increased from 2 as per the PR objective)
- MAXIMUM_TOTAL_STREAMS = 10

The code includes proper limit checks and error handling:
- Checks the active streams limit: if (this.activeStreams.size >= MAXIMUM_ACTIVE_STREAMS)
- Checks the total streams limit: if (streams.length >= MAXIMUM_TOTAL_STREAMS)
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Search for stream limit configuration
rg -i "stream.*limit|MAX_.*STREAM"
# Search for the number 2 or 5 near stream-related code
rg -B 3 -A 3 "(2|5).*stream|stream.*(2|5)"
Length of output: 53318
Script:
#!/bin/bash
# Search for stream limit configuration in core package
rg -i "MAX.*STREAM|STREAM.*LIMIT" packages/core/
# Search for stream configuration in manager class
ast-grep --pattern 'class RunMetadataManager {
$$$
stream($$$) {
$$$
}
$$$
}'
Length of output: 1933
apps/webapp/app/routes/api.v1.tasks.batch.ts (2)

8-9: LGTM: Import statements are well-organized.

The new imports enhance the route's capabilities with authentication, logging, and error handling.

Also applies to: 12-12

Line range hint 124-157: LGTM: Response headers implementation is secure.

The JWT token generation follows security best practices, with proper scoping and short-lived tokens.
packages/trigger-sdk/src/v3/metadata.ts (1)

10-11: LGTM: Metadata updaters are properly implemented.

The parent and root metadata updaters are well-typed and properly integrated into the metadata object.

Also applies to: 36-37
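To make the feature concrete, a hedged usage sketch of a child task updating its parent's and root's metadata. The task id and metadata keys are made up for illustration; the metadata.parent and metadata.root accessors are the ones this PR introduces:

import { task, metadata } from "@trigger.dev/sdk/v3";

// Hypothetical child task; the id and keys are illustrative only.
export const exampleChildTask = task({
  id: "example-child-task",
  run: async (payload: { item: string }) => {
    // Update this run's own metadata.
    metadata.set("status", "processing");

    // Propagate progress upward: bump a counter on the direct parent run
    // and append to a list on the root run.
    metadata.parent.increment("processedChildren", 1);
    metadata.root.append("processedItems", payload.item);
  },
});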
packages/core/src/v3/runMetadata/manager.ts (3)

20-21: LGTM: Well-implemented metadata operation queues.

The parent and root operation queues are properly implemented, with type safety and clear operation types.

Also applies to: 31-75

347-364: LGTM: Efficient flush implementation.

The flush implementation efficiently handles queued operations and includes proper error handling.

Also applies to: 397-401

10-11: Verify impact of increased stream limits.

The increase in stream limits could impact system resources. Ensure the system can handle the increased load.
✅ Verification successful

Stream limits are safely managed with proper resource controls

The implementation shows a robust stream management system with:
- Active stream tracking via a Map data structure
- Automatic cleanup of completed streams
- Stream timeout handling (60s default)
- Warning logs when limits are reached
- Graceful handling of excess streams by ignoring new ones rather than failing

The increased limits (5 active, 10 total) are reasonable, as the system:
- Automatically cleans up finished streams
- Has timeout protection against hanging streams
- Implements graceful degradation when limits are reached
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Check for stream-related performance monitoring or configuration
rg -A 3 "MAXIMUM_(ACTIVE|TOTAL)_STREAMS"
Length of output: 1456
Script:
#!/bin/bash
# Check for stream usage and error handling
ast-grep --pattern 'class StandardMetadataManager {
$$$
}'
# Check for any performance monitoring or resource usage related code
rg -B 2 -A 2 "performance|memory|resource|monitor"
Length of output: 105671
Script:
#!/bin/bash
# Check for stream usage implementation and error handling
ast-grep --pattern 'class StandardMetadataManager {
private activeStreams = new Set<string>();
$$$
}'
# Check for any memory or resource monitoring related to streams
rg -B 2 -A 2 "activeStreams|totalStreams|stream.*memory|stream.*resource"
Length of output: 4129
Force-pushed from 64ce918 to c91c6a8 (Compare)
Actionable comments posted: 8
🧹 Nitpick comments (29)

references/nextjs-realtime/src/trigger/csv.ts (3)

15-19: Implement retry/failover logic for network fetch issues.

Currently, a single failure to fetch the file leads to an immediate error throw. Consider adding optional retry or failover logic to improve the robustness of the CSV file retrieval process, especially if network hiccups or transient server issues occur.

25-52: Handle malformed CSV data more comprehensively.

Although the code logs row parsing errors, be wary that certain CSV anomalies (e.g., partial rows, extra columns, or ill-formatted data) could break the parsing logic entirely. Consider implementing a more robust approach that tracks invalid rows separately for review or reprocessing, instead of silently discarding them.

75-88: Validate row-level logic inside handleCSVRow.

Currently, the processing step simulates a random delay and logs rows. If you plan to perform business-critical logic (e.g., storing data in a database), consider adding robust validation, error handling, and retries. This ensures data consistency, especially under transient failures.

references/nextjs-realtime/src/app/csv/[id]/RealtimeCSVRun.tsx (2)

46-55: Provide structured error feedback for metadata parsing issues.

Right now, the user only sees "Failed to parse metadata." You might want to expose or log additional details about the error, especially if the user is technical and needs more debug information.

75-93: Ensure progress computations for partial data are correct.

If the CSV is very large or the processing is paused/stuck, users might see incomplete or misleading progress. Consider how progress gets updated for partial or paused states.

references/nextjs-realtime/src/app/csv/page.tsx (1)

1-14: Consider informing the user of file size limits and handling partial uploads.

While the dropzone approach is user-friendly, large CSV files risk partial or failed uploads. Add user guidance or fallback measures in case of partial uploads, ensuring the UI can handle incomplete CSV data gracefully.
20-23
: Consider adding ARIA attributes for better accessibility.While Radix UI provides good accessibility defaults, consider adding aria-label or aria-describedby props to provide more context about what this progress bar represents.
<ProgressPrimitive.Indicator className="h-full w-full flex-1 bg-primary transition-all" style={{ transform: `translateX(-${100 - (value || 0)}%)` }} + aria-label="Progress indicator" />
references/nextjs-realtime/src/trigger/schemas.ts (2)
3-18
: Consider adding additional validation constraints for CSV fields.While the basic type coercion is good, consider adding more specific validation rules:
- Date field could use a date format validation
- Numeric fields might benefit from min/max bounds
export const CSVRow = z.object({ - Date: z.string(), + Date: z.string().refine((date) => !isNaN(Date.parse(date)), { + message: "Invalid date format", + }), Impressions: z.coerce.number(), Likes: z.coerce.number(), // ... other fields });
28-33
: Consider adding invariant validation for row counts.The metadata schema could benefit from validation ensuring processedRows doesn't exceed totalRows.
export const CSVUploadMetadataSchema = z.object({ status: CSVStatus, totalRows: z.number().int().nonnegative().optional(), inProgressRows: z.number().int().nonnegative().optional(), processedRows: z.number().int().nonnegative().optional(), -}); +}).refine( + (data) => { + if (data.totalRows != null && data.processedRows != null) { + return data.processedRows <= data.totalRows; + } + return true; + }, + { + message: "Processed rows cannot exceed total rows", + } +);references/nextjs-realtime/src/components/ImageUploadButton.tsx (2)
55-78: Consider extracting common upload logic to reduce duplication.

The CSVUploadDropzone component shares significant code with ImageUploadDropzone. Consider creating a reusable FileUploadDropzone component to reduce duplication. Here's a suggested refactor:

interface FileUploadProps {
  endpoint: "csvUploader" | "imageUploader";
  getRedirectUrl: (file: any) => string;
}

function FileUploadDropzone({ endpoint, getRedirectUrl }: FileUploadProps) {
  const router = useRouter();

  return (
    <UploadDropzone
      endpoint={endpoint}
      onClientUploadComplete={(res) => {
        console.log("Files: ", res);
        const firstFile = res[0];
        router.push(getRedirectUrl(firstFile));
      }}
      onUploadError={(error: Error) => {
        console.error(`ERROR! ${error.message}`);
      }}
      className="border-gray-600"
    />
  );
}

// Usage:
export function CSVUploadDropzone() {
  return (
    <FileUploadDropzone
      endpoint="csvUploader"
      getRedirectUrl={(file) =>
        `/csv/${file.serverData.id}?publicAccessToken=${file.serverData.publicAccessToken}`
      }
    />
  );
}

59-76: Enhance user experience with feedback states.

The upload component could benefit from additional user feedback:
- Loading state during upload
- File type validation feedback
- Progress indicator for large files

Would you like me to provide an example implementation with these improvements?
references/nextjs-realtime/src/app/api/uploadthing/core.ts (1)

53-53: Consider adding error handling for task triggering.

The task trigger should include error handling to manage potential failures gracefully.

try {
  const handle = await tasks.trigger<typeof handleCSVUpload>("handle-csv-upload", file);
  return handle;
} catch (error) {
  console.error("Failed to trigger CSV upload handling:", error);
  throw new UploadThingError("Failed to process CSV file");
}

packages/core/src/v3/runMetadata/operations.ts (2)
5-8: Establish consistent rules for unapplied operations.

Currently, unapplied operations are collected without a clear explanation. Consider documenting the reasons, or adding descriptive properties to each operation about why it was not applied.

70-96: Handle potential "remove" operation failures more explicitly.

When removal operations fail because the target is not an array, you currently push to unapplied operations without feedback. Consider adding logging or handling logic to indicate why it could not remove the item (e.g., by providing an error message).
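As an illustration of that suggestion, a hedged sketch of a "remove" handler that records and logs unapplied operations. The operation shape and function name are invented for the example, not the file's actual API:

// Illustrative types; the real operation schemas live in schemas/common.ts.
type RemoveOperation = { type: "remove"; key: string; value: unknown };

function applyRemove(
  metadata: Record<string, unknown>,
  op: RemoveOperation,
  unapplied: RemoveOperation[]
): void {
  const target = metadata[op.key];

  if (!Array.isArray(target)) {
    // Not an array: record the operation and say why it was skipped,
    // instead of failing silently.
    unapplied.push(op);
    console.warn(`remove: key "${op.key}" is not an array; operation skipped`);
    return;
  }

  // Remove entries that deep-equal the operation's value (JSON comparison
  // keeps the sketch short; a real implementation might use a proper deep-equal).
  metadata[op.key] = target.filter(
    (item) => JSON.stringify(item) !== JSON.stringify(op.value)
  );
}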
apps/webapp/app/services/metadata/updateMetadata.server.ts (3)

40-74: Potential infinite loop in flushing logic.

The while (true) + sleep approach is workable, but ensure that an unhandled rejection or severe error doesn't leave this fiber running forever. The catchAll block is good, but adding an escape condition might be safer for graceful shutdown.

193-215: Efficiently merge consecutive "set" operations.

Your implementation merges set operations by key. For large sets of changes, consider grouping and applying them in bulk, or implementing a last-write-wins approach.

357-394: Direct updates bypass version checks.

#updateRunMetadataDirectly does not use metadataVersion to ensure concurrency. If concurrency conflicts are possible, consider aligning it with the optimistic locking approach used in #updateRunMetadataWithOperations.

apps/webapp/app/v3/services/completeAttempt.server.ts (1)

159-160: Duplicate metadata usage.

Similar to lines 98-99; if the same metadata flows repeatedly, ensure there is no duplication or potential PII leakage.

.changeset/slow-deers-collect.md (1)
8-8: Enhance changeset description clarity

The description could be more detailed, to better explain the feature's impact and usage. Consider expanding it to: "Adding the ability to update parent and root run metadata from child runs/tasks, enabling better state management and data flow between related runs."
🧰 Tools
🪛 LanguageTool
[uncategorized] ~8-~8: You might be missing the article “the” here.
Context: ... "@trigger.dev/core": patch --- Adding ability to update parent run metadata from chil...(AI_EN_LECTOR_MISSING_DETERMINER_THE)
apps/webapp/app/routes/api.v1.runs.$runId.metadata.ts (1)

18-25: Consider enhancing error handling

While the current error handling covers the "not found" case, consider handling additional error scenarios:
- Validation errors from the service
- Database errors
- Rate limiting or quota exceeded scenarios

Example enhancement:

 async ({ authentication, body, params }) => {
   const result = await updateMetadataService.call(authentication.environment, params.runId, body);

   if (!result) {
     return json({ error: "Task Run not found" }, { status: 404 });
   }

+  if (result.error) {
+    return json({ error: result.error.message }, { status: result.error.status || 400 });
+  }

   return json(result, { status: 200 });
 }

packages/core/src/v3/runMetadata/types.ts (1)
19-29: Consider documenting the hierarchical metadata behavior.

The interface effectively enables hierarchical metadata management through the parent and root getters. However, it would be beneficial to add JSDoc comments explaining:
- The relationship between parent, root, and current metadata
- The propagation behavior of metadata updates
- Any potential limitations or edge cases
apps/webapp/app/v3/services/resumeDependentParents.server.ts (1)

Line range hint 199-213: Duplicate error message in batch dependency handling.

The error message and logging in the batchRunDependency method is identical to singleRunDependency. Consider extracting it to a shared helper method. Consider refactoring like this:

+ private logDependencyAttemptError(dependency: Dependency) {
+   logger.error(
+     "ResumeDependentParentsService: dependency child attempt not found",
+     { dependency }
+   );
+   return {
+     success: false,
+     error: `Dependency child attempt not found for run ${dependency.taskRunId}`,
+   };
+ }

  async #batchRunDependency(dependency: Dependency): Promise<Output> {
    // ...
    if (!lastAttempt) {
-     logger.error(
-       "ResumeDependentParentsService.singleRunDependency(): dependency child attempt not found",
-       { dependency }
-     );
-     return {
-       success: false,
-       error: `Dependency child attempt not found for run ${dependency.taskRunId}`,
-     };
+     return this.logDependencyAttemptError(dependency);
    }
    // ...
  }

packages/core/src/v3/apiClient/stream.ts (1)

21-21: LGTM! Consider adding delay parameter validation.

The implementation of delayed stream termination is well-structured and thread-safe. The abort signal check prevents redundant operations. Consider adding validation for the delay parameter:

  stop: (delay?: number) => {
+   if (delay !== undefined && delay < 0) {
+     throw new Error("Delay must be a non-negative number");
+   }
    if (delay) {
      setTimeout(() => {

Also applies to: 67-76
packages/trigger-sdk/src/v3/metadata.ts (2)

10-11: Well-structured metadata management implementation!

The changes introduce a clean hierarchical metadata management system with:
- Parent and root metadata updaters for vertical propagation
- A consolidated updater object for better maintainability
- A chainable API design for improved usability

The hierarchical design allows for efficient metadata propagation while maintaining clear boundaries between the different scopes (current/parent/root).

Also applies to: 23-31, 33-42

87-89: Consider enhancing error handling in metadata operations.

While the method chaining implementation is clean, consider wrapping the runMetadata calls in try-catch blocks to handle potential errors gracefully. Example implementation:

 function setMetadataKey(key: string, value: DeserializedJson) {
+  try {
     runMetadata.set(key, value);
+  } catch (error) {
+    console.error(`Failed to set metadata key ${key}:`, error);
+    throw error;
+  }
   return metadataUpdater;
 }

Also applies to: 101-102, 154-155, 171-172, 187-188
apps/webapp/package.json (1)

121-121: Consider using an exact version for the effect dependency.

While the current ^3.11.7 allows for patch and minor updates, consider pinning to an exact version for better dependency stability.

- "effect": "^3.11.7",
+ "effect": "3.11.7",

packages/core/src/v3/apiClient/runStream.ts (1)

126-126: Consider making the timeout duration configurable.

The 30-second timeout for stopping the stream is hardcoded. Consider making this configurable through options or environment variables for flexibility in different environments.

- stopRunShapeStream: () => runStreamInstance.stop(30 * 1000),
+ stopRunShapeStream: () => runStreamInstance.stop(options?.streamStopTimeoutMs ?? 30 * 1000),

packages/core/src/v3/schemas/common.ts (1)

4-66: Consider adding validation for edge cases in metadata operations.

While the schema definitions are well-structured, consider adding:
- Maximum length validation for string keys
- Size limits for metadata values
- Depth limits for nested objects in the value field

This would prevent potential abuse or performance issues with extremely large metadata.
packages/core/src/v3/runMetadata/manager.ts (1)

Line range hint 200-259: Consider adding a retry mechanism for stream creation failures.

While the error handling is good, consider adding a retry mechanism for transient failures during stream creation. Example implementation:

 private async doStream<T>(
   key: string,
   value: AsyncIterable<T> | ReadableStream<T>,
   updater: RunMetadataUpdater = this,
   signal?: AbortSignal
 ): Promise<AsyncIterable<T>> {
+  const maxRetries = 3;
+  const retryDelay = 1000;
+
+  for (let attempt = 1; attempt <= maxRetries; attempt++) {
     try {
       const streamInstance = new MetadataStream({/*...*/});
       // ... existing implementation
       return streamInstance;
     } catch (error) {
+      if (attempt === maxRetries) {
         updater.remove(`$$streams`, key);
         throw error;
+      }
+      await new Promise(resolve => setTimeout(resolve, retryDelay * attempt));
     }
+  }
 }
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
- pnpm-lock.yaml is excluded by !**/pnpm-lock.yaml
📒 Files selected for processing (42)
- .changeset/giant-mice-cheer.md (1 hunks)
- .changeset/slow-deers-collect.md (1 hunks)
- apps/webapp/app/env.server.ts (1 hunks)
- apps/webapp/app/routes/api.v1.runs.$runId.metadata.ts (1 hunks)
- apps/webapp/app/routes/api.v1.tasks.batch.ts (1 hunks)
- apps/webapp/app/services/metadata/updateMetadata.server.ts (1 hunks)
- apps/webapp/app/services/routeBuilders/apiBuilder.server.ts (3 hunks)
- apps/webapp/app/v3/services/completeAttempt.server.ts (4 hunks)
- apps/webapp/app/v3/services/finalizeTaskRun.server.ts (5 hunks)
- apps/webapp/app/v3/services/resumeDependentParents.server.ts (2 hunks)
- apps/webapp/package.json (1 hunks)
- internal-packages/database/prisma/migrations/20241216212038_add_metadata_version/migration.sql (1 hunks)
- internal-packages/database/prisma/schema.prisma (1 hunks)
- packages/cli-v3/src/entryPoints/deploy-run-worker.ts (3 hunks)
- packages/cli-v3/src/entryPoints/dev-run-worker.ts (2 hunks)
- packages/core/src/v3/apiClient/index.ts (1 hunks)
- packages/core/src/v3/apiClient/runStream.ts (2 hunks)
- packages/core/src/v3/apiClient/stream.ts (2 hunks)
- packages/core/src/v3/index.ts (1 hunks)
- packages/core/src/v3/run-metadata-api.ts (1 hunks)
- packages/core/src/v3/runMetadata/index.ts (3 hunks)
- packages/core/src/v3/runMetadata/manager.ts (6 hunks)
- packages/core/src/v3/runMetadata/noopManager.ts (2 hunks)
- packages/core/src/v3/runMetadata/operations.ts (1 hunks)
- packages/core/src/v3/runMetadata/types.ts (1 hunks)
- packages/core/src/v3/runtime/devRuntimeManager.ts (4 hunks)
- packages/core/src/v3/schemas/api.ts (2 hunks)
- packages/core/src/v3/schemas/common.ts (3 hunks)
- packages/react-hooks/package.json (1 hunks)
- packages/react-hooks/src/hooks/useRealtime.ts (5 hunks)
- packages/trigger-sdk/src/v3/metadata.ts (10 hunks)
- packages/trigger-sdk/src/v3/runs.ts (3 hunks)
- references/nextjs-realtime/package.json (1 hunks)
- references/nextjs-realtime/src/app/api/uploadthing/core.ts (2 hunks)
- references/nextjs-realtime/src/app/csv/[id]/RealtimeCSVRun.tsx (1 hunks)
- references/nextjs-realtime/src/app/csv/[id]/page.tsx (1 hunks)
- references/nextjs-realtime/src/app/csv/page.tsx (1 hunks)
- references/nextjs-realtime/src/components/ImageUploadButton.tsx (1 hunks)
- references/nextjs-realtime/src/components/ui/progress.tsx (1 hunks)
- references/nextjs-realtime/src/trigger/csv.ts (1 hunks)
- references/nextjs-realtime/src/trigger/schemas.ts (1 hunks)
- references/v3-catalog/src/trigger/runMetadata.ts (3 hunks)
✅ Files skipped from review due to trivial changes (1)
- internal-packages/database/prisma/migrations/20241216212038_add_metadata_version/migration.sql

🚧 Files skipped from review as they are similar to previous changes (4)
- packages/core/src/v3/index.ts
- .changeset/giant-mice-cheer.md
- references/v3-catalog/src/trigger/runMetadata.ts
- apps/webapp/app/routes/api.v1.tasks.batch.ts
🧰 Additional context used
🪛 LanguageTool
.changeset/slow-deers-collect.md
[uncategorized] ~8-~8: You might be missing the article “the” here.
Context: ... "@trigger.dev/core": patch --- Adding ability to update parent run metadata from chil...
(AI_EN_LECTOR_MISSING_DETERMINER_THE)
🔇 Additional comments (44)
references/nextjs-realtime/src/trigger/csv.ts (1)

56-58: Confirm concurrency or rate limits for batch processing.

When calling batch.triggerAndWait, ensure that concurrency settings are well-defined or tested for large CSV files. Without proper concurrency or rate limiting, system resources could be strained.
references/nextjs-realtime/src/app/csv/[id]/RealtimeCSVRun.tsx (1)

21-33: Guard the environment variable usage.

The code references NEXT_PUBLIC_TRIGGER_API_URL. If this environment variable is missing or malformed, requests may fail silently. Consider adding fallback logic or user-friendly error messages in case it's undefined.
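A minimal sketch of such a guard, assuming the variable is read at module scope (the error message is illustrative):

// Fail fast with a clear message instead of letting requests fail silently.
const apiUrl = process.env.NEXT_PUBLIC_TRIGGER_API_URL;

if (!apiUrl) {
  throw new Error(
    "NEXT_PUBLIC_TRIGGER_API_URL is not set; realtime requests will fail without it"
  );
}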
references/nextjs-realtime/src/app/csv/[id]/page.tsx (1)

11-13: Prevent potential accidental 404 edge cases.

The code calls notFound() if publicAccessToken is not a string. If the token is an empty string (""), it still passes this check but may be invalid. Consider verifying that it's non-empty as well.
references/nextjs-realtime/src/components/ui/progress.tsx (1)

8-25: LGTM! Well-structured Progress component implementation.

The component follows React best practices, with proper TypeScript types, ref forwarding, and clean composition of Radix UI primitives.
references/nextjs-realtime/package.json (1)

16-16: Verify dependency versions and security implications.

The new dependencies look good, but let's verify the latest stable versions and any security advisories.

Also applies to: 19-19

✅ Verification successful

Dependencies are up to date and secure

The package versions being used are the latest stable releases:
- @fast-csv/parse 5.0.2 matches the latest version
- @radix-ui/react-progress matches the latest version

The only known vulnerability in fast-csv affects versions < 4.3.6; we're using version 5.0.2, so we're not affected.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Check for latest versions and security advisories for new dependencies
echo "Checking @fast-csv/parse..."
curl -s https://registry.npmjs.org/@fast-csv/parse/latest | jq '.version'
echo "Checking @radix-ui/react-progress..."
curl -s https://registry.npmjs.org/@radix-ui/react-progress/latest | jq '.version'
# Check for known vulnerabilities
gh api graphql -f query='
{
securityVulnerabilities(first: 5, ecosystem: NPM, package: "fast-csv") {
nodes {
advisory {
summary
severity
publishedAt
}
vulnerableVersionRange
}
}
}
'
Length of output: 753
packages/core/src/v3/runMetadata/operations.ts (3)

18-28: Guard array manipulations that rely on keys starting with '$.'.

Ensure that all potential JSON key paths that do not begin with '$.' are properly handled. An inadvertent key mismatch can lead to incorrect data in the final metadata object.

114-118: Overwriting the entire metadata object.

When updating all metadata with operation.value, verify that downstream code doesn't expect existing keys to be retained. This operation entirely replaces the prior metadata object.

16-16: Check structuredClone compatibility if Node.js versions vary.

If you expect this code to run in older Node.js environments, consider a polyfill or an alternative method for deep cloning (e.g., JSON.parse(JSON.stringify(...))) if structuredClone is unavailable.
✅ Verification successful

No compatibility concerns with structuredClone usage.

The package.json files show that the minimum Node.js version requirement across the project is 18.20.0 (specified in multiple packages), while structuredClone was introduced in Node.js 17.0.0. The codebase therefore fully supports the structuredClone API, making it safe to use without any polyfills or alternatives.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Searching where structuredClone is used or referenced in the codebase
rg "structuredClone"
Length of output: 245
Script:
#!/bin/bash
# Check package.json files for Node.js version requirements
fd package.json --exec cat {}
Length of output: 62681
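For older runtimes, a hedged sketch of a fallback deep clone (note the JSON round-trip drops functions and undefined values and stringifies Dates, so it is only a rough substitute):

// Prefer the native API when present (Node >= 17); otherwise fall back.
const deepClone = <T>(value: T): T =>
  typeof structuredClone === "function"
    ? structuredClone(value)
    : JSON.parse(JSON.stringify(value));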
apps/webapp/app/services/metadata/updateMetadata.server.ts (2)

217-278: Validate final run status before applying parent or root operations.

In your call method, you skip updating the parent or root if the run is not final. Ensure that's consistent with the overall logic, and confirm no partial updates cause side effects.

296-355: Extended retries logic.

In your #updateRunMetadataWithOperations, you retry up to MAX_RETRIES times, then buffer the operations again. This approach is solid. Monitor logs or metrics to ensure repeated unsuccessful retries don't cause indefinite loops.
apps/webapp/app/v3/services/completeAttempt.server.ts (3)

15-16: Check the interplay between TaskRun and MAX_TASK_RUN_ATTEMPTS.

Ensure that any future changes to MAX_TASK_RUN_ATTEMPTS are consistently reflected in the code handling run attempts. Overriding this constant in different environments might cause unexpected behavior.

30-30: Integration with updateMetadataService

Good usage of the new service. Ensure the environment variable is always defined to prevent runtime errors.

314-315: Metadata usage in #completeAttemptFailed.

Confirm that storing completion metadata during failures does not inadvertently overwrite essential debug information, or lose it if the run is retried.
internal-packages/database/prisma/schema.prisma (1)

1781-1783: Addition of metadataVersion to TaskRun.

This is a crucial field for conflict resolution and migration strategies. Ensure that client code properly increments it, as done in the optimistic locking approach.
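For readers unfamiliar with the pattern, a minimal sketch of version-based optimistic locking with Prisma. The conditional updateMany shape is standard Prisma; the function name and exact fields are assumptions based on the schema change above, not the service's actual code:

import { PrismaClient } from "@prisma/client";

const prisma = new PrismaClient();

// Apply metadata only if nobody else updated the row since we read it.
async function applyMetadata(runId: string, readVersion: number, newMetadata: string) {
  const result = await prisma.taskRun.updateMany({
    // The version check makes the write conditional: it only matches if the
    // row still carries the version we originally read.
    where: { id: runId, metadataVersion: readVersion },
    data: { metadata: newMetadata, metadataVersion: { increment: 1 } },
  });

  // count === 0 means a concurrent writer won; the caller should re-read and retry.
  return result.count === 1;
}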
packages/core/src/v3/run-metadata-api.ts (1)

Line range hint 1-8: Well-structured module with optimization consideration

Good implementation of:
- A singleton pattern for the API instance
- Module organization with clear re-exports
- Tree-shaking optimization through a separate file structure
apps/webapp/app/routes/api.v1.runs.$runId.metadata.ts (2)

13-16: Well-structured API route configuration

Good implementation with:
- Schema validation for params and body
- An explicit content size limit
- A clear HTTP method specification

15-15: Verify content size limit implications

The 1MB limit for metadata updates should be verified against typical usage patterns.
✅ Verification successful

1MB metadata size limit is appropriate

The 1MB limit for metadata updates is reasonable, based on the following findings:
- The metadata API endpoint is designed for storing run-time metadata and status updates, not large payloads
- The codebase implements proper validation and error handling for metadata updates
- The system prevents metadata updates for completed runs, indicating metadata is meant for tracking execution state
- Other similar size limits in the codebase (like task payloads) follow comparable patterns
- The metadata operations are handled through a batched flush mechanism, suggesting the system is optimized for smaller, frequent updates
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Search for existing metadata payloads to verify size appropriateness
rg -g '*.{ts,js}' -B 2 -A 2 'metadata.*payload|payload.*metadata'
Length of output: 1288
Script:
#!/bin/bash
# Search for metadata-related type definitions and schemas
ast-grep --pattern 'type $_ = { $$$metadata$$$}'
# Look for metadata-related API endpoints and handlers
rg -g '*.{ts,js}' -B 3 -A 3 'metadata.*update|update.*metadata'
# Search for any size limits or validation related to metadata
rg -g '*.{ts,js}' 'metadata.*size|size.*metadata|metadata.*limit|limit.*metadata'
Length of output: 10351
Script:
#!/bin/bash
# Look for metadata validation schemas and payload size handling
rg -g '*.{ts,js}' -B 3 -A 3 'UpdateMetadataRequestBody|metadata.*validate'
# Check for any existing size-related constants or configurations
rg -g '*.{ts,js}' 'maxContentLength|MAX_.*SIZE|SIZE_LIMIT'
Length of output: 9592
packages/core/src/v3/runMetadata/types.ts (1)

4-11: LGTM! Well-designed interface for metadata manipulation.

The RunMetadataUpdater interface provides a comprehensive and chainable API for metadata operations. The method names are concise and follow common conventions (e.g., the set/del pattern).
packages/react-hooks/package.json (1)

Line range hint 1-77: LGTM! Proper dual ESM/CommonJS module configuration.

The exports field correctly defines both ESM and CommonJS entry points, ensuring compatibility across different module systems. The configuration follows Node.js best practices for handling the dual package hazard.
packages/core/src/v3/runMetadata/index.ts (2)

44-76: LGTM! Clean implementation of metadata operations.

The implementation follows a consistent pattern:
- Proper delegation to the manager
- Method chaining support via return this
- Consistent error propagation

95-101: Verify parent/root metadata access patterns.

The implementation of parent and root metadata access looks correct, but we should verify the usage patterns to ensure proper metadata isolation.
✅ Verification successful

Parent/root metadata access patterns are properly isolated and used correctly.

The implementation shows proper metadata isolation:
- Parent/root metadata modifications are limited to safe operations (increment, append)
- Usage is consistent across the codebase, primarily for tracking child relationships and counts
- No direct property access or unsafe operations were detected on parent/root metadata
- A clear hierarchical structure is maintained through the manager implementation
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Search for usage patterns of parent/root metadata access
rg -A 5 '\.(parent|root)\.(set|del|append|remove|increment|decrement|update)' --type ts
Length of output: 1393
Script:
#!/bin/bash
# Search for the RunMetadataUpdater class implementation and usage
ast-grep --pattern 'class RunMetadataUpdater {
$$$
}'
# Also check for any potential metadata leaks or direct access
rg -A 3 'metadata\.(current|get|getKey)' --type ts
Length of output: 2788
Script:
#!/bin/bash
# Search for metadata manager implementation to understand isolation
ast-grep --pattern 'class $_Manager {
$$$
parent {
$$$
}
root {
$$$
}
$$$
}'
# Check for any direct property access on parent/root
rg '\.(parent|root)\.' --type ts -A 2 | rg -v '(set|del|append|remove|increment|decrement|update|get|current)'
Length of output: 14014
packages/core/src/v3/runMetadata/noopManager.ts (1)

Line range hint 6-42: LGTM! Method signature changes improve API ergonomics.

The method signature changes from void to this enable method chaining, which improves the API's usability.
apps/webapp/app/v3/services/resumeDependentParents.server.ts (2)

24-51: LGTM! Well-structured field selection.

The taskRunDependencySelect constant provides clear and type-safe field selection, improving maintainability and reducing potential runtime errors.

83-89: LGTM! Early return improves code flow.

Moving the DEVELOPMENT environment check earlier in the flow improves code readability and efficiency.
packages/trigger-sdk/src/v3/metadata.ts (1)

210-227: LGTM! Well-documented refresh implementation.

The refreshMetadata function follows consistent patterns and includes proper request options handling.
packages/cli-v3/src/entryPoints/dev-run-worker.ts (1)

333-333: LGTM: Metadata handling in task completion responses

The changes correctly include metadata in both success and error completion responses, using the appropriate metadata manager API.

Also applies to: 358-358
apps/webapp/app/env.server.ts (2)

248-248: LGTM: Reasonable flush interval for batch operations

The 1-second default interval for flushing batch metadata operations is a good balance between reducing database load and maintaining data freshness.

241-241: Verify the impact of the increased metadata size limit

The metadata size limit has been significantly increased, from 4KB to 256KB. While this allows for more detailed metadata, we should verify:
- The impact on memory usage when many runs are active
- Any potential performance implications for the database
- Network bandwidth considerations for real-time updates
✅ Verification successful

The increased metadata size limit is justified and safe

The increase from 4KB to 256KB is well-considered, for the following reasons:
- The metadata is primarily used for tracking task run states, progress, and streaming operations, as evidenced by the extensive metadata management system in the codebase.
- The metadata is handled efficiently, with:
  - Batched updates through BatchMetadataOperationsFlushInterval
  - Incremental updates using operations like append, increment, and remove
  - Proper cleanup of stream metadata
- Real-world usage patterns in the codebase show metadata being used for:
  - Task progress tracking
  - Stream management for AI/ML operations
  - CSV processing status
  - Run state management

The implementation includes safeguards, like periodic flushing and proper error handling, to manage the metadata efficiently.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Check current metadata sizes to validate the new limit
# Search for metadata-related database operations
rg -A 5 'metadata.*update|update.*metadata' --type sql
# Look for any existing large metadata payloads
ast-grep --pattern 'metadata = $_'
Length of output: 3791
Script:
#!/bin/bash
# Search for metadata-related code patterns
rg -l 'metadata' --type typescript
# Look for metadata size patterns and current usage
rg -A 3 'metadata.*size|size.*metadata' --type typescript
# Search for metadata storage/update patterns
ast-grep --pattern 'metadata: $_'
Length of output: 952
Script:
#!/bin/bash
# Search for metadata usage patterns in TypeScript/JavaScript files
rg -A 3 'metadata' -t js -t ts
# Look for metadata size validation or checks
rg -A 3 'metadata.*size|size.*metadata' -t js -t ts
# Search for metadata-related tests
fd -e test.ts -e test.js -x rg -l 'metadata' {}
Length of output: 73988
packages/trigger-sdk/src/v3/runs.ts (1)

Line range hint 338-391: LGTM: Well-implemented subscription control for metadata updates

The changes provide a clean and well-documented way to control the subscription lifecycle, which is particularly useful for updating metadata from child runs. The implementation:
- Maintains backward compatibility with the default stopOnCompletion: true
- Provides clear documentation of the new option
- Correctly maps to the API client's interface
packages/cli-v3/src/entryPoints/deploy-run-worker.ts (1)

366-366: LGTM! Consistent metadata handling across all completion paths.

The changes ensure that run metadata is properly included in all task completion scenarios (normal completion, execution completion, and error cases). This implementation aligns with the PR objective of enabling metadata updates from child runs.

Also applies to: 391-391, 415-415
packages/core/src/v3/apiClient/runStream.ts (1)

Line range hint 327-336: LGTM! Improved run completion logic.

The change to use finishedAt for determining run completion is more accurate and aligns with the PR objective by allowing metadata updates to continue until the run is truly finished.
packages/react-hooks/src/hooks/useRealtime.ts (2)

23-31: LGTM! Well-documented option for controlling subscription behavior.

The stopOnCompletion option is well-documented, with a clear explanation of its purpose and default value. The documentation effectively communicates the use case for child run metadata updates.

102-108: LGTM! Consistent implementation of stopOnCompletion across functions.

The stopOnCompletion option is consistently implemented across processRealtimeRun and processRealtimeRunWithStreams, with proper default values and type handling.

Also applies to: 262-262, 640-646
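To make the option concrete, a hedged usage sketch. The useRealtimeRun hook name and the exact option shape are assumptions based on the file under review; check the package's actual exports before copying:

"use client";

import { useRealtimeRun } from "@trigger.dev/react-hooks";

// Keep the subscription open after the run completes, so metadata written by
// child runs (which may finish later) still streams in.
export function RunProgress({ runId, accessToken }: { runId: string; accessToken: string }) {
  const { run, error } = useRealtimeRun(runId, {
    accessToken,
    stopOnCompletion: false, // default is true; false keeps listening
  });

  if (error) return <p>Error: {error.message}</p>;
  return <pre>{JSON.stringify(run?.metadata, null, 2)}</pre>;
}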
apps/webapp/app/services/routeBuilders/apiBuilder.server.ts (3)

387-387: LGTM! Good addition of HTTP method restriction.

The addition of the method property to ApiKeyActionRouteBuilderOptions enhances API security by allowing explicit HTTP method restrictions.

459-470: LGTM! Well-implemented method validation.

The method validation is properly implemented, with:
- The correct status code (405) for "method not allowed"
- A proper Allow header in the response
- A consistent error response format

631-641: LGTM! Enhanced error logging.

The error logging improvements are valuable:
- They capture detailed error information, including stack traces
- They include the request URL for better context
- They properly handle both Error objects and other error types
packages/core/src/v3/schemas/api.ts (1)

677-677: LGTM! Clean schema update.

The UpdateMetadataRequestBody schema is properly updated to use FlushedRunMetadata, maintaining consistency with the new metadata handling system.
packages/core/src/v3/apiClient/index.ts (2)

626-636: LGTM! Well-implemented metadata retrieval method.

The getRunMetadata method is properly implemented, with:
- The correct HTTP method (GET)
- Proper error handling through zodfetch
- Consistency with existing API patterns

638-644: LGTM! Good enhancement to subscription control.

The addition of the closeOnComplete option:
- Maintains backward compatibility with a default of true
- Provides more flexibility in subscription management
- Is well-documented through types
apps/webapp/app/v3/services/finalizeTaskRun.server.ts (1)

73-89: LGTM! Robust error handling for metadata updates.

The implementation properly handles metadata updates, with:
- Non-blocking execution
- Comprehensive error logging
- Proper error sanitization
packages/core/src/v3/schemas/common.ts (1)

322-322: LGTM! Consistent metadata support in execution results.

The metadata field is properly added to both success and failure execution results.

Also applies to: 335-335
packages/core/src/v3/runMetadata/manager.ts (3)

113-125: LGTM! Efficient operation queueing with optimizations.

The implementation includes:
- Duplicate operation prevention
- Operation validation before queueing
- Proper state management

305-335: LGTM! Well-implemented flush mechanism with proper synchronization.

The flush implementation:
- Prevents concurrent flushes
- Efficiently batches operations
- Properly clears queues after flush
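For readers unfamiliar with the idiom, a hedged sketch of a concurrent-flush guard; the class and field names are illustrative, not the manager's actual code:

// A boolean latch ensures only one flush runs at a time; overlapping
// callers simply skip rather than queue a second flush.
class FlushGuard {
  private flushing = false;

  async flush(doFlush: () => Promise<void>): Promise<void> {
    if (this.flushing) return; // a flush is already in progress
    this.flushing = true;
    try {
      await doFlush();
    } finally {
      this.flushing = false; // always release the latch, even on error
    }
  }
}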
11-12: Verify the impact of increased stream limits.

The increase in stream limits could impact system resources:
- Active streams increased from 2 to 5
- Total streams increased from 5 to 10
✅ Verification successful

Stream limit increase appears safe and well-managed

The increased stream limits are used with proper error handling and clear messaging:
- Stream usage is primarily for handling OpenAI completions, fetch responses, and logs
- The manager enforces limits with descriptive error messages when exceeded
- Stream handling is isolated to specific, documented use cases in the realtime features
- No evidence of resource-intensive or unbounded stream usage
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Check for stream usage patterns in the codebase
rg -A 5 "stream\(" --type typescript
Length of output: 73
Script:
#!/bin/bash
# Search for stream-related patterns in all files
rg -A 5 "stream\("
# Also search for stream usage with different casing
rg -i "createstream|newstream"
# Look for stream-related class/method definitions
ast-grep --pattern 'class $_ {
$$$
stream($$$) {
$$$
}
$$$
}'
# Search for stream limit constant usage
rg "MAXIMUM_(ACTIVE|TOTAL)_STREAMS"
Length of output: 7094
export class UpdateMetadataService extends BaseService {
  private _bufferedOperations: Map<string, BufferedRunMetadataChangeOperation[]> = new Map();
  private _flushFiber: RuntimeFiber<void> | null = null;

  constructor(
    protected readonly _prisma: PrismaClientOrTransaction = prisma,
    private readonly flushIntervalMs: number = 5000
  ) {
    super();

    this._startFlushing();
  }
🛠️ Refactor suggestion
Consider concurrency control for buffered operations.
The in-memory Map might grow if flush intervals are large or operations surge. Think about queue backpressure or a maximum buffer size to prevent potential memory exhaustion.
refresh(requestOptions?: ApiRequestOptions): Promise<void> {
  return this.#getManager().flush(requestOptions);
}
Fix incorrect implementation of the refresh method.

The refresh method is calling flush instead of refresh on the manager. This appears to be a copy-paste error that could lead to unexpected behavior.
refresh(requestOptions?: ApiRequestOptions): Promise<void> {
- return this.#getManager().flush(requestOptions);
+ return this.#getManager().refresh(requestOptions);
}
get root(): RunMetadataUpdater {
  return {
    append: () => this.parent,
    set: () => this.parent,
    del: () => this.parent,
    increment: () => this.parent,
    decrement: () => this.parent,
    remove: () => this.parent,
    stream: () =>
      Promise.resolve({
        [Symbol.asyncIterator]: () => ({
          next: () => Promise.resolve({ done: true, value: undefined }),
        }),
      }),
    update: () => this.parent,
  };
}
Fix similar infinite recursion issue in root getter.
The root getter has the same recursive reference issue as the parent getter.
Apply this fix:
get root(): RunMetadataUpdater {
return {
- append: () => this.parent,
- set: () => this.parent,
- del: () => this.parent,
- increment: () => this.parent,
- decrement: () => this.parent,
- remove: () => this.parent,
+ append: () => this,
+ set: () => this,
+ del: () => this,
+ increment: () => this,
+ decrement: () => this,
+ remove: () => this,
stream: () =>
Promise.resolve({
[Symbol.asyncIterator]: () => ({
next: () => Promise.resolve({ done: true, value: undefined }),
}),
}),
- update: () => this.parent,
+ update: () => this,
};
}
get parent(): RunMetadataUpdater {
  return {
    append: () => this.parent,
    set: () => this.parent,
    del: () => this.parent,
    increment: () => this.parent,
    decrement: () => this.parent,
    remove: () => this.parent,
    stream: () =>
      Promise.resolve({
        [Symbol.asyncIterator]: () => ({
          next: () => Promise.resolve({ done: true, value: undefined }),
        }),
      }),
    update: () => this.parent,
  };
}
Fix potential infinite recursion in parent getter.
The parent getter has recursive references that could cause infinite loops. Each method returns this.parent, which would trigger the getter again.
Apply this fix:
get parent(): RunMetadataUpdater {
return {
- append: () => this.parent,
- set: () => this.parent,
- del: () => this.parent,
- increment: () => this.parent,
- decrement: () => this.parent,
- remove: () => this.parent,
+ append: () => this,
+ set: () => this,
+ del: () => this,
+ increment: () => this,
+ decrement: () => this,
+ remove: () => this,
stream: () =>
Promise.resolve({
[Symbol.asyncIterator]: () => ({
next: () => Promise.resolve({ done: true, value: undefined }),
}),
}),
- update: () => this.parent,
+ update: () => this,
};
}
async #tryFlushMetadata() {
  try {
    await runMetadata.flush();
  } catch (err) {}
}
🛠️ Refactor suggestion
Improve error handling in metadata flush.
The empty catch block silently swallows errors, which could hide important issues. Consider adding error logging.
Apply this improvement:
async #tryFlushMetadata() {
try {
await runMetadata.flush();
- } catch (err) {}
+ } catch (err) {
+ console.warn("Failed to flush metadata:", err);
+ }
}
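As a standalone sketch of the same idea, a best-effort flush helper that logs instead of swallowing (the helper shape is illustrative, not from the PR):
async function tryFlush(flush: () => Promise<void>): Promise<void> {
  try {
    await flush();
  } catch (err) {
    // Surface the failure without rejecting the caller; the next
    // scheduled flush can retry.
    console.warn("Failed to flush metadata (will retry on next flush):", err);
  }
}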
Actionable comments posted: 3
🧹 Nitpick comments (6)
references/nextjs-realtime/src/trigger/csv.ts (3)
25-52: Consider streaming approach for large CSV files.
Loading the entire file into memory could cause issues with large files. Consider using a streaming approach with batched processing.
Consider implementing a chunked processing approach where rows are processed in smaller batches as they're parsed, rather than collecting all rows first.
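One way to sketch that, assuming a papaparse-style parser with pause/resume support (the parser choice, batch size, and handler are illustrative):
import Papa from "papaparse";

const BATCH_SIZE = 100; // hypothetical batch size

function processCsvInBatches(
  csvText: string,
  handleBatch: (rows: unknown[]) => Promise<void>
): Promise<void> {
  let batch: unknown[] = [];

  return new Promise<void>((resolve, reject) => {
    Papa.parse(csvText, {
      header: true,
      step: (result, parser) => {
        batch.push(result.data);
        if (batch.length >= BATCH_SIZE) {
          const rows = batch;
          batch = [];
          parser.pause(); // backpressure: finish this batch before parsing more
          handleBatch(rows)
            .then(() => parser.resume())
            .catch((err) => {
              parser.abort();
              reject(err);
            });
        }
      },
      complete: () => {
        // Flush the final partial batch.
        const finish = batch.length > 0 ? handleBatch(batch) : Promise.resolve();
        finish.then(() => resolve()).catch(reject);
      },
      error: (err: Error) => reject(err),
    });
  });
}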
37-39: Track parsing failures in metadata.
While parsing failures are logged, they're not tracked in metadata, making it harder to monitor the overall health of the upload process.
  } else {
    logger.warn("Failed to parse row", { row, errors: parsedRow.error });
+   metadata.increment("parseFailures", 1);
  }
84-92: Add error handling for metadata operations.
Metadata operations should be wrapped in try-catch blocks to handle potential failures gracefully.
- metadata.parent.increment("processedRows", 1).append("rowRuns", ctx.run.id);
+ try {
+   await metadata.parent.increment("processedRows", 1).append("rowRuns", ctx.run.id);
+ } catch (error) {
+   logger.error("Failed to update parent metadata", { error });
+ }
- await metadata.parent.stream(
-   ctx.run.id,
-   (async function* () {
-     yield "hello";
-     yield "world";
-   })()
- );
+ try {
+   await metadata.parent.stream(
+     ctx.run.id,
+     (async function* () {
+       yield `Processing row ${row.id}`; // More meaningful messages
+       yield `Completed processing row ${row.id}`;
+     })()
+   );
+ } catch (error) {
+   logger.error("Failed to stream messages", { error });
+ }
apps/webapp/app/routes/realtime.v1.streams.$runId.$target.$streamId.ts (1)
57-57: **Consider surrounding ingestData with error handling.**
• If relayRealtimeStreams.ingestData fails, you may want to catch and handle or log the error for better traceability.
• Otherwise, the function returns a Response or possibly an error.
packages/core/src/v3/runMetadata/metadataStream.ts (1)
66-78: **buildUrl method introduces clarity.**
• Encapsulating URL construction simplifies maintainability.
• The default to “self” is convenient if no target is specified.
• If the app grows more versions or targets, keep expansions in mind or use a dictionary-based approach.
packages/core/src/v3/runMetadata/manager.ts (1)
11-12: **Increased maximum streams.**
• Changing MAXIMUM_ACTIVE_STREAMS to 5 and MAXIMUM_TOTAL_STREAMS to 10 will allow more concurrency.
• Ensure the underlying infrastructure can handle the additional load.
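To make the limit checks concrete, a simplified sketch of the counting logic (the class is reduced to just the guard; the constants mirror the PR, the rest is illustrative):
const MAXIMUM_ACTIVE_STREAMS = 5;
const MAXIMUM_TOTAL_STREAMS = 10;

class StreamLimitGuard {
  private activeStreams = new Map<string, AbortController>();
  private totalCreated = 0;

  tryRegister(key: string): AbortController | undefined {
    if (this.activeStreams.size >= MAXIMUM_ACTIVE_STREAMS) {
      console.warn(`Maximum active streams (${MAXIMUM_ACTIVE_STREAMS}) reached; ignoring "${key}".`);
      return undefined;
    }
    if (this.totalCreated >= MAXIMUM_TOTAL_STREAMS) {
      console.warn(`Maximum total streams (${MAXIMUM_TOTAL_STREAMS}) reached; ignoring "${key}".`);
      return undefined;
    }
    const controller = new AbortController();
    this.activeStreams.set(key, controller);
    this.totalCreated++;
    return controller;
  }

  release(key: string): void {
    // Completed or failed streams free an active slot.
    this.activeStreams.delete(key);
  }
}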
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (6)
apps/webapp/app/routes/realtime.v1.streams.$runId.$streamId.ts (0 hunks)
apps/webapp/app/routes/realtime.v1.streams.$runId.$target.$streamId.ts (1 hunks)
packages/core/src/v3/runMetadata/manager.ts (7 hunks)
packages/core/src/v3/runMetadata/metadataStream.ts (3 hunks)
references/nextjs-realtime/src/trigger/csv.ts (1 hunks)
references/v3-catalog/src/trigger/runMetadata.ts (3 hunks)
💤 Files with no reviewable changes (1)
- apps/webapp/app/routes/realtime.v1.streams.$runId.$streamId.ts
🚧 Files skipped from review as they are similar to previous changes (1)
- references/v3-catalog/src/trigger/runMetadata.ts
🔇 Additional comments (23)
references/nextjs-realtime/src/trigger/csv.ts (2)
1-6: LGTM! Imports are well-organized and necessary.
All imported modules are being used appropriately in the implementation.
3-3: Verify SDK version compatibility.
Ensure that all used features are supported in the specified SDK version.
✅ Verification successful
All SDK v3 features are properly supported
The imported features (batch, logger, metadata, schemaTask) are all properly implemented and exported in the SDK v3. The code review shows these are core features with full implementation in the corresponding files. The package exports them correctly through the v3 namespace.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Check package.json for SDK version constraints
rg -A 5 '"@trigger.dev/sdk"' package.json
# Look for any breaking changes in SDK v3
gh api repos/triggerdotdev/trigger.dev/releases | jq -r '.[] | select(.name | contains("sdk-v3")) | .body'
Length of output: 152
Script:
#!/bin/bash
# Check package.json existence and content
fd -t f "package.json" --exec cat {}
# Look for SDK imports across the codebase
rg -A 2 "@trigger.dev/sdk"
# Check if there are any SDK version references in other config files
fd -t f ".(json|yaml|yml)$" --exec grep -l "trigger.dev/sdk" {}
Length of output: 65947
Script:
#!/bin/bash
# Check for the specific SDK v3 features being used in the file
rg -A 5 "batch|logger|metadata|schemaTask" references/nextjs-realtime/src/trigger/csv.ts
# Look for SDK v3 exports to verify feature availability
ast-grep --pattern 'export { $$$, batch, logger, metadata, schemaTask, $$$ }' packages/trigger-sdk/src/v3/index.ts
Length of output: 2222
Script:
#!/bin/bash
# Check the SDK v3 implementation files for these features
fd -t f "index.ts" packages/trigger-sdk/src/v3 --exec cat {}
# Look for any feature exports in other SDK v3 files
rg -l "export.*\b(batch|logger|metadata|schemaTask)\b" packages/trigger-sdk/src/v3/
Length of output: 10133
Script:
#!/bin/bash
# Check the implementation of these specific features
cat packages/trigger-sdk/src/v3/metadata.ts packages/trigger-sdk/src/v3/tasks.ts packages/trigger-sdk/src/v3/batch.ts
# Look for any breaking changes or version constraints
rg -A 2 "v3" packages/trigger-sdk/package.json
Length of output: 10968
apps/webapp/app/routes/realtime.v1.streams.$runId.$target.$streamId.ts (9)
1-2: **Imports appear correct.**
The necessary libraries and modules are appropriately imported here.
12-16: **Route creation is well-structured.**
No issues found. This API route follows established patterns, making it easy to maintain.
17-19: **Body presence check is good, but consider JSON validation.**
• Returning 400 on empty body is correct.
• However, if you expect JSON content, you might want to parse and validate it. If you already do that in the relayRealtimeStreams, that’s fine.
21-40: **Database query with environment scoping is correct.**
• The run is looked up using friendlyId and environment ID, which is appropriate scoping.
• The selected fields reduce the query result size, improving efficiency.
42-44: **404 error handling is consistent.**
• Returning 404 if the run is not found is appropriate.
46-52: **Conditional logic for target ID is solid.**
• The fallback logic for “parent” or “root” is clear.
• A potential edge case: run has no parent or root, but code handles that by returning undefined.
53-55: **Secondary 404 error is fine for missing parent/root.**
• Returning 404 if a parent or root run is absent is consistent.
61-61: **Export of action is standard.**
• The naming is consistent with the rest of the file.
6-10: **Schema validation is good, but consider stricter checks if needed.**
• The Zod schema provides sufficient type checks for the route parameters.
• If you expect specific string patterns for runId or streamId (e.g., a friendlyId format), consider Zod refinements or regex checks to provide early error detection.
To verify that no additional format checks are needed, run:
and observe usage and validations inferred from other references.
✅ Verification successful
Based on the search results, I can see that friendlyId is used extensively throughout the codebase, particularly for runs, attempts, schedules, and other entities. Let me search for any specific format requirements or validation patterns.
Let me check the actual implementation of the friendlyId generator to understand its format.
**Current schema validation is sufficient for runId and streamId parameters**
The current Zod schema validation is appropriate for the route parameters:
- runId and streamId are validated as strings, which aligns with the codebase's friendlyId format: prefix_<21 chars from [1-9a-km-z]>
- The target enum correctly restricts values to "self", "parent", or "root"
No additional validation is needed because:
- The friendlyId format is enforced at generation time using nanoid with a custom alphabet
- The route handler will naturally fail for non-existent IDs when querying the database
- Adding regex validation would be redundant since invalid formats would fail at the data access layer
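For reference, a stricter version of the params schema would look roughly like the sketch below; it is illustrative only, since the analysis above concludes it is not needed. The regex mirrors the friendlyId shape just described:
import { z } from "zod";

const friendlyId = z
  .string()
  .regex(/^[a-z]+_[1-9a-km-z]{21}$/, "Invalid friendlyId"); // assumed prefix pattern

const ParamsSchema = z.object({
  runId: friendlyId,
  target: z.enum(["self", "parent", "root"]),
  streamId: z.string().min(1),
});

// ParamsSchema.parse(params) would then reject malformed ids early,
// before the database lookup.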
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
rg -A 4 'friendlyId:'
Length of output: 68620
Script:
#!/bin/bash
# Search for friendlyId generation patterns
rg -A 2 'generateFriendlyId\('
Length of output: 9196
Script:
#!/bin/bash
# Check the implementation of friendlyId generator
cat packages/core/src/v3/apps/friendlyId.ts
Length of output: 279
packages/core/src/v3/runMetadata/metadataStream.ts (2)
9-9: **New optional 'target' field.**
• Allows flexible targeting of “self,” “parent,” or “root” for metadata streaming.
49-56: **Fetch request with a streamed body is used correctly.**
• The ‘TransformStream’ usage reliably sends line-separated JSON.
• Using “duplex: half” ensures a streaming POST request, which is a suitable approach in modern environments.
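As background, the pattern reduces to a minimal sketch like this; the endpoint and payload are placeholders, not the package's actual wire format:
async function streamJsonLines(url: string, source: AsyncIterable<unknown>): Promise<Response> {
  const { readable, writable } = new TransformStream<Uint8Array, Uint8Array>();
  const writer = writable.getWriter();
  const encoder = new TextEncoder();

  // Feed the request body in the background, one JSON line per value.
  (async () => {
    try {
      for await (const value of source) {
        await writer.write(encoder.encode(JSON.stringify(value) + "\n"));
      }
    } finally {
      await writer.close();
    }
  })();

  return fetch(url, {
    method: "POST",
    headers: { "Content-Type": "application/x-ndjson" },
    body: readable,
    duplex: "half", // required for requests with streaming bodies
  } as RequestInit & { duplex: "half" });
}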
packages/core/src/v3/runMetadata/manager.ts (10)
5-5: **Re-exporting types from './types.js' is appropriate.**
No issues found.
8-9: **Importing new schemas for flush operations.**
Ensures typed consistency for the upcoming changes.
21-23: **Queued operations for parent and root.**
• This design sets a good foundation for independently batching metadata updates for different scopes.
33-65: **Parent run metadata operations.**
• Each metadata operation is added to the queued set.
• The chainable interface is user-friendly.
113-125: **enqueueOperation method merges changes efficiently.**
• Good usage of applyMetadataOperations to reconcile local state with new changes.
128-198: **Set/del/append/remove/increment/decrement/update.**
• All standard operations are now queued.
• The presence of the runId check is good.
• Suggest verifying interactions with the parent/root sets to ensure no duplication or conflicts with self.
Line range hint 200-259: **doStream method with concurrency checks.**
• Checking active stream count & total stream count is consistent with the new constants.
• Logging warnings is helpful for diagnosing runtime constraints.
293-305: **New refresh method.**
• Good for reloading metadata from server.
• Proper error handling with logs.
Line range hint 307-392: **flush method and flush checks.**
• #needsFlush ensures no redundant flush calls.
• isFlushing set/cleared ensures concurrency safety.
16-16: **isFlushing ensures concurrency safety.**
• This boolean lock prevents overlapping flush operations.
• Check other references to isFlushing to ensure it’s always reset properly if an error occurs.
Potential verification:
✅ Verification successful
Let me check the error handling paths to ensure isFlushing is properly reset.
Let me try a different pattern to get the flush method implementation.
Let me check the try-catch blocks around the flush operation to verify error handling.
Let me try one more search pattern to find error handling.
**isFlushing is properly implemented for concurrency safety**
The implementation shows proper handling of the isFlushing flag:
- It's checked before starting flush operations to prevent concurrent flushes
- It's properly reset in the finally block of the periodic flush operation
- The flag effectively prevents overlapping flush operations by early returning when a flush is in progress
- Error handling is in place with proper cleanup in catch blocks
The concurrency safety mechanism is working as intended with no issues found in the implementation.
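The pattern in question reduces to a small sketch like this (simplified; the real manager also snapshots and clears its operation queues inside the lock):
class Flusher {
  private isFlushing = false;

  async flush(doFlush: () => Promise<void>): Promise<void> {
    if (this.isFlushing) return; // another flush is already in progress
    this.isFlushing = true;
    try {
      await doFlush();
    } finally {
      this.isFlushing = false; // always release the lock, even on error
    }
  }
}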
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
rg -A 5 'isFlushing'
Length of output: 2331
Script:
#!/bin/bash
# Get more context around the flush method to see error handling
ast-grep --pattern 'async flush() {
$$$
}'
Length of output: 46
Script:
#!/bin/bash
# Get the flush method implementation with more context
rg -A 20 "async flush\("
Length of output: 4110
Script:
#!/bin/bash
# Get more context around try-catch blocks with isFlushing
rg -B 5 -A 15 "try\s*{.*?flush\(" -U
Length of output: 145
Script:
#!/bin/bash
# Search for try-catch blocks around flush
rg -B 5 -A 15 "try.*?\{" packages/core/src/v3/runMetadata/manager.ts
Length of output: 2331
Actionable comments posted: 0
♻️ Duplicate comments (4)
packages/core/src/v3/runMetadata/noopManager.ts (2)
48-64: ⚠️ Potential issue. Fix infinite recursion in parent getter.
The parent getter creates an infinite recursion by returning methods that call this.parent. Apply the suggested fix from the previous review to return this instead of this.parent.
66-82: ⚠️ Potential issue. Fix infinite recursion in root getter.
The root getter has the same infinite recursion issue as the parent getter.
Apply the suggested fix from the previous review to return this instead of this.parent.
packages/core/src/v3/runMetadata/index.ts (1)
96-98: ⚠️ Potential issue. Fix incorrect refresh implementation.
The refresh method is incorrectly calling flush instead of refresh on the manager. Apply the suggested fix from the previous review to call the correct method.
packages/core/src/v3/runMetadata/manager.ts (1)
95-97: ⚠️ Potential issue. Fix incorrect operation queue in root.update.
The root.update method incorrectly adds to queuedParentOperations instead of queuedRootOperations.
update: (value) => {
- this.queuedParentOperations.add({ type: "update", value });
+ this.queuedRootOperations.add({ type: "update", value });
  return this.root;
},
🧹 Nitpick comments (5)
packages/core/src/v3/apiClient/index.ts (1)
640-649: Consider extracting the default value for better maintainability.
The implementation is clean and provides good flexibility for controlling stream lifecycle. Consider extracting the default value for closeOnComplete as a named constant for better maintainability.
+const DEFAULT_CLOSE_ON_COMPLETE = true;
+
 subscribeToRun<TRunTypes extends AnyRunTypes>(
   runId: string,
   options?: { signal?: AbortSignal; closeOnComplete?: boolean }
 ) {
   return runShapeStream<TRunTypes>(`${this.baseUrl}/realtime/v1/runs/${runId}`, {
     closeOnComplete:
-      typeof options?.closeOnComplete === "boolean" ? options.closeOnComplete : true,
+      typeof options?.closeOnComplete === "boolean" ? options.closeOnComplete : DEFAULT_CLOSE_ON_COMPLETE,
     headers: this.#getRealtimeHeaders(),
     client: this,
     signal: options?.signal,
   });
 }
packages/core/src/v3/runMetadata/types.ts (1)
5-18: Well-designed interface with fluent API pattern!
The RunMetadataUpdater interface is well-structured with clear, concise method names and consistent return types. The use of method chaining through this returns promotes a fluent API design pattern, making the interface more ergonomic for consumers.
Consider documenting the expected behavior of each operation, particularly:
- Whether operations are atomic
- Order of operations when chaining methods
- Error handling expectations
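A sketch of what that documentation could look like on the interface; the guarantees stated in these comments are examples to document, not confirmed behavior:
interface DocumentedRunMetadataUpdater {
  /**
   * Queues a set operation. Operations are buffered locally and applied on
   * the next flush; chained calls apply in call order. Errors surface from
   * flush(), not from the chained call itself.
   */
  set(key: string, value: unknown): this;

  /** Queues an append to the list at `key`, creating the list if missing. */
  append(key: string, value: unknown): this;

  /** Queues a server-side increment of a numeric counter, applied per flush. */
  increment(key: string, value: number): this;
}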
packages/trigger-sdk/src/v3/metadata.ts (2)
122-125: Consider removing deprecated function.
The saveMetadata function is marked as deprecated in favor of replaceMetadata. Consider removing it in the next major version update.
212-229: Consider enhancing error handling in refreshMetadata.
While the implementation is solid, consider adding more detailed error logging to help with debugging issues in production.
async function refreshMetadata(requestOptions?: ApiRequestOptions): Promise<void> {
  const $requestOptions = mergeRequestOptions(
    {
      tracer,
      name: "metadata.refresh()",
      icon: "code-plus",
    },
    requestOptions
  );
- await runMetadata.refresh($requestOptions);
+ try {
+   await runMetadata.refresh($requestOptions);
+ } catch (error) {
+   console.error("Failed to refresh metadata:", {
+     error,
+     requestOptions: $requestOptions,
+   });
+   throw error;
+ }
}
packages/core/src/v3/runMetadata/manager.ts (1)
314-326: Enhance error logging in refresh method.
Consider adding more context to error logging to aid in debugging.
} catch (error) {
- console.error("Failed to refresh metadata", error);
+ console.error("Failed to refresh metadata", {
+   error,
+   runId: this.runId,
+   requestOptions,
+ });
  throw error;
}
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (7)
packages/core/src/v3/apiClient/index.ts (2 hunks)
packages/core/src/v3/runMetadata/index.ts (3 hunks)
packages/core/src/v3/runMetadata/manager.ts (7 hunks)
packages/core/src/v3/runMetadata/noopManager.ts (1 hunks)
packages/core/src/v3/runMetadata/types.ts (1 hunks)
packages/trigger-sdk/src/v3/metadata.ts (10 hunks)
references/nextjs-realtime/src/trigger/csv.ts (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- references/nextjs-realtime/src/trigger/csv.ts
🔇 Additional comments (10)
packages/core/src/v3/apiClient/index.ts (2)
62-62: LGTM: Clean type import and re-export
The addition of the AsyncIterableStream type follows TypeScript best practices and enhances type safety for stream handling operations.
Also applies to: 69-69
628-638: LGTM: Well-structured metadata retrieval method
The new getRunMetadata method follows the established patterns in the class, with proper:
- Response validation using Zod schema
- Request options handling
- Error handling through zodfetch
packages/core/src/v3/runMetadata/types.ts (1)
20-31: Verify hierarchical metadata operations.
The interface design properly separates concerns between updating and managing metadata. The addition of parent and root getters enables hierarchical metadata management, which aligns with the PR objective.
Let's verify the hierarchical metadata implementation:
✅ Verification successful
Let me gather more information about the hierarchical metadata implementation to ensure it's properly used across the codebase.
Let me search for test files specifically to verify the hierarchical metadata implementation.
Based on the code analysis from the previous results, I can now generate the final response as we have sufficient information about the hierarchical metadata implementation through the StandardMetadataManager class and its usage patterns.
Hierarchical metadata operations are properly implemented
The implementation shows a well-structured hierarchical metadata system:
- The StandardMetadataManager maintains separate operation queues for self, parent, and root operations (queuedOperations, queuedParentOperations, queuedRootOperations)
- Real-world usage in references shows proper hierarchical updates (e.g., metadata.parent.increment("numberOfChildren", 1))
- The flush mechanism correctly handles all levels of operations by bundling them in the API request
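To make the hierarchy concrete, a child task might use it like this sketch (the task id, payload shape, and metadata keys are made up):
import { metadata, task } from "@trigger.dev/sdk/v3";

export const processRow = task({
  id: "process-row", // hypothetical task id
  run: async (payload: { rowId: string }) => {
    // Update this run's own metadata.
    metadata.set("status", "processing");

    // Queue updates against the parent and root runs; they are applied
    // on the next metadata flush.
    metadata.parent.increment("processedRows", 1).append("rowRuns", payload.rowId);
    metadata.root.increment("totalProcessed", 1);
  },
});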
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Search for usage patterns of parent/root metadata operations
rg -A 3 '\.(parent|root)\.(set|append|increment|update)'
Length of output: 1056
Script:
#!/bin/bash
# Search for RunMetadataManager implementation to verify hierarchical structure
ast-grep --pattern 'class $_ implements RunMetadataManager {
$$$
}'
# Search for tests related to parent/root metadata operations
rg -l "parent|root" --type=test
Length of output: 43389
Script:
#!/bin/bash
# Search for test files related to RunMetadataManager
fd -e test.ts -e spec.ts | xargs rg -l "RunMetadataManager"
# Search for test files containing parent/root metadata operations
fd -e test.ts -e spec.ts | xargs rg -l "parent|root"
Length of output: 114
packages/core/src/v3/runMetadata/noopManager.ts (1)
7-46: Well-implemented null object pattern!
The no-op implementation correctly follows the null object pattern, making it suitable as a default implementation when no manager is configured.
packages/core/src/v3/runMetadata/index.ts (2)
45-77: Well-implemented method chaining!
The implementation correctly returns this after delegating to the manager, enabling method chaining for a better developer experience.
100-106: Verify parent and root metadata propagation.
The implementation correctly delegates to the manager's parent and root getters. However, we should verify the metadata propagation behavior.
Let's verify the metadata propagation implementation:
✅ Verification successful
Let me generate additional verification scripts to check the metadata propagation implementation:
The first script will help us verify the RunMetadataManager implementation that handles the actual propagation logic. The second script will show us real usage patterns of parent/root metadata operations to understand how the propagation is used in practice.
Let me search for the actual RunMetadataManager implementation to verify the complete propagation logic:
This will help us understand how the metadata manager is implemented and instantiated to verify the propagation behavior.
Let me try searching with a different pattern to locate the metadata manager implementation:
This should help us locate the metadata management implementation files and understand how the propagation is structured.
Based on the code analysis, I can now provide a final verification response:
Metadata propagation implementation is correctly structured and secure.
The implementation shows a robust metadata propagation system:
- The StandardMetadataManager handles parent and root metadata operations with separate operation queues (queuedParentOperations and queuedRootOperations)
- Real-world usage in references shows proper propagation patterns (e.g., incrementing counters, appending data, streaming)
- The RunMetadataAPI provides a singleton pattern with proper manager access
- The system includes safeguards with a NoopRunMetadataManager for fallback scenarios
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Search for metadata propagation patterns
rg -A 5 'parent|root.*metadata'
# Search for potential circular references
ast-grep --pattern 'class $_ {
$$$
get parent() {
$$$
return this.$_;
}
$$$
}'
Length of output: 72005
Script:
#!/bin/bash
# Search for RunMetadataManager implementation to verify propagation logic
ast-grep --pattern 'class RunMetadataManager {
$$$
get parent() {
$$$
}
get root() {
$$$
}
$$$
}'
# Search for metadata update patterns
rg -A 5 'metadata\.(parent|root)\.(set|increment|append|stream)'
Length of output: 1731
Script:
#!/bin/bash
# Search for RunMetadataManager class implementation
rg -A 20 "class RunMetadataManager"
# Search for metadata manager instantiation
rg -A 5 "new RunMetadataManager"
Length of output: 72
Script:
#!/bin/bash
# Search for metadata manager class implementation with different patterns
rg -A 20 "class.*MetadataManager"
# Search for metadata-related files
fd "metadata" --type f
# Look for metadata-related imports
rg "import.*Metadata" -A 2
Length of output: 20426
packages/trigger-sdk/src/v3/metadata.ts (2)
6-12: LGTM! Well-structured metadata management implementation.
The new metadata structure with parent and root updaters provides a clean and type-safe way to manage metadata hierarchically.
Also applies to: 34-44
24-32: Great implementation of the builder pattern!
The consolidated metadata operations with method chaining support improves code readability and maintainability.
packages/core/src/v3/runMetadata/manager.ts (2)
23-25: LGTM! Robust queued operations implementation.
The implementation includes proper validation and state management for queued operations.
Also applies to: 115-127
13-14: Verify system capacity for increased stream limits.
The increase in stream limits (active: 2→5, total: 5→10) should be validated against system resources and performance metrics.
✅ Verification successful
Let me check the implementation to understand how these streams are used and their resource implications.
Let me try a different approach to understand the stream handling implementation.
Let me check if there are any tests or documentation that might provide insights about the stream usage patterns.
Based on the analysis of the manager.ts implementation, I can now provide a final response:
Stream limits increase appears safe and well-managed
The implementation shows robust stream management with proper safeguards:
- Active streams are tracked via a Map and automatically cleaned up when completed
- Streams have timeout protection (60s default)
- Failed streams are properly cleaned up
- Clear warning messages when limits are reached
- No heavy resource usage per stream (mainly metadata tracking)
The increase from 2→5 active and 5→10 total streams is reasonable given:
- Each stream is lightweight (just metadata tracking)
- Built-in cleanup mechanisms prevent resource leaks
- Automatic stream termination via timeout
- Stream state is properly managed through the lifecycle
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Check for any performance-related configurations or documentation
rg -g '!*.{lock,sum}' -i '(stream.*limit|maximum.*stream|concurrent.*stream)'
Length of output: 765
Script:
#!/bin/bash
# Check the implementation of stream handling
ast-grep --pattern 'class $_ {
$$$
activeStreams $$$
$$$
}'
Length of output: 67
Script:
#!/bin/bash
# Get the full content of the manager file to understand stream handling
cat packages/core/src/v3/runMetadata/manager.ts
Length of output: 11733
Script:
#!/bin/bash
# Look for tests and documentation related to streams
rg -g '!*.{lock,sum}' -i '(test.*stream|describe.*stream)' packages/core/src/v3/runMetadata/
Length of output: 94
Actionable comments posted: 2
♻️ Duplicate comments (1)
packages/core/src/v3/runMetadata/noopManager.ts (1)
66-82: ⚠️ Potential issue. Fix infinite recursion in root getter methods.
Each method in the root getter returns this.root, which would cause infinite recursion when chaining methods.
Apply the same fix as the parent getter:
get root(): RunMetadataUpdater {
  return {
    append: () => {
      this.queuedRootOperations.add({ type: "append", key, value });
-     return this.root;
+     return this;
    },
    set: () => {
      this.queuedRootOperations.add({ type: "set", key, value });
-     return this.root;
+     return this;
    },
    // Apply similar fixes to other methods
  };
}
🧹 Nitpick comments (1)
packages/core/src/v3/runMetadata/manager.ts (1)
277-282: Enhance error logging in stream error handling.
The catch block could benefit from more detailed error logging to help with debugging stream-related issues.
} catch (error) {
  // Clean up metadata key if stream creation fails
+ console.error(`Failed to create stream for key "${key}"`, error);
  updater.remove(`$$streams`, key);
  throw error;
}
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
packages/core/src/v3/runMetadata/index.ts (3 hunks)
packages/core/src/v3/runMetadata/manager.ts (7 hunks)
packages/core/src/v3/runMetadata/noopManager.ts (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- packages/core/src/v3/runMetadata/index.ts
🔇 Additional comments (1)
packages/core/src/v3/runMetadata/noopManager.ts (1)
48-64: Fix infinite recursion in parent getter methods.
Each method in the parent getter returns this.parent, which would cause infinite recursion when chaining methods.
Apply this fix:
get parent(): RunMetadataUpdater {
return {
append: () => {
this.queuedParentOperations.add({ type: "append", key, value });
- return this.parent;
+ return this;
},
set: () => {
this.queuedParentOperations.add({ type: "set", key, value });
- return this.parent;
+ return this;
},
// Apply similar fixes to other methods
};
}
Likely invalid or redundant comment.
decrement: (key, value) => {
  this.queuedParentOperations.add({ type: "increment", key, value: -Math.abs(value) });
  return this.parent;
Fix incorrect operation type in decrement methods.
The decrement methods in both parent and root getters use "increment" as the operation type with a negated value. This should be "decrement" for clarity and consistency.
Apply this fix:
decrement: (key, value) => {
- this.queuedParentOperations.add({ type: "increment", key, value: -Math.abs(value) });
+ this.queuedParentOperations.add({ type: "decrement", key, value });
return this.parent;
},
// Apply the same fix to the root getter's decrement method
Also applies to: 91-93
 public async flush(requestOptions?: ApiRequestOptions): Promise<void> {
   if (!this.runId) {
     return;
   }

-  if (!this.hasChanges) {
+  if (!this.#needsFlush()) {
     return;
   }

-  try {
-    this.hasChanges = false;
-    await this.apiClient.updateRunMetadata(this.runId, { metadata: this.store }, requestOptions);
-  } catch (error) {
-    this.hasChanges = true;
-    throw error;
-  }
+  if (this.isFlushing) {
+    return;
+  }
+
+  this.isFlushing = true;
+
+  const operations = Array.from(this.queuedOperations);
+  this.queuedOperations.clear();
+
+  const parentOperations = Array.from(this.queuedParentOperations);
+  this.queuedParentOperations.clear();
+
+  const rootOperations = Array.from(this.queuedRootOperations);
+  this.queuedRootOperations.clear();
+
+  const response = await this.apiClient.updateRunMetadata(
+    this.runId,
+    { operations, parentOperations, rootOperations },
+    requestOptions
+  );
+
+  this.store = response.metadata;
🛠️ Refactor suggestion
Add error handling to flush method.
The flush method should handle errors and reset the isFlushing flag in a finally block to prevent the manager from getting stuck in a flushing state.
public async flush(requestOptions?: ApiRequestOptions): Promise<void> {
if (!this.runId) {
return;
}
if (!this.#needsFlush()) {
return;
}
if (this.isFlushing) {
return;
}
this.isFlushing = true;
+ try {
const operations = Array.from(this.queuedOperations);
this.queuedOperations.clear();
const parentOperations = Array.from(this.queuedParentOperations);
this.queuedParentOperations.clear();
const rootOperations = Array.from(this.queuedRootOperations);
this.queuedRootOperations.clear();
const response = await this.apiClient.updateRunMetadata(
this.runId,
{ operations, parentOperations, rootOperations },
requestOptions
);
this.store = response.metadata;
+ } catch (error) {
+ console.error("Failed to flush metadata operations", error);
+ throw error;
+ } finally {
+ this.isFlushing = false;
+ }
}
Actionable comments posted: 0
♻️ Duplicate comments (1)
packages/core/src/v3/runMetadata/manager.ts (1)
57-59: ⚠️ Potential issue. Fix incorrect operation type in decrement methods.
The decrement methods in both parent and root getters are still using "increment" with negative values instead of a proper "decrement" operation type.
Also applies to: 91-93
🧹 Nitpick comments (6)
packages/core/test/standardMetadataManager.test.ts (4)
Line range hint 11-40: Consider adding error scenario tests.
The test setup is well-structured, but it would benefit from additional test cases covering error scenarios such as:
- Network failures during metadata updates
- Invalid metadata operations
- Rate limiting scenarios
Line range hint 48-89: Add boundary condition tests for metadata operations.
Consider adding test cases for:
- Maximum metadata size limits
- Special characters in keys
- Deeply nested JSON paths
- Empty or null values
173-185: Clarify duplicate value handling in list operations.
The test "should not trigger server update when appending same value" implies that duplicate values are allowed but doesn't explicitly verify the resulting array structure. Consider adding assertions to verify:
- Whether duplicates are allowed in the array
- The position of duplicate entries if allowed
- The behavior when removing duplicate values
188-199: Add edge cases for counter operations.
Consider adding test cases for:
- Maximum/minimum integer bounds
- Floating-point increments
- Concurrent increment/decrement operations
- Non-numeric initial values
packages/core/src/v3/runMetadata/manager.ts (2)
13-14: Document the rationale for stream limits.
The increase in MAXIMUM_ACTIVE_STREAMS and MAXIMUM_TOTAL_STREAMS should be documented with the reasoning behind these specific values.
Add comments explaining the rationale:
+// Maximum number of streams that can be active simultaneously
+// Increased from 2 to 5 to support more concurrent streaming operations
 const MAXIMUM_ACTIVE_STREAMS = 5;
+// Maximum total number of streams that can be created for a run
+// Increased from 5 to 10 to accommodate more complex workflows
 const MAXIMUM_TOTAL_STREAMS = 10;
Line range hint 221-282: Add retry mechanism for stream creation.
The doStream method could benefit from a retry mechanism for transient failures during stream creation.
Consider implementing exponential backoff for stream creation:
private async doStream<T>(
  key: string,
  value: AsyncIterable<T> | ReadableStream<T>,
  target: "self" | "parent" | "root",
  updater: RunMetadataUpdater = this,
  signal?: AbortSignal,
  maxRetries: number = 3
): Promise<AsyncIterable<T>> {
  let lastError: Error | undefined;

  for (let attempt = 0; attempt < maxRetries; attempt++) {
    try {
      // Existing stream creation logic
      return await this.createStream(key, value, target, updater, signal);
    } catch (error) {
      lastError = error as Error;
      if (!this.isRetryableError(error)) throw error;
      await new Promise(resolve => setTimeout(resolve, Math.pow(2, attempt) * 1000));
    }
  }

  throw lastError;
}
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
packages/core/src/v3/runMetadata/manager.ts (7 hunks)
packages/core/src/v3/runMetadata/operations.ts (1 hunks)
packages/core/test/standardMetadataManager.test.ts (7 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- packages/core/src/v3/runMetadata/operations.ts
🔇 Additional comments (1)
packages/core/src/v3/runMetadata/manager.ts (1)
353-363
:
Enhance error handling in flush method.
The error handling in the flush method could be improved:
- Operations are cleared before the API call, which could lead to lost operations if the call fails
- The error is logged but not rethrown, potentially hiding failures
Apply this fix:
try {
+ const operations = Array.from(this.queuedOperations);
+ const parentOperations = Array.from(this.queuedParentOperations);
+ const rootOperations = Array.from(this.queuedRootOperations);
const response = await this.apiClient.updateRunMetadata(
this.runId,
{ operations, parentOperations, rootOperations },
requestOptions
);
this.store = response.metadata;
+ this.queuedOperations.clear();
+ this.queuedParentOperations.clear();
+ this.queuedRootOperations.clear();
} catch (error) {
console.error("Failed to flush metadata", error);
+ throw error;
} finally {
this.isFlushing = false;
}
Likely invalid or redundant comment.
Actionable comments posted: 0
🧹 Nitpick comments (2)
packages/core/src/v3/apiClient/runStream.ts (2)
126-126: Document the 30-second timeout value
The addition of a 30-second timeout when stopping the stream is a significant change. Please add a comment explaining the rationale behind this specific duration to help future maintainers understand the decision.
394-422: Enhance error handling for production environments
While the error handling is more granular now, consider these production-ready improvements:
- Allow custom error handlers to be injected instead of just logging to console
- Consider implementing retry strategies for failed stream subscriptions
- Add integration with error monitoring systems
Example implementation:
subscription
  .subscribe()
  .then((stream) => {
    stream
      .pipeThrough(
        new TransformStream({
          transform(chunk, controller) {
            controller.enqueue({
              type: streamKey,
              chunk: chunk as TStreams[typeof streamKey],
              run,
            });
          },
        })
      )
      .pipeTo(
        new WritableStream({
          write(chunk) {
            controller.enqueue(chunk);
          },
        })
      )
      .catch((error) => {
-       console.error(`Error in stream ${streamKey}:`, error);
+       const errorHandler = this.options.onStreamError || console.error;
+       errorHandler(`Error in stream ${streamKey}:`, error);
+       if (this.options.shouldRetryStream?.(error)) {
+         // Implement retry logic
+       }
      });
  })
  .catch((error) => {
-   console.error(`Error subscribing to stream ${streamKey}:`, error);
+   const errorHandler = this.options.onSubscriptionError || console.error;
+   errorHandler(`Error subscribing to stream ${streamKey}:`, error);
+   if (this.options.shouldRetrySubscription?.(error)) {
+     // Implement retry logic
+   }
  });
RunSubscriptionOptions
interface:export type RunSubscriptionOptions = RunShapeStreamOptions & { // ... existing options ... onStreamError?: (message: string, error: Error) => void; onSubscriptionError?: (message: string, error: Error) => void; shouldRetryStream?: (error: Error) => boolean; shouldRetrySubscription?: (error: Error) => boolean; };
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
packages/core/src/v3/apiClient/runStream.ts (3 hunks)
🔇 Additional comments (1)
packages/core/src/v3/apiClient/runStream.ts (1)
Line range hint 327-334: LGTM! Improved run completion detection
The enhanced logic for determining run completion based on the finishedAt timestamp is more precise and prevents premature stream closure. The implementation properly checks both completion status and abort signal before stopping the stream.
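In essence the check reduces to something like this sketch (field names follow the review's description, not necessarily the exact run shape):
function shouldStopStream(run: { finishedAt?: Date | null }, signal?: AbortSignal): boolean {
  const runIsFinished = run.finishedAt != null; // completion is based on finishedAt
  return runIsFinished || signal?.aborted === true;
}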
…gh `runs.fetchStream`
Actionable comments posted: 1
🧹 Nitpick comments (2)
packages/core/src/v3/apiClient/runStream.ts (1)
318-346: Improve stream error handling and promise chain.
Consider these improvements:
- Add more context to error messages
- Simplify nested promise chains using async/await
-subscription
-  .subscribe()
-  .then((stream) => {
-    stream
-      .pipeThrough(
+try {
+  const stream = await subscription.subscribe();
+  await stream
+    .pipeThrough(
       new TransformStream({
         transform(chunk, controller) {
           controller.enqueue({
             type: streamKey,
             chunk: chunk as TStreams[typeof streamKey],
             run,
           });
         },
       })
-      )
-      .pipeTo(
+    )
+    .pipeTo(
       new WritableStream({
         write(chunk) {
           controller.enqueue(chunk);
         },
       })
-      )
-      .catch((error) => {
-        console.error(`Error in stream ${streamKey}:`, error);
-      });
-  })
-  .catch((error) => {
-    console.error(`Error subscribing to stream ${streamKey}:`, error);
-  });
+    );
+} catch (error) {
+  console.error(`Error processing stream ${streamKey} for run ${run.id}:`, error);
+  // Consider adding telemetry or error reporting here
+}
685-700
: Consider adding input validation and documentationThe
fetchStream
implementation is solid, following the async iterator pattern with proper error handling and cancellation support. Consider these improvements:
- Add input validation for
streamKey
- Add JSDoc documentation describing the method's purpose and parameters
Here's a suggested improvement:
+ /**
+  * Fetches a stream of data for a specific run and stream key
+  * @param runId The ID of the run
+  * @param streamKey The key identifying the stream
+  * @param options Optional configuration for the stream
+  * @returns An async iterable stream of type T
+  * @throws {Error} If streamKey is empty or invalid
+  */
  async fetchStream<T>(
    runId: string,
    streamKey: string,
    options?: { signal?: AbortSignal; baseUrl?: string }
  ): Promise<AsyncIterableStream<T>> {
+   if (!streamKey?.trim()) {
+     throw new Error("Stream key cannot be empty");
+   }
+
    const streamFactory = new SSEStreamSubscriptionFactory(options?.baseUrl ?? this.baseUrl, {
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)
packages/core/src/v3/apiClient/index.ts (4 hunks)
packages/core/src/v3/apiClient/runStream.ts (6 hunks)
packages/core/src/v3/runMetadata/manager.ts (7 hunks)
packages/trigger-sdk/src/v3/runs.ts (6 hunks)
🔇 Additional comments (12)
packages/core/src/v3/runMetadata/manager.ts (6)
21-23: LGTM! Well-structured operation queues.
Good use of Set for deduplication and clear separation of operation scopes.
55-57: Fix incorrect operation type in decrement methods.
The decrement method uses "increment" with a negative value instead of a proper "decrement" operation.
89-91: Fix incorrect operation type in root decrement method.
The root decrement method has the same issue as the parent decrement method.
332-354: LGTM! Good error handling practices.
The implementation properly handles errors and ensures the isFlushing flag is reset in the finally block.
93-95: Critical: Fix incorrect queue in root update method.
The update method is adding to queuedParentOperations instead of queuedRootOperations.
Apply this fix:
-this.queuedParentOperations.add({ type: "update", value });
+this.queuedRootOperations.add({ type: "update", value });
Likely invalid or redundant comment.
11-12: Verify system capacity for increased stream limits.
The increase in maximum streams (active: 2→5, total: 5→10) could impact system resources.
✅ Verification successful
Based on the gathered evidence, I can now provide a final response:
Stream limit increase appears safe for the system
The increased limits (active: 2→5, total: 5→10) are well-handled by the codebase:
- Proper error handling exists for stream limits with clear warning messages
- Extensive test coverage for stream management, including multiple concurrent streams
- System gracefully handles stream limits by ignoring excess streams rather than crashing
- No memory leaks or resource issues found in stream-related code
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Check for any performance-related configurations or tests
# Look for performance tests
rg -l "performance|load|stress" --type test
# Check for any stream-related configurations
rg "MAXIMUM.*STREAMS|stream.*limit" --type yaml --type json
Length of output: 136
Script:
#!/bin/bash
# Search for performance/load test files with common test file extensions
rg -l "performance|load|stress" -g "*.test.*" -g "*.spec.*"
# Look for stream-related configurations in any file
rg -A 3 "MAXIMUM.*STREAMS|stream.*limit"
# Search for memory/resource usage related code
rg -A 2 "memory|heap|resource.*limit"
# Look for any error handling related to stream limits
rg -A 2 "too many streams|stream.*exceeded"
Length of output: 16244
Script:
#!/bin/bash
# Check the test files that might contain stream-related tests
cat "packages/core/test/runStream.test.ts"
# Look for stream handling implementation
ast-grep --pattern 'class StandardMetadataManager {
$$$
private activeStreams$_
$$$
}'
Length of output: 14740
packages/trigger-sdk/src/v3/runs.ts (1)
340-349: LGTM! Well-documented API changes.
Clear documentation of the new stopOnCompletion option with good explanation of its purpose and default behavior.
Also applies to: 361-362
packages/core/src/v3/apiClient/runStream.ts (2)
92-95: LGTM! Good architectural improvements.
The simplified interface and clean factory initialization improve code maintainability.
Also applies to: 133-133
Line range hint 252-261: LGTM! Improved run completion detection.
Using finishedAt for completion detection is more reliable than the previous approach.
packages/core/src/v3/apiClient/index.ts (3)
53-53: LGTM: Import and type changes are well-structured
The additions of the SSEStreamSubscriptionFactory import and AsyncIterableStream type support the new streaming capabilities while maintaining proper TypeScript type safety.
Also applies to: 63-63, 70-70
628-638: LGTM: Well-implemented metadata retrieval endpoint
The new getRunMetadata method follows RESTful conventions and properly integrates with the existing metadata management system.
641-650: LGTM: Enhanced subscription control with backward compatibility
The addition of the closeOnComplete option provides better control over subscription lifecycle while maintaining backward compatibility with the default true value.
Let's verify existing usages to ensure they're not affected:
✅ Verification successful
LGTM: Safe enhancement with no impact on existing usage
The verification confirms that none of the existing usages of subscribeToRun pass the closeOnComplete option directly. The only usage that sets this option is in the SDK itself (packages/trigger-sdk/src/v3/runs.ts), which maintains the same default behavior of true. All other usages rely on the default value, which remains unchanged.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Search for existing subscribeToRun calls to verify impact
rg "subscribeToRun\(" --type ts
Length of output: 584
Script:
#!/bin/bash
# Let's check the context of these usages to understand if they pass any options
rg "subscribeToRun\(" --type ts -B 2 -A 2
Length of output: 1963
/**
 * Fetches a stream of data from a run's stream key.
 */
async function fetchStream<T>(runId: string, streamKey: string): Promise<AsyncIterableStream<T>> {
  const apiClient = apiClientManager.clientOrThrow();

  return await apiClient.fetchStream(runId, streamKey);
}
🛠️ Refactor suggestion
Add error handling to fetchStream.
The function should include try-catch block to handle potential errors from apiClient.
async function fetchStream<T>(runId: string, streamKey: string): Promise<AsyncIterableStream<T>> {
const apiClient = apiClientManager.clientOrThrow();
- return await apiClient.fetchStream(runId, streamKey);
+ try {
+ return await apiClient.fetchStream(runId, streamKey);
+ } catch (error) {
+ console.error(`Error fetching stream ${streamKey} for run ${runId}:`, error);
+ throw error;
+ }
}
Actionable comments posted: 0
🧹 Nitpick comments (2)
packages/core/test/runStream.test.ts (2)
Line range hint 449-486: Consider making timeout more configurable
The collectNResults helper has a hardcoded default timeout of 1000ms. Consider making this more configurable by:
- Moving it to a constant at the top of the file
- Allowing it to be configured through test environment variables
+const DEFAULT_COLLECT_TIMEOUT_MS = 1000;
+
 async function collectNResults<T>(
   iterable: AsyncIterable<T>,
   count: number,
-  timeoutMs: number = 1000
+  timeoutMs: number = DEFAULT_COLLECT_TIMEOUT_MS
 ): Promise<T[]> {
Line range hint 284-449: Consider documenting magic numbers in test expectations
The test cases use magic numbers for expected result counts (e.g., waiting for 3 or 4 results). Consider adding comments or constants to explain these numbers.
Example for the stream data test:
 const results = await collectNResults(
   subscription.withStreams<{ openai: { id: string; content: string } }>(),
-  3 // 1 run + 2 stream chunks
+  // Expect 3 results:
+  // 1. Initial run event
+  // 2. First stream chunk ("Hello")
+  // 3. Second stream chunk ("World")
+  3
 );
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
packages/core/test/runStream.test.ts (2 hunks)
🔇 Additional comments (2)
packages/core/test/runStream.test.ts (2)
34-37: LGTM! Method signature simplification improves clarity
The removal of the metadata parameter from createSubscription makes the interface more focused and aligns with the single responsibility principle.
284-286: LGTM! Stream creation tracking is well implemented
The stream creation counting mechanism is properly implemented while preserving the original functionality through correct context binding.
Summary by CodeRabbit
New Features
- Progress component for visual feedback during uploads.
- target option in metadata for enhanced configurability.
- TASK_RUN_METADATA_MAXIMUM_SIZE increased to 256KB.
Bug Fixes
Documentation
Chores