Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ability to update parent and root run metadata from children #1563

Open
wants to merge 13 commits into
base: main
Choose a base branch
from

Conversation

ericallam
Copy link
Member

@ericallam ericallam commented Dec 13, 2024

Summary by CodeRabbit

  • New Features

    • Increased maximum active streams from 2 to 5 and total streams from 5 to 10.
    • Added functionality to update parent run metadata from child runs/tasks.
    • Introduced a new CSV upload route and processing capabilities for real-time uploads.
    • Enhanced subscription options to control behavior upon run completion.
    • Added a new Progress component for visual feedback during uploads.
    • New optional property target in metadata options for enhanced configurability.
    • Updated TASK_RUN_METADATA_MAXIMUM_SIZE to 256KB.
    • Added a new method to fetch streams of data from a run's stream key.
  • Bug Fixes

    • Improved error handling and logging during task execution and metadata updates.
  • Documentation

    • Updated documentation for new subscription options and CSV processing features.
  • Chores

    • Updated package dependencies and configuration files for improved functionality.

Copy link

changeset-bot bot commented Dec 13, 2024

🦋 Changeset detected

Latest commit: 203d7a0

The changes in this PR will be included in the next version bump.

This PR includes changesets to release 12 packages
Name Type
trigger.dev Patch
@trigger.dev/react-hooks Patch
@trigger.dev/sdk Patch
@trigger.dev/core Patch
references-nextjs-realtime Patch
@trigger.dev/build Patch
@internal/redis-worker Patch
@internal/zod-worker Patch
@trigger.dev/rsc Patch
@trigger.dev/database Patch
@trigger.dev/otlp-importer Patch
@internal/testcontainers Patch

Not sure what this means? Click here to learn what changesets are.

Click here if you're a maintainer who wants to add another changeset to this PR

Copy link
Contributor

coderabbitai bot commented Dec 13, 2024

Warning

There were issues while running some tools. Please review the errors and either fix the tool’s configuration or disable the tool if it’s a critical failure.

🔧 eslint

If the error stems from missing dependencies, add them to the package.json file. For unrecoverable errors (e.g., due to private dependencies), disable the tool in the CodeRabbit configuration.

packages/core/test/runStream.test.ts

Oops! Something went wrong! :(

ESLint: 8.45.0

ESLint couldn't find the config "custom" to extend from. Please check that the name of the config is correct.

The config "custom" was referenced from the config file in "/.eslintrc.js".

If you still have problems, please stop by https://eslint.org/chat/help to chat with the team.

Walkthrough

This pull request introduces updates to the configuration of active and total streams within the "trigger.dev" patch, increasing the number of active streams from 2 to 5 and the total number of streams from 5 to 10. Additionally, it enhances the StandardMetadataManager class with new properties and methods for managing queued operations, refines API routes for metadata updates, and introduces new types and schemas for metadata operations. These changes collectively improve the handling and tracking of metadata across the application.

Changes

File Change Summary
.changeset/giant-mice-cheer.md Configuration update for stream handling
packages/core/src/v3/runMetadata/manager.ts Increased stream limits, added queued operations management
apps/webapp/app/routes/api.v1.runs.$runId.metadata.ts Simplified metadata update route using new service
apps/webapp/app/services/metadata/updateMetadata.server.ts New UpdateMetadataService for handling metadata updates
packages/core/src/v3/runMetadata/index.ts Updated method names and added new metadata management methods
internal-packages/database/prisma/schema.prisma Added metadataVersion to TaskRun model
apps/webapp/app/env.server.ts Increased metadata size limit and added flush interval
apps/webapp/app/routes/realtime.v1.streams.$runId.$target.$streamId.ts New API route for handling real-time stream ingestion
packages/core/src/v3/runMetadata/metadataStream.ts Enhanced metadata options with target specification
packages/core/src/v3/schemas/common.ts Introduced new operations for run metadata management

Possibly related PRs

Poem

🐰 Metadata Hops and Streams Unfold
In Trigger's realm, where data grows bold,
Streams expand, from two to five,
Metadata dances, now more alive!
Rabbit's code leaps with joyful might,
Tracking runs from morning to night! 🚀

Tip

CodeRabbit's docstrings feature is now available as part of our Early Access Program! Simply use the command @coderabbitai generate docstrings to have CodeRabbit automatically generate docstrings for your pull request. We would love to hear your feedback on Discord.


Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media?

❤️ Share
🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Generate unit testing code for this file.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai generate unit testing code for this file.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and generate unit testing code.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai generate docstrings to generate docstrings for this PR. (Beta)
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Outside diff range and nitpick comments (1)
packages/core/src/v3/runMetadata/manager.ts (1)

Line range hint 246-254: Enhance warning messages with more actionable information

The warning messages could be more helpful by suggesting what users should do when they hit these limits.

Consider updating the warning messages to be more actionable:

-      console.warn(
-        `Exceeded the maximum number of active streams (${MAXIMUM_ACTIVE_STREAMS}). The "${key}" stream will be ignored.`
-      );
+      console.warn(
+        `Exceeded the maximum number of active streams (${MAXIMUM_ACTIVE_STREAMS}). The "${key}" stream will be ignored. ` +
+        `Wait for some existing streams to complete before creating new ones.`
+      );

-      console.warn(
-        `Exceeded the maximum number of total streams (${MAXIMUM_TOTAL_STREAMS}). The "${key}" stream will be ignored.`
-      );
+      console.warn(
+        `Exceeded the maximum number of total streams (${MAXIMUM_TOTAL_STREAMS}). The "${key}" stream will be ignored. ` +
+        `Consider batching or combining related streams to stay within limits.`
+      );

Also applies to: 259-267

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 2bf04ec and 8f1d8e1.

📒 Files selected for processing (2)
  • .changeset/giant-mice-cheer.md (1 hunks)
  • packages/core/src/v3/runMetadata/manager.ts (1 hunks)
🔇 Additional comments (2)
.changeset/giant-mice-cheer.md (1)

1-5: LGTM! Changeset correctly documents the stream limit increases

The patch version bump and change description accurately reflect both the increase in active streams (2→5) and total streams (5→10).

packages/core/src/v3/runMetadata/manager.ts (1)

9-10: LGTM! Constants updated to match PR objectives

The constant values have been correctly updated:

  • MAXIMUM_ACTIVE_STREAMS increased from 2 to 5 (matches PR objective)
  • MAXIMUM_TOTAL_STREAMS increased from 5 to 10 (maintains the 2:1 ratio)

Copy link

pkg-pr-new bot commented Dec 13, 2024

@trigger.dev/core

npm i https://pkg.pr.new/triggerdotdev/trigger.dev/@trigger.dev/core@1563

@trigger.dev/build

npm i https://pkg.pr.new/triggerdotdev/trigger.dev/@trigger.dev/build@1563

@trigger.dev/rsc

npm i https://pkg.pr.new/triggerdotdev/trigger.dev/@trigger.dev/rsc@1563

@trigger.dev/sdk

npm i https://pkg.pr.new/triggerdotdev/trigger.dev/@trigger.dev/sdk@1563

trigger.dev

npm i https://pkg.pr.new/triggerdotdev/trigger.dev@1563

@trigger.dev/react-hooks

npm i https://pkg.pr.new/triggerdotdev/trigger.dev/@trigger.dev/react-hooks@1563

commit: 203d7a0

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🔭 Outside diff range comments (2)
apps/webapp/app/services/routeBuilders/apiBuilder.server.ts (1)

Line range hint 474-479: Validate actual request body size instead of relying on 'Content-Length' header

Relying solely on the Content-Length header for enforcing maxContentLength may not be reliable, as this header can be missing or manipulated. Consider reading the request body in chunks and enforcing the size limit based on the actual data received to ensure proper handling of oversized requests.

apps/webapp/app/routes/api.v1.tasks.batch.ts (1)

Line range hint 91-98: Consider sanitizing error logs.

The error logging includes the full error stack trace which might expose sensitive information in production logs.

Consider sanitizing the error object:

-        error: {
-          message: (error as Error).message,
-          stack: (error as Error).stack,
-        },
+        error: error instanceof Error ? error.message : String(error),
+        code: error instanceof ServiceValidationError ? 'VALIDATION_ERROR' : 'UNKNOWN_ERROR'
🧹 Nitpick comments (4)
apps/webapp/app/services/metadata/updateMetadata.server.ts (1)

45-47: Consider throwing a ServiceValidationError when the task run is not found

Currently, when a task run is not found, the method returns without any indication of the issue. Throwing a ServiceValidationError with a message like "Task Run not found" would provide clearer feedback and allow consistent error handling upstream.

Apply this diff to implement the change:

if (!taskRun) {
-  return;
+  throw new ServiceValidationError("Task Run not found");
}
packages/core/src/v3/runMetadata/types.ts (1)

13-26: Consider documenting the stream method's behavior.

The interface looks good, but the stream method could benefit from documentation explaining:

  • The relationship between streams and metadata
  • The purpose of the AbortSignal
  • Expected behavior when maximum stream limits are reached

Consider adding JSDoc comments:

/**
 * Streams data while updating metadata. Limited by MAXIMUM_ACTIVE_STREAMS.
 * @param key - Metadata key to update
 * @param value - AsyncIterable or ReadableStream to process
 * @param signal - Optional AbortSignal to cancel the stream
 * @returns AsyncIterable of processed values
 * @throws When maximum active streams limit is reached
 */
stream<T>(
  key: string,
  value: AsyncIterable<T> | ReadableStream<T>,
  signal?: AbortSignal
): Promise<AsyncIterable<T>>;
packages/core/src/v3/runMetadata/noopManager.ts (1)

41-61: LGTM! Consider adding documentation.

The no-op implementations for parent and root getters correctly follow the null object pattern. Consider adding JSDoc comments to explain the purpose of these getters for better maintainability.

apps/webapp/app/routes/api.v1.tasks.batch.ts (1)

Line range hint 76-77: Extract magic number into a constant.

The 30-day TTL default is hardcoded. Consider extracting it into a named constant for better maintainability.

+const DEFAULT_IDEMPOTENCY_KEY_TTL_DAYS = 30;
+
 const idempotencyKeyExpiresAt =
   resolveIdempotencyKeyTTL(idempotencyKeyTTL) ??
-  new Date(Date.now() + 24 * 60 * 60 * 1000 * 30);
+  new Date(Date.now() + 24 * 60 * 60 * 1000 * DEFAULT_IDEMPOTENCY_KEY_TTL_DAYS);
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 8f1d8e1 and 64ce918.

📒 Files selected for processing (14)
  • .changeset/giant-mice-cheer.md (1 hunks)
  • apps/webapp/app/routes/api.v1.runs.$runId.metadata.ts (1 hunks)
  • apps/webapp/app/routes/api.v1.tasks.batch.ts (1 hunks)
  • apps/webapp/app/services/metadata/updateMetadata.server.ts (1 hunks)
  • apps/webapp/app/services/routeBuilders/apiBuilder.server.ts (2 hunks)
  • packages/core/src/v3/index.ts (1 hunks)
  • packages/core/src/v3/runMetadata/index.ts (2 hunks)
  • packages/core/src/v3/runMetadata/manager.ts (5 hunks)
  • packages/core/src/v3/runMetadata/noopManager.ts (2 hunks)
  • packages/core/src/v3/runMetadata/types.ts (1 hunks)
  • packages/core/src/v3/schemas/api.ts (1 hunks)
  • packages/react-hooks/package.json (1 hunks)
  • packages/trigger-sdk/src/v3/metadata.ts (2 hunks)
  • references/v3-catalog/src/trigger/runMetadata.ts (3 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • .changeset/giant-mice-cheer.md
🔇 Additional comments (17)
apps/webapp/app/routes/api.v1.runs.$runId.metadata.ts (1)

1-29: Refactoring simplifies the API route and improves maintainability

The use of createActionApiRoute streamlines the route handling by integrating parameter validation, authentication, and method enforcement. This reduces code duplication and enhances readability.

apps/webapp/app/services/routeBuilders/apiBuilder.server.ts (1)

459-471: HTTP method validation enhances API security

The introduction of method validation ensures that only the specified HTTP methods are accepted, preventing unintended access and enhancing security.

packages/core/src/v3/schemas/api.ts (1)

672-729: New metadata operation schemas are well-structured and comprehensive

The addition of various RunMetadata*Operation schemas provides a flexible approach to handle metadata changes. The use of a discriminated union in RunMetadataChangeOperation ensures type safety and clear validation of operations.

packages/core/src/v3/runMetadata/types.ts (1)

4-11: LGTM! Well-structured metadata updater interface.

The RunMetadataUpdater interface provides a comprehensive set of methods for metadata manipulation with clear, consistent naming and proper typing.

references/v3-catalog/src/trigger/runMetadata.ts (2)

24-25: LGTM! Proper parent metadata update.

The increment operation correctly updates the parent's count of child tasks.


6-7: Verify the initial numberOfChildren value.

Setting numberOfChildren to 2 seems arbitrary. Consider:

  1. Starting from 0 and letting child tasks increment it
  2. Documenting why 2 is the initial value
✅ Verification successful

Initial numberOfChildren value is correct and intentional

The initial value of 2 is correct as it represents the expected number of child tasks that will be triggered in the execution flow:

  1. runMetadataChildTask is triggered directly from runMetadataTask
  2. runMetadataChildTask2 is triggered from within runMetadataChildTask

The subsequent increment operations in child tasks are used for tracking additional dynamic children, not for counting these two statically known child tasks.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for other occurrences of numberOfChildren to understand the expected initial value
rg "numberOfChildren" --type ts

Length of output: 331


Script:

#!/bin/bash
# Let's look at the full context of runMetadata.ts to understand the metadata structure and child task relationships
cat references/v3-catalog/src/trigger/runMetadata.ts

Length of output: 1789

packages/react-hooks/package.json (2)

77-77: LGTM! Added newline at EOF.

Following the best practice of ending files with a newline character.


Line range hint 1-1: Verify stream limit changes across codebase.

The PR aims to increase active streams from 2 to 5, but the core changes to MAXIMUM_ACTIVE_STREAMS and MAXIMUM_TOTAL_STREAMS constants aren't visible in the reviewed files.

✅ Verification successful

Stream limit changes are correctly implemented in manager.ts

The verification confirms that the PR's objective to increase active streams from 2 to 5 is properly implemented in packages/core/src/v3/runMetadata/manager.ts. The constants are defined as:

  • MAXIMUM_ACTIVE_STREAMS = 5 (increased from 2)
  • MAXIMUM_TOTAL_STREAMS = 10

These limits are consistently enforced with appropriate warning messages when exceeded.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for stream limit constants and their usage
rg -A 2 "MAXIMUM_ACTIVE_STREAMS|MAXIMUM_TOTAL_STREAMS" --type ts

# Search for any hardcoded stream limits
rg -A 2 "maximum.*stream|stream.*limit" --type ts -i

Length of output: 2549

packages/core/src/v3/index.ts (1)

15-15: LGTM! Export addition is well-placed.

The new export for runMetadata types is appropriately positioned near related exports.

packages/core/src/v3/runMetadata/index.ts (2)

84-90: LGTM! Implementation correctly delegates to manager.

The parent and root getters maintain consistent delegation pattern with other methods.


Line range hint 1-90: Verify stream limit changes.

The PR objective mentions increasing active streams from 2 to 5, but I don't see the stream limit configuration in these changes. Please ensure the stream limit changes are included in this PR.

✅ Verification successful

Let me search for stream limit configuration in the codebase.


Found stream limit configuration - PR objective verified

The stream limit configuration is defined in packages/core/src/v3/runMetadata/manager.ts:

  • MAXIMUM_ACTIVE_STREAMS = 5 (increased from 2 as per PR objective)
  • MAXIMUM_TOTAL_STREAMS = 10

The code includes proper limit checks and error handling:

  • Checks active streams limit: if (this.activeStreams.size >= MAXIMUM_ACTIVE_STREAMS)
  • Checks total streams limit: if (streams.length >= MAXIMUM_TOTAL_STREAMS)
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for stream limit configuration
rg -i "stream.*limit|MAX_.*STREAM" 

# Search for the number 2 or 5 near stream-related code
rg -B 3 -A 3 "(2|5).*stream|stream.*(2|5)"

Length of output: 53318


Script:

#!/bin/bash
# Search for stream limit configuration in core package
rg -i "MAX.*STREAM|STREAM.*LIMIT" packages/core/

# Search for stream configuration in manager class
ast-grep --pattern 'class RunMetadataManager {
  $$$
  stream($$$) {
    $$$
  }
  $$$
}'

Length of output: 1933

apps/webapp/app/routes/api.v1.tasks.batch.ts (2)

8-9: LGTM: Import statements are well-organized.

The new imports enhance the route's capabilities with authentication, logging, and error handling.

Also applies to: 12-12


Line range hint 124-157: LGTM: Response headers implementation is secure.

The JWT token generation follows security best practices with proper scoping and short-lived tokens.

packages/trigger-sdk/src/v3/metadata.ts (1)

10-11: LGTM: Metadata updaters are properly implemented.

The parent and root metadata updaters are well-typed and properly integrated into the metadata object.

Also applies to: 36-37

packages/core/src/v3/runMetadata/manager.ts (3)

20-21: LGTM: Well-implemented metadata operation queues.

The parent and root operation queues are properly implemented with type safety and clear operation types.

Also applies to: 31-75


347-364: LGTM: Efficient flush implementation.

The flush implementation efficiently handles queued operations and includes proper error handling.

Also applies to: 397-401


10-11: Verify impact of increased stream limits.

The increase in stream limits could impact system resources. Ensure the system can handle the increased load.

✅ Verification successful

Let me gather more information about how these streams are used and their impact.


Let me gather more information about how these streams are used in the codebase.


Based on the code analysis, I can now provide a final response about the stream limits change:

Stream limits are safely managed with proper resource controls

The implementation shows a robust stream management system with:

  • Active stream tracking via a Map data structure
  • Automatic cleanup of completed streams
  • Stream timeout handling (60s default)
  • Warning logs when limits are reached
  • Graceful handling of excess streams by ignoring new ones rather than failing

The increased limits (5 active, 10 total) are reasonable as the system:

  1. Automatically cleans up finished streams
  2. Has timeout protection against hanging streams
  3. Implements graceful degradation when limits are reached
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Check for stream-related performance monitoring or configuration
rg -A 3 "MAXIMUM_(ACTIVE|TOTAL)_STREAMS" 

Length of output: 1456


Script:

#!/bin/bash
# Check for stream usage and error handling
ast-grep --pattern 'class StandardMetadataManager {
  $$$
}'

# Check for any performance monitoring or resource usage related code
rg -B 2 -A 2 "performance|memory|resource|monitor" 

Length of output: 105671


Script:

#!/bin/bash
# Check for stream usage implementation and error handling
ast-grep --pattern 'class StandardMetadataManager {
  private activeStreams = new Set<string>();
  $$$
}'

# Check for any memory or resource monitoring related to streams
rg -B 2 -A 2 "activeStreams|totalStreams|stream.*memory|stream.*resource"

Length of output: 4129

references/v3-catalog/src/trigger/runMetadata.ts Outdated Show resolved Hide resolved
@ericallam ericallam changed the title Increase the number of active streams from 2 to 5 Add ability to update parent and root run metadata from children Dec 16, 2024
@ericallam ericallam closed this Dec 16, 2024
@ericallam ericallam reopened this Dec 18, 2024
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 8

🧹 Nitpick comments (29)
references/nextjs-realtime/src/trigger/csv.ts (3)

15-19: Implement retry/failover logic for network fetch issues.
Currently, a single failure to fetch the file leads to an immediate error throw. Consider adding optional retry or failover logic to improve the robustness of the CSV file retrieval process, especially if network hiccups or transient server issues occur.


25-52: Handle malformed CSV data more comprehensively.
Although the code logs row parsing errors, be wary that certain CSV anomalies (e.g., partial rows, extra columns, or ill-formatted data) could break the parsing logic entirely. Consider implementing a more robust approach that tracks invalid rows separately for review or reprocessing, instead of silently discarding them.


75-88: Validate row-level logic inside handleCSVRow.
Currently, the processing step simulates a random delay and logs rows. If you plan to perform business-critical logic (e.g., storing data in a database), consider adding robust validation, error handling, and retries. This ensures data consistency, especially under transient failures.

references/nextjs-realtime/src/app/csv/[id]/RealtimeCSVRun.tsx (2)

46-55: Provide structured error feedback for metadata parsing issues.
Right now, the user only sees “Failed to parse metadata.” You might want to expose or log additional details about the error, especially if the user is technical and needs more debug information.


75-93: Ensure progress computations for partial data are correct.
If the CSV is very large or the processing is paused/stuck, users might see incomplete or misleading progress. Consider how progress gets updated for partial or paused states.

references/nextjs-realtime/src/app/csv/page.tsx (1)

1-14: Consider informing the user of file size limits and handling partial uploads.
While the dropzone approach is user-friendly, large CSV files risk partial or failed uploads. Add user guidance or fallback measures in case of partial uploads, ensuring the UI can handle incomplete CSV data gracefully.

references/nextjs-realtime/src/components/ui/progress.tsx (1)

20-23: Consider adding ARIA attributes for better accessibility.

While Radix UI provides good accessibility defaults, consider adding aria-label or aria-describedby props to provide more context about what this progress bar represents.

 <ProgressPrimitive.Indicator
   className="h-full w-full flex-1 bg-primary transition-all"
   style={{ transform: `translateX(-${100 - (value || 0)}%)` }}
+  aria-label="Progress indicator"
 />
references/nextjs-realtime/src/trigger/schemas.ts (2)

3-18: Consider adding additional validation constraints for CSV fields.

While the basic type coercion is good, consider adding more specific validation rules:

  • Date field could use a date format validation
  • Numeric fields might benefit from min/max bounds
 export const CSVRow = z.object({
-  Date: z.string(),
+  Date: z.string().refine((date) => !isNaN(Date.parse(date)), {
+    message: "Invalid date format",
+  }),
   Impressions: z.coerce.number(),
   Likes: z.coerce.number(),
   // ... other fields
 });

28-33: Consider adding invariant validation for row counts.

The metadata schema could benefit from validation ensuring processedRows doesn't exceed totalRows.

 export const CSVUploadMetadataSchema = z.object({
   status: CSVStatus,
   totalRows: z.number().int().nonnegative().optional(),
   inProgressRows: z.number().int().nonnegative().optional(),
   processedRows: z.number().int().nonnegative().optional(),
-});
+}).refine(
+  (data) => {
+    if (data.totalRows != null && data.processedRows != null) {
+      return data.processedRows <= data.totalRows;
+    }
+    return true;
+  },
+  {
+    message: "Processed rows cannot exceed total rows",
+  }
+);
references/nextjs-realtime/src/components/ImageUploadButton.tsx (2)

55-78: Consider extracting common upload logic to reduce duplication.

The CSVUploadDropzone component shares significant code with ImageUploadDropzone. Consider creating a reusable FileUploadDropzone component to reduce duplication.

Here's a suggested refactor:

interface FileUploadProps {
  endpoint: "csvUploader" | "imageUploader";
  getRedirectUrl: (file: any) => string;
}

function FileUploadDropzone({ endpoint, getRedirectUrl }: FileUploadProps) {
  const router = useRouter();
  
  return (
    <UploadDropzone
      endpoint={endpoint}
      onClientUploadComplete={(res) => {
        console.log("Files: ", res);
        const firstFile = res[0];
        router.push(getRedirectUrl(firstFile));
      }}
      onUploadError={(error: Error) => {
        console.error(`ERROR! ${error.message}`);
      }}
      className="border-gray-600"
    />
  );
}

// Usage:
export function CSVUploadDropzone() {
  return (
    <FileUploadDropzone
      endpoint="csvUploader"
      getRedirectUrl={(file) => 
        `/csv/${file.serverData.id}?publicAccessToken=${file.serverData.publicAccessToken}`
      }
    />
  );
}

59-76: Enhance user experience with feedback states.

The upload component could benefit from additional user feedback:

  1. Loading state during upload
  2. File type validation feedback
  3. Progress indicator for large files

Would you like me to provide an example implementation with these improvements?

references/nextjs-realtime/src/app/api/uploadthing/core.ts (1)

53-53: Consider adding error handling for task triggering.

The task trigger should include error handling to manage potential failures gracefully.

try {
  const handle = await tasks.trigger<typeof handleCSVUpload>("handle-csv-upload", file);
  return handle;
} catch (error) {
  console.error("Failed to trigger CSV upload handling:", error);
  throw new UploadThingError("Failed to process CSV file");
}
packages/core/src/v3/runMetadata/operations.ts (2)

5-8: Establish consistent rules for unapplied operations.
Currently, unapplied operations are collected without a clear explanation. Consider documenting the reasons or adding descriptive properties to each operation about why it was not applied.


70-96: Handle potential "remove" operation failures more explicitly.
When removal operations fail because the target is not an array, you currently push to unapplied operations without feedback. Consider adding logging or handling logic to indicate why it could not remove the item (e.g., by providing an error message).

apps/webapp/app/services/metadata/updateMetadata.server.ts (3)

40-74: Potential infinite loop in flushing logic.
The while(true) + sleep approach is workable, but ensure that an unhandled rejection or severe error doesn’t leave this fiber running forever. The catchAll block is good, but adding an escape condition might be safer for graceful shutdown.


193-215: Efficiently merging consecutive "set" operations.
Your implementation merges set operations by key. For large sets of changes, consider grouping and applying them in bulk, or implementing a last-write-wins approach.


357-394: Direct updates bypass version checks.
#updateRunMetadataDirectly does not use metadataVersion to ensure concurrency. If concurrency conflicts are possible, consider aligning it with the optimistic locking approach used in #updateRunMetadataWithOperations.

apps/webapp/app/v3/services/completeAttempt.server.ts (1)

159-160: Duplicate metadata usage.
Similar to lines 98-99; if the same metadata flows repeatedly, ensure no duplication or potential PII leakage.

.changeset/slow-deers-collect.md (1)

8-8: Enhance changeset description clarity

The description could be more detailed to better explain the feature's impact and usage. Consider expanding it to:

"Adding the ability to update parent and root run metadata from child runs/tasks, enabling better state management and data flow between related runs."

🧰 Tools
🪛 LanguageTool

[uncategorized] ~8-~8: You might be missing the article “the” here.
Context: ... "@trigger.dev/core": patch --- Adding ability to update parent run metadata from chil...

(AI_EN_LECTOR_MISSING_DETERMINER_THE)

apps/webapp/app/routes/api.v1.runs.$runId.metadata.ts (1)

18-25: Consider enhancing error handling

While the current error handling covers the "not found" case, consider handling additional error scenarios:

  • Validation errors from the service
  • Database errors
  • Rate limiting or quota exceeded scenarios

Example enhancement:

 async ({ authentication, body, params }) => {
   const result = await updateMetadataService.call(authentication.environment, params.runId, body);

   if (!result) {
     return json({ error: "Task Run not found" }, { status: 404 });
   }
+  if (result.error) {
+    return json({ error: result.error.message }, { status: result.error.status || 400 });
+  }

   return json(result, { status: 200 });
 }
packages/core/src/v3/runMetadata/types.ts (1)

19-29: Consider documenting the hierarchical metadata behavior.

The interface effectively enables hierarchical metadata management through parent and root getters. However, it would be beneficial to add JSDoc comments explaining:

  • The relationship between parent, root, and current metadata
  • The propagation behavior of metadata updates
  • Any potential limitations or edge cases
apps/webapp/app/v3/services/resumeDependentParents.server.ts (1)

Line range hint 199-213: Duplicate error message in batch dependency handling.

The error message and logging in batchRunDependency method is identical to singleRunDependency. Consider extracting to a shared helper method.

Consider refactoring like this:

+ private logDependencyAttemptError(dependency: Dependency) {
+   logger.error(
+     "ResumeDependentParentsService: dependency child attempt not found",
+     { dependency }
+   );
+   return {
+     success: false,
+     error: `Dependency child attempt not found for run ${dependency.taskRunId}`,
+   };
+ }

  async #batchRunDependency(dependency: Dependency): Promise<Output> {
    // ...
    if (!lastAttempt) {
-     logger.error(
-       "ResumeDependentParentsService.singleRunDependency(): dependency child attempt not found",
-       { dependency }
-     );
-     return {
-       success: false,
-       error: `Dependency child attempt not found for run ${dependency.taskRunId}`,
-     };
+     return this.logDependencyAttemptError(dependency);
    }
    // ...
  }
packages/core/src/v3/apiClient/stream.ts (1)

21-21: LGTM! Consider adding delay parameter validation.

The implementation of delayed stream termination is well-structured and thread-safe. The abort signal check prevents redundant operations.

Consider adding validation for the delay parameter:

 stop: (delay?: number) => {
+  if (delay !== undefined && delay < 0) {
+    throw new Error("Delay must be a non-negative number");
+  }
   if (delay) {
     setTimeout(() => {

Also applies to: 67-76

packages/trigger-sdk/src/v3/metadata.ts (2)

10-11: Well-structured metadata management implementation!

The changes introduce a clean hierarchical metadata management system with:

  • Parent and root metadata updaters for vertical propagation
  • Consolidated updater object for better maintainability
  • Chainable API design for improved usability

The hierarchical design allows for efficient metadata propagation while maintaining clear boundaries between different scopes (current/parent/root).

Also applies to: 23-31, 33-42


87-89: Consider enhancing error handling in metadata operations.

While the method chaining implementation is clean, consider wrapping the runMetadata calls in try-catch blocks to handle potential errors gracefully.

Example implementation:

 function setMetadataKey(key: string, value: DeserializedJson) {
+  try {
     runMetadata.set(key, value);
+  } catch (error) {
+    console.error(`Failed to set metadata key ${key}:`, error);
+    throw error;
+  }
   return metadataUpdater;
 }

Also applies to: 101-102, 154-155, 171-172, 187-188

apps/webapp/package.json (1)

121-121: Consider using exact version for effect dependency.

While the current ^3.11.7 allows for patch and minor updates, consider pinning to an exact version for better dependency stability.

-    "effect": "^3.11.7",
+    "effect": "3.11.7",
packages/core/src/v3/apiClient/runStream.ts (1)

126-126: Consider making the timeout duration configurable.

The 30-second timeout for stopping the stream is hardcoded. Consider making this configurable through options or environment variables for flexibility in different environments.

-    stopRunShapeStream: () => runStreamInstance.stop(30 * 1000),
+    stopRunShapeStream: () => runStreamInstance.stop(options?.streamStopTimeoutMs ?? 30 * 1000),
packages/core/src/v3/schemas/common.ts (1)

4-66: Consider adding validation for edge cases in metadata operations.

While the schema definitions are well-structured, consider adding:

  1. Maximum length validation for string keys
  2. Size limits for metadata values
  3. Depth limits for nested objects in the value field

This would prevent potential abuse or performance issues with extremely large metadata.

packages/core/src/v3/runMetadata/manager.ts (1)

Line range hint 200-259: Consider adding retry mechanism for stream creation failures.

While the error handling is good, consider adding a retry mechanism for transient failures during stream creation.

Example implementation:

private async doStream<T>(
  key: string,
  value: AsyncIterable<T> | ReadableStream<T>,
  updater: RunMetadataUpdater = this,
  signal?: AbortSignal
): Promise<AsyncIterable<T>> {
+ const maxRetries = 3;
+ const retryDelay = 1000;
+ 
+ for (let attempt = 1; attempt <= maxRetries; attempt++) {
    try {
      const streamInstance = new MetadataStream({/*...*/});
      // ... existing implementation
      return streamInstance;
    } catch (error) {
+     if (attempt === maxRetries) {
        updater.remove(`$$streams`, key);
        throw error;
+     }
+     await new Promise(resolve => setTimeout(resolve, retryDelay * attempt));
    }
+ }
}
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 64ce918 and c91c6a8.

⛔ Files ignored due to path filters (1)
  • pnpm-lock.yaml is excluded by !**/pnpm-lock.yaml
📒 Files selected for processing (42)
  • .changeset/giant-mice-cheer.md (1 hunks)
  • .changeset/slow-deers-collect.md (1 hunks)
  • apps/webapp/app/env.server.ts (1 hunks)
  • apps/webapp/app/routes/api.v1.runs.$runId.metadata.ts (1 hunks)
  • apps/webapp/app/routes/api.v1.tasks.batch.ts (1 hunks)
  • apps/webapp/app/services/metadata/updateMetadata.server.ts (1 hunks)
  • apps/webapp/app/services/routeBuilders/apiBuilder.server.ts (3 hunks)
  • apps/webapp/app/v3/services/completeAttempt.server.ts (4 hunks)
  • apps/webapp/app/v3/services/finalizeTaskRun.server.ts (5 hunks)
  • apps/webapp/app/v3/services/resumeDependentParents.server.ts (2 hunks)
  • apps/webapp/package.json (1 hunks)
  • internal-packages/database/prisma/migrations/20241216212038_add_metadata_version/migration.sql (1 hunks)
  • internal-packages/database/prisma/schema.prisma (1 hunks)
  • packages/cli-v3/src/entryPoints/deploy-run-worker.ts (3 hunks)
  • packages/cli-v3/src/entryPoints/dev-run-worker.ts (2 hunks)
  • packages/core/src/v3/apiClient/index.ts (1 hunks)
  • packages/core/src/v3/apiClient/runStream.ts (2 hunks)
  • packages/core/src/v3/apiClient/stream.ts (2 hunks)
  • packages/core/src/v3/index.ts (1 hunks)
  • packages/core/src/v3/run-metadata-api.ts (1 hunks)
  • packages/core/src/v3/runMetadata/index.ts (3 hunks)
  • packages/core/src/v3/runMetadata/manager.ts (6 hunks)
  • packages/core/src/v3/runMetadata/noopManager.ts (2 hunks)
  • packages/core/src/v3/runMetadata/operations.ts (1 hunks)
  • packages/core/src/v3/runMetadata/types.ts (1 hunks)
  • packages/core/src/v3/runtime/devRuntimeManager.ts (4 hunks)
  • packages/core/src/v3/schemas/api.ts (2 hunks)
  • packages/core/src/v3/schemas/common.ts (3 hunks)
  • packages/react-hooks/package.json (1 hunks)
  • packages/react-hooks/src/hooks/useRealtime.ts (5 hunks)
  • packages/trigger-sdk/src/v3/metadata.ts (10 hunks)
  • packages/trigger-sdk/src/v3/runs.ts (3 hunks)
  • references/nextjs-realtime/package.json (1 hunks)
  • references/nextjs-realtime/src/app/api/uploadthing/core.ts (2 hunks)
  • references/nextjs-realtime/src/app/csv/[id]/RealtimeCSVRun.tsx (1 hunks)
  • references/nextjs-realtime/src/app/csv/[id]/page.tsx (1 hunks)
  • references/nextjs-realtime/src/app/csv/page.tsx (1 hunks)
  • references/nextjs-realtime/src/components/ImageUploadButton.tsx (1 hunks)
  • references/nextjs-realtime/src/components/ui/progress.tsx (1 hunks)
  • references/nextjs-realtime/src/trigger/csv.ts (1 hunks)
  • references/nextjs-realtime/src/trigger/schemas.ts (1 hunks)
  • references/v3-catalog/src/trigger/runMetadata.ts (3 hunks)
✅ Files skipped from review due to trivial changes (1)
  • internal-packages/database/prisma/migrations/20241216212038_add_metadata_version/migration.sql
🚧 Files skipped from review as they are similar to previous changes (4)
  • packages/core/src/v3/index.ts
  • .changeset/giant-mice-cheer.md
  • references/v3-catalog/src/trigger/runMetadata.ts
  • apps/webapp/app/routes/api.v1.tasks.batch.ts
🧰 Additional context used
🪛 LanguageTool
.changeset/slow-deers-collect.md

[uncategorized] ~8-~8: You might be missing the article “the” here.
Context: ... "@trigger.dev/core": patch --- Adding ability to update parent run metadata from chil...

(AI_EN_LECTOR_MISSING_DETERMINER_THE)

🔇 Additional comments (44)
references/nextjs-realtime/src/trigger/csv.ts (1)

56-58: Confirm concurrency or rate limits for batch processing.
When calling batch.triggerAndWait, ensure that concurrency settings are well-defined or tested for large CSV files. Without proper concurrency or rate-limiting, system resources could be strained.

references/nextjs-realtime/src/app/csv/[id]/RealtimeCSVRun.tsx (1)

21-33: Guard the environment variable usage.
The code references NEXT_PUBLIC_TRIGGER_API_URL. If this environment variable is missing or malformed, requests may fail silently. Consider adding fallback logic or user-friendly error messages in case it’s undefined.

references/nextjs-realtime/src/app/csv/[id]/page.tsx (1)

11-13: Prevent potential accidental 404 edge cases.
The code calls notFound() if publicAccessToken is not a string. If the token is an empty string (""), it still passes this check but may be invalid. Consider verifying that it's non-empty as well.

references/nextjs-realtime/src/components/ui/progress.tsx (1)

8-25: LGTM! Well-structured Progress component implementation.

The component follows React best practices with proper TypeScript types, ref forwarding, and clean composition of Radix UI primitives.

references/nextjs-realtime/package.json (1)

16-16: Verify dependency versions and security implications.

The new dependencies look good, but let's verify the latest stable versions and any security advisories.

Also applies to: 19-19

✅ Verification successful

Dependencies are up-to-date and secure

The package versions being used are the latest stable releases:

The only known vulnerability in fast-csv affects versions < 4.3.6, while we're using version 5.0.2, so we're not affected.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Check for latest versions and security advisories for new dependencies

echo "Checking @fast-csv/parse..."
curl -s https://registry.npmjs.org/@fast-csv/parse/latest | jq '.version'

echo "Checking @radix-ui/react-progress..."
curl -s https://registry.npmjs.org/@radix-ui/react-progress/latest | jq '.version'

# Check for known vulnerabilities
gh api graphql -f query='
{
  securityVulnerabilities(first: 5, ecosystem: NPM, package: "fast-csv") {
    nodes {
      advisory {
        summary
        severity
        publishedAt
      }
      vulnerableVersionRange
    }
  }
}
'

Length of output: 753

packages/core/src/v3/runMetadata/operations.ts (3)

18-28: Guard array manipulations that rely on keys starting with '$.'.
Ensure that all potential JSON key paths that do not begin with '$.' are properly handled. An inadvertent key mismatch can lead to incorrect data in the final metadata object.


114-118: Overwriting the entire metadata object.
When updating all metadata with operation.value, verify that downstream code doesn’t expect existing keys to be retained. This operation entirely replaces the prior metadata object.


16-16: Check structuredClone compatibility if Node.js versions vary.
If you expect this code to run in older Node.js environments, consider a polyfill or an alternative method for deep-cloning (e.g., JSON.parse(JSON.stringify(...))) if structuredClone is unavailable.

✅ Verification successful

Let me check the Node.js version requirements for this project.


Based on the package.json files, I can see that the minimum Node.js version requirement across the project is 18.20.0 (specified in multiple packages). Since structuredClone was introduced in Node.js 17.0.0, it's safe to use in this codebase.

No compatibility concerns with structuredClone usage.
The codebase requires Node.js >=18.20.0 which fully supports the structuredClone API, making it safe to use without any polyfills or alternatives.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Searching where structuredClone is used or referenced in the codebase
rg "structuredClone"

Length of output: 245


Script:

#!/bin/bash
# Check package.json files for Node.js version requirements
fd package.json --exec cat {}

Length of output: 62681

apps/webapp/app/services/metadata/updateMetadata.server.ts (2)

217-278: Validate final run status before applying parent or root operations.
In your call method, you skip updating the parent or root if the run is not final. Ensure that’s consistent with the overall logic, and confirm no partial updates cause side effects.


296-355: Extended retries logic.
In your #updateRunMetadataWithOperations, you retry up to MAX_RETRIES times, then buffer operations again. This approach is solid. Monitor logs or metrics to ensure repeated unsuccessful retries don’t cause indefinite loops.

apps/webapp/app/v3/services/completeAttempt.server.ts (3)

15-16: Check interplay between TaskRun and MAX_TASK_RUN_ATTEMPTS.
Ensure that any future changes in MAX_TASK_RUN_ATTEMPTS consistently reflect in code handling run attempts. Overriding this constant in different environments might cause unexpected behavior.


30-30: Integration with updateMetadataService
Good usage of the new service. Ensure the environment variable is always defined to prevent runtime errors.


314-315: Metadata usage in #completeAttemptFailed.
Confirm that storing completion metadata during failures does not inadvertently overwrite essential debug information or lose it if the run is retried.

internal-packages/database/prisma/schema.prisma (1)

1781-1783: Addition of metadataVersion to TaskRun.
This is a crucial field for conflict resolution and migration strategies. Ensure that client code properly increments it, as done in the optimistic locking approach.

packages/core/src/v3/run-metadata-api.ts (1)

Line range hint 1-8: Well-structured module with optimization consideration

Good implementation of:

  • Singleton pattern for API instance
  • Module organization with clear re-exports
  • Tree-shaking optimization through separate file structure
apps/webapp/app/routes/api.v1.runs.$runId.metadata.ts (2)

13-16: Well-structured API route configuration

Good implementation with:

  • Schema validation for params and body
  • Explicit content size limit
  • Clear HTTP method specification

15-15: Verify content size limit implications

The 1MB limit for metadata updates should be verified against typical usage patterns.

✅ Verification successful

Let me gather more information about the metadata usage and size patterns.


Let me check one more aspect regarding the metadata validation and payload handling.


1MB metadata size limit is appropriate

The 1MB limit for metadata updates is reasonable based on the following findings:

  • The metadata API endpoint is designed for storing run-time metadata and status updates, not large payloads
  • The codebase implements proper validation and error handling for metadata updates
  • The system prevents metadata updates for completed runs, indicating metadata is meant for tracking execution state
  • Other similar size limits in the codebase (like task payloads) follow comparable patterns
  • The metadata operations are handled through a batched flush mechanism, suggesting the system is optimized for smaller, frequent updates
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for existing metadata payloads to verify size appropriateness
rg -g '*.{ts,js}' -B 2 -A 2 'metadata.*payload|payload.*metadata'

Length of output: 1288


Script:

#!/bin/bash
# Search for metadata-related type definitions and schemas
ast-grep --pattern 'type $_ = { $$$metadata$$$}'

# Look for metadata-related API endpoints and handlers
rg -g '*.{ts,js}' -B 3 -A 3 'metadata.*update|update.*metadata'

# Search for any size limits or validation related to metadata
rg -g '*.{ts,js}' 'metadata.*size|size.*metadata|metadata.*limit|limit.*metadata'

Length of output: 10351


Script:

#!/bin/bash
# Look for metadata validation schemas and payload size handling
rg -g '*.{ts,js}' -B 3 -A 3 'UpdateMetadataRequestBody|metadata.*validate'

# Check for any existing size-related constants or configurations
rg -g '*.{ts,js}' 'maxContentLength|MAX_.*SIZE|SIZE_LIMIT'

Length of output: 9592

packages/core/src/v3/runMetadata/types.ts (1)

4-11: LGTM! Well-designed interface for metadata manipulation.

The RunMetadataUpdater interface provides a comprehensive and chainable API for metadata operations. The method names are concise and follow common conventions (e.g., set/del pattern).

packages/react-hooks/package.json (1)

Line range hint 1-77: LGTM! Proper dual ESM/CommonJS module configuration.

The exports field correctly defines both ESM and CommonJS entry points, ensuring compatibility across different module systems. The configuration follows Node.js best practices for dual package hazard handling.

packages/core/src/v3/runMetadata/index.ts (2)

44-76: LGTM! Clean implementation of metadata operations.

The implementation follows a consistent pattern:

  • Proper delegation to the manager
  • Method chaining support via return this
  • Consistent error propagation

95-101: Verify parent/root metadata access patterns.

The implementation of parent and root metadata access looks correct, but we should verify the usage patterns to ensure proper metadata isolation.

✅ Verification successful

Let me gather more context about the metadata implementation to ensure proper isolation.


Let me check one more aspect to understand the metadata hierarchy implementation better.


Based on the gathered evidence, I can now provide the final response:

Parent/root metadata access patterns are properly isolated and used correctly.

The implementation shows proper metadata isolation with:

  • Parent/root metadata modifications are limited to safe operations (increment, append)
  • Usage is consistent across the codebase, primarily for tracking child relationships and counts
  • No direct property access or unsafe operations detected on parent/root metadata
  • Clear hierarchical structure maintained through the manager implementation
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for usage patterns of parent/root metadata access
rg -A 5 '\.(parent|root)\.(set|del|append|remove|increment|decrement|update)' --type ts

Length of output: 1393


Script:

#!/bin/bash
# Search for the RunMetadataUpdater class implementation and usage
ast-grep --pattern 'class RunMetadataUpdater {
  $$$
}'

# Also check for any potential metadata leaks or direct access
rg -A 3 'metadata\.(current|get|getKey)' --type ts

Length of output: 2788


Script:

#!/bin/bash
# Search for metadata manager implementation to understand isolation
ast-grep --pattern 'class $_Manager {
  $$$
  parent {
    $$$
  }
  root {
    $$$
  }
  $$$
}'

# Check for any direct property access on parent/root
rg '\.(parent|root)\.' --type ts -A 2 | rg -v '(set|del|append|remove|increment|decrement|update|get|current)'

Length of output: 14014

packages/core/src/v3/runMetadata/noopManager.ts (1)

Line range hint 6-42: LGTM! Method signature changes improve API ergonomics.

The method signature changes from void to this enable method chaining, which improves the API's usability.

apps/webapp/app/v3/services/resumeDependentParents.server.ts (2)

24-51: LGTM! Well-structured field selection.

The taskRunDependencySelect constant provides clear and type-safe field selection, improving maintainability and reducing potential runtime errors.


83-89: LGTM! Early return improves code flow.

Moving the DEVELOPMENT environment check earlier in the flow improves code readability and efficiency.

packages/trigger-sdk/src/v3/metadata.ts (1)

210-227: LGTM! Well-documented refresh implementation.

The refreshMetadata function follows consistent patterns and includes proper request options handling.

packages/cli-v3/src/entryPoints/dev-run-worker.ts (1)

333-333: LGTM: Metadata handling in task completion responses

The changes correctly include metadata in both success and error completion responses, using the appropriate metadata manager API.

Also applies to: 358-358

apps/webapp/app/env.server.ts (2)

248-248: LGTM: Reasonable flush interval for batch operations

The 1-second default interval for flushing batch metadata operations is a good balance between reducing database load and maintaining data freshness.


241-241: Verify impact of increased metadata size limit

The metadata size limit has been significantly increased from 4KB to 256KB. While this allows for more detailed metadata, we should verify:

  1. The impact on memory usage when many runs are active
  2. Any potential performance implications for the database
  3. Network bandwidth considerations for real-time updates
✅ Verification successful

Let me gather more specific information about metadata usage.


Let me try one more search with corrected file type flags.


Based on the codebase analysis, I can now provide a final response:

The increased metadata size limit is justified and safe

The increase from 4KB to 256KB is well-considered for the following reasons:

  1. The metadata is primarily used for tracking task run states, progress, and streaming operations, as evidenced by the extensive metadata management system in the codebase.
  2. The metadata is handled efficiently with:
    • Batched updates through BatchMetadataOperationsFlushInterval
    • Incremental updates using operations like append, increment, and remove
    • Proper cleanup of stream metadata
  3. Real-world usage patterns in the codebase show metadata being used for:
    • Task progress tracking
    • Stream management for AI/ML operations
    • CSV processing status
    • Run state management

The implementation includes safeguards like periodic flushing and proper error handling to manage the metadata efficiently.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Check current metadata sizes to validate the new limit

# Search for metadata-related database operations
rg -A 5 'metadata.*update|update.*metadata' --type sql

# Look for any existing large metadata payloads
ast-grep --pattern 'metadata = $_'

Length of output: 3791


Script:

#!/bin/bash
# Search for metadata-related code patterns
rg -l 'metadata' --type typescript

# Look for metadata size patterns and current usage
rg -A 3 'metadata.*size|size.*metadata' --type typescript

# Search for metadata storage/update patterns
ast-grep --pattern 'metadata: $_'

Length of output: 952


Script:

#!/bin/bash
# Search for metadata usage patterns in TypeScript/JavaScript files
rg -A 3 'metadata' -t js -t ts

# Look for metadata size validation or checks
rg -A 3 'metadata.*size|size.*metadata' -t js -t ts

# Search for metadata-related tests
fd -e test.ts -e test.js -x rg -l 'metadata' {}

Length of output: 73988

packages/trigger-sdk/src/v3/runs.ts (1)

Line range hint 338-391: LGTM: Well-implemented subscription control for metadata updates

The changes provide a clean and well-documented way to control subscription lifecycle, particularly useful for updating metadata from child runs. The implementation:

  • Maintains backward compatibility with the default stopOnCompletion: true
  • Provides clear documentation of the new option
  • Correctly maps to the API client's interface
packages/cli-v3/src/entryPoints/deploy-run-worker.ts (1)

366-366: LGTM! Consistent metadata handling across all completion paths.

The changes ensure that run metadata is properly included in all task completion scenarios (normal completion, execution completion, and error cases). This implementation aligns with the PR objective of enabling metadata updates from child runs.

Also applies to: 391-391, 415-415

packages/core/src/v3/apiClient/runStream.ts (1)

Line range hint 327-336: LGTM! Improved run completion logic.

The change to use finishedAt for determining run completion is more accurate and aligns with the PR objective by allowing metadata updates to continue until the run is truly finished.

packages/react-hooks/src/hooks/useRealtime.ts (2)

23-31: LGTM! Well-documented option for controlling subscription behavior.

The stopOnCompletion option is well-documented with a clear explanation of its purpose and default value. The documentation effectively communicates the use case for child run metadata updates.


102-108: LGTM! Consistent implementation of stopOnCompletion across functions.

The stopOnCompletion option is consistently implemented across processRealtimeRun and processRealtimeRunWithStreams, with proper default values and type handling.

Also applies to: 262-262, 640-646

apps/webapp/app/services/routeBuilders/apiBuilder.server.ts (3)

387-387: LGTM! Good addition of HTTP method restriction.

The addition of the method property to ApiKeyActionRouteBuilderOptions enhances API security by allowing explicit HTTP method restrictions.


459-470: LGTM! Well-implemented method validation.

The method validation is properly implemented with:

  • Correct status code (405) for method not allowed
  • Proper Allow header in the response
  • Consistent error response format

631-641: LGTM! Enhanced error logging.

The error logging improvements are valuable:

  • Captures detailed error information including stack traces
  • Includes request URL for better context
  • Properly handles both Error objects and other error types
packages/core/src/v3/schemas/api.ts (1)

677-677: LGTM! Clean schema update.

The UpdateMetadataRequestBody schema is properly updated to use FlushedRunMetadata, maintaining consistency with the new metadata handling system.

packages/core/src/v3/apiClient/index.ts (2)

626-636: LGTM! Well-implemented metadata retrieval method.

The getRunMetadata method is properly implemented with:

  • Correct HTTP method (GET)
  • Proper error handling through zodfetch
  • Consistent with API patterns

638-644: LGTM! Good enhancement to subscription control.

The addition of closeOnComplete option:

  • Maintains backward compatibility with default true
  • Provides more flexibility in subscription management
  • Well-documented through types
apps/webapp/app/v3/services/finalizeTaskRun.server.ts (1)

73-89: LGTM! Robust error handling for metadata updates.

The implementation properly handles metadata updates with:

  • Non-blocking execution
  • Comprehensive error logging
  • Proper error sanitization
packages/core/src/v3/schemas/common.ts (1)

322-322: LGTM! Consistent metadata support in execution results.

The metadata field is properly added to both success and failure execution results.

Also applies to: 335-335

packages/core/src/v3/runMetadata/manager.ts (3)

113-125: LGTM! Efficient operation queueing with optimizations.

The implementation includes:

  • Duplicate operation prevention
  • Operation validation before queueing
  • Proper state management

305-335: LGTM! Well-implemented flush mechanism with proper synchronization.

The flush implementation:

  • Prevents concurrent flushes
  • Efficiently batches operations
  • Properly clears queues after flush

11-12: Verify the impact of increased stream limits.

The increase in stream limits could impact system resources:

  • Active streams increased from 2 to 5
  • Total streams increased from 5 to 10
✅ Verification successful

Stream limit increase appears safe and well-managed

The increased stream limits are used with proper error handling and clear messaging:

  • Stream usage is primarily for handling OpenAI completions, fetch responses, and logs
  • The manager enforces limits with descriptive error messages when exceeded
  • Stream handling is isolated to specific documented use cases in the realtime features
  • No evidence of resource-intensive or unbounded stream usage
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Check for stream usage patterns in the codebase
rg -A 5 "stream\(" --type typescript

Length of output: 73


Script:

#!/bin/bash
# Search for stream-related patterns in all files
rg -A 5 "stream\(" 

# Also search for stream usage with different casing
rg -i "createstream|newstream" 

# Look for stream-related class/method definitions
ast-grep --pattern 'class $_ {
  $$$
  stream($$$) {
    $$$
  }
  $$$
}'

# Search for stream limit constant usage
rg "MAXIMUM_(ACTIVE|TOTAL)_STREAMS"

Length of output: 7094

Comment on lines +27 to +39
export class UpdateMetadataService extends BaseService {
private _bufferedOperations: Map<string, BufferedRunMetadataChangeOperation[]> = new Map();
private _flushFiber: RuntimeFiber<void> | null = null;

constructor(
protected readonly _prisma: PrismaClientOrTransaction = prisma,
private readonly flushIntervalMs: number = 5000
) {
super();

this._startFlushing();
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Consider concurrency control for buffered operations.
The in-memory Map might grow if flush intervals are large or operations surge. Think about queue backpressure or a maximum buffer size to prevent potential memory exhaustion.

Comment on lines 91 to 93
refresh(requestOptions?: ApiRequestOptions): Promise<void> {
return this.#getManager().flush(requestOptions);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Fix incorrect implementation of refresh method.

The refresh method is calling flush instead of refresh on the manager. This appears to be a copy-paste error that could lead to unexpected behavior.

  refresh(requestOptions?: ApiRequestOptions): Promise<void> {
-   return this.#getManager().flush(requestOptions);
+   return this.#getManager().refresh(requestOptions);
  }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
refresh(requestOptions?: ApiRequestOptions): Promise<void> {
return this.#getManager().flush(requestOptions);
}
refresh(requestOptions?: ApiRequestOptions): Promise<void> {
return this.#getManager().refresh(requestOptions);
}

Comment on lines 62 to 78
get root(): RunMetadataUpdater {
return {
append: () => this.parent,
set: () => this.parent,
del: () => this.parent,
increment: () => this.parent,
decrement: () => this.parent,
remove: () => this.parent,
stream: () =>
Promise.resolve({
[Symbol.asyncIterator]: () => ({
next: () => Promise.resolve({ done: true, value: undefined }),
}),
}),
update: () => this.parent,
};
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Fix similar infinite recursion issue in root getter.

The root getter has the same recursive reference issue as the parent getter.

Apply this fix:

  get root(): RunMetadataUpdater {
    return {
-     append: () => this.parent,
-     set: () => this.parent,
-     del: () => this.parent,
-     increment: () => this.parent,
-     decrement: () => this.parent,
-     remove: () => this.parent,
+     append: () => this,
+     set: () => this,
+     del: () => this,
+     increment: () => this,
+     decrement: () => this,
+     remove: () => this,
      stream: () =>
        Promise.resolve({
          [Symbol.asyncIterator]: () => ({
            next: () => Promise.resolve({ done: true, value: undefined }),
          }),
        }),
-     update: () => this.parent,
+     update: () => this,
    };
  }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
get root(): RunMetadataUpdater {
return {
append: () => this.parent,
set: () => this.parent,
del: () => this.parent,
increment: () => this.parent,
decrement: () => this.parent,
remove: () => this.parent,
stream: () =>
Promise.resolve({
[Symbol.asyncIterator]: () => ({
next: () => Promise.resolve({ done: true, value: undefined }),
}),
}),
update: () => this.parent,
};
}
get root(): RunMetadataUpdater {
return {
append: () => this,
set: () => this,
del: () => this,
increment: () => this,
decrement: () => this,
remove: () => this,
stream: () =>
Promise.resolve({
[Symbol.asyncIterator]: () => ({
next: () => Promise.resolve({ done: true, value: undefined }),
}),
}),
update: () => this,
};
}

Comment on lines +44 to +60
get parent(): RunMetadataUpdater {
return {
append: () => this.parent,
set: () => this.parent,
del: () => this.parent,
increment: () => this.parent,
decrement: () => this.parent,
remove: () => this.parent,
stream: () =>
Promise.resolve({
[Symbol.asyncIterator]: () => ({
next: () => Promise.resolve({ done: true, value: undefined }),
}),
}),
update: () => this.parent,
};
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Fix potential infinite recursion in parent getter.

The parent getter has recursive references that could cause infinite loops. Each method returns this.parent which would trigger the getter again.

Apply this fix:

  get parent(): RunMetadataUpdater {
    return {
-     append: () => this.parent,
-     set: () => this.parent,
-     del: () => this.parent,
-     increment: () => this.parent,
-     decrement: () => this.parent,
-     remove: () => this.parent,
+     append: () => this,
+     set: () => this,
+     del: () => this,
+     increment: () => this,
+     decrement: () => this,
+     remove: () => this,
      stream: () =>
        Promise.resolve({
          [Symbol.asyncIterator]: () => ({
            next: () => Promise.resolve({ done: true, value: undefined }),
          }),
        }),
-     update: () => this.parent,
+     update: () => this,
    };
  }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
get parent(): RunMetadataUpdater {
return {
append: () => this.parent,
set: () => this.parent,
del: () => this.parent,
increment: () => this.parent,
decrement: () => this.parent,
remove: () => this.parent,
stream: () =>
Promise.resolve({
[Symbol.asyncIterator]: () => ({
next: () => Promise.resolve({ done: true, value: undefined }),
}),
}),
update: () => this.parent,
};
}
get parent(): RunMetadataUpdater {
return {
append: () => this,
set: () => this,
del: () => this,
increment: () => this,
decrement: () => this,
remove: () => this,
stream: () =>
Promise.resolve({
[Symbol.asyncIterator]: () => ({
next: () => Promise.resolve({ done: true, value: undefined }),
}),
}),
update: () => this,
};
}

Comment on lines +102 to +106
async #tryFlushMetadata() {
try {
await runMetadata.flush();
} catch (err) {}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Improve error handling in metadata flush.

The empty catch block silently swallows errors, which could hide important issues. Consider adding error logging.

Apply this improvement:

  async #tryFlushMetadata() {
    try {
      await runMetadata.flush();
-   } catch (err) {}
+   } catch (err) {
+     console.warn("Failed to flush metadata:", err);
+   }
  }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
async #tryFlushMetadata() {
try {
await runMetadata.flush();
} catch (err) {}
}
async #tryFlushMetadata() {
try {
await runMetadata.flush();
} catch (err) {
console.warn("Failed to flush metadata:", err);
}
}

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

🧹 Nitpick comments (6)
references/nextjs-realtime/src/trigger/csv.ts (3)

25-52: Consider streaming approach for large CSV files.

Loading the entire file into memory could cause issues with large files. Consider using a streaming approach with batched processing.

Consider implementing a chunked processing approach where rows are processed in smaller batches as they're parsed, rather than collecting all rows first.


37-39: Track parsing failures in metadata.

While parsing failures are logged, they're not tracked in metadata, making it harder to monitor the overall health of the upload process.

         } else {
           logger.warn("Failed to parse row", { row, errors: parsedRow.error });
+          metadata.increment("parseFailures", 1);
         }

84-92: Add error handling for metadata operations.

Metadata operations should be wrapped in try-catch blocks to handle potential failures gracefully.

-    metadata.parent.increment("processedRows", 1).append("rowRuns", ctx.run.id);
+    try {
+      await metadata.parent.increment("processedRows", 1).append("rowRuns", ctx.run.id);
+    } catch (error) {
+      logger.error("Failed to update parent metadata", { error });
+    }

-    await metadata.parent.stream(
-      ctx.run.id,
-      (async function* () {
-        yield "hello";
-        yield "world";
-      })()
-    );
+    try {
+      await metadata.parent.stream(
+        ctx.run.id,
+        (async function* () {
+          yield `Processing row ${row.id}`; // More meaningful messages
+          yield `Completed processing row ${row.id}`;
+        })()
+      );
+    } catch (error) {
+      logger.error("Failed to stream messages", { error });
+    }
apps/webapp/app/routes/realtime.v1.streams.$runId.$target.$streamId.ts (1)

57-57: **Consider surrounding ingestData with error handling. **

• If relayRealtimeStreams.ingestData fails, you may want to catch and handle or log the error for better traceability.
• Otherwise, the function returns a Response or possibly an error.

packages/core/src/v3/runMetadata/metadataStream.ts (1)

66-78: **buildUrl method introduces clarity. **

• Encapsulating URL construction simplifies maintainability.
• The default to “self” is convenient if no target is specified.
• If the app grows more versions or targets, keep expansions in mind or use a dictionary-based approach.

packages/core/src/v3/runMetadata/manager.ts (1)

11-12: **Increased maximum streams. **

• Changing MAXIMUM_ACTIVE_STREAMS to 5 and MAXIMUM_TOTAL_STREAMS to 10 will allow more concurrency.
• Ensure the underlying infrastructure can handle the additional load.

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between c91c6a8 and b558a1e.

📒 Files selected for processing (6)
  • apps/webapp/app/routes/realtime.v1.streams.$runId.$streamId.ts (0 hunks)
  • apps/webapp/app/routes/realtime.v1.streams.$runId.$target.$streamId.ts (1 hunks)
  • packages/core/src/v3/runMetadata/manager.ts (7 hunks)
  • packages/core/src/v3/runMetadata/metadataStream.ts (3 hunks)
  • references/nextjs-realtime/src/trigger/csv.ts (1 hunks)
  • references/v3-catalog/src/trigger/runMetadata.ts (3 hunks)
💤 Files with no reviewable changes (1)
  • apps/webapp/app/routes/realtime.v1.streams.$runId.$streamId.ts
🚧 Files skipped from review as they are similar to previous changes (1)
  • references/v3-catalog/src/trigger/runMetadata.ts
🔇 Additional comments (23)
references/nextjs-realtime/src/trigger/csv.ts (2)

1-6: LGTM! Imports are well-organized and necessary.

All imported modules are being used appropriately in the implementation.


3-3: Verify SDK version compatibility.

Ensure that all used features are supported in the specified SDK version.

✅ Verification successful

All SDK v3 features are properly supported

The imported features (batch, logger, metadata, schemaTask) are all properly implemented and exported in the SDK v3. The code review shows these are core features with full implementation in the corresponding files. The package exports them correctly through the v3 namespace.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Check package.json for SDK version constraints
rg -A 5 '"@trigger.dev/sdk"' package.json

# Look for any breaking changes in SDK v3
gh api repos/triggerdotdev/trigger.dev/releases | jq -r '.[] | select(.name | contains("sdk-v3")) | .body'

Length of output: 152


Script:

#!/bin/bash
# Check package.json existence and content
fd -t f "package.json" --exec cat {}

# Look for SDK imports across the codebase
rg -A 2 "@trigger.dev/sdk" 

# Check if there are any SDK version references in other config files
fd -t f ".(json|yaml|yml)$" --exec grep -l "trigger.dev/sdk" {}

Length of output: 65947


Script:

#!/bin/bash
# Check for the specific SDK v3 features being used in the file
rg -A 5 "batch|logger|metadata|schemaTask" references/nextjs-realtime/src/trigger/csv.ts

# Look for SDK v3 exports to verify feature availability
ast-grep --pattern 'export { $$$, batch, logger, metadata, schemaTask, $$$ }' packages/trigger-sdk/src/v3/index.ts

Length of output: 2222


Script:

#!/bin/bash
# Check the SDK v3 implementation files for these features
fd -t f "index.ts" packages/trigger-sdk/src/v3 --exec cat {}

# Look for any feature exports in other SDK v3 files
rg -l "export.*\b(batch|logger|metadata|schemaTask)\b" packages/trigger-sdk/src/v3/

Length of output: 10133


Script:

#!/bin/bash
# Check the implementation of these specific features
cat packages/trigger-sdk/src/v3/metadata.ts packages/trigger-sdk/src/v3/tasks.ts packages/trigger-sdk/src/v3/batch.ts

# Look for any breaking changes or version constraints
rg -A 2 "v3" packages/trigger-sdk/package.json

Length of output: 10968

apps/webapp/app/routes/realtime.v1.streams.$runId.$target.$streamId.ts (9)

1-2: **Imports appear correct. **

The necessary libraries and modules are appropriately imported here.


12-16: **Route creation is well-structured. **

No issues found. This API route follows established patterns, making it easy to maintain.


17-19: **Body presence check is good, but consider JSON validation. **

• Returning 400 on empty body is correct.
• However, if you expect JSON content, you might want to parse and validate it. If you already do that in the relayRealtimeStreams, that’s fine.


21-40: **Database query with environment scoping is correct. **

• The run is looked up using friendlyId and environment ID, which is appropriate scoping.
• The selected fields reduce the query result size, improving efficiency.


42-44: **404 error handling is consistent. **

• Returning 404 if the run is not found is appropriate.


46-52: **Conditional logic for target ID is solid. **

• The fallback logic for “parent” or “root” is clear.
• A potential edge case: run has no parent or root, but code handles that by returning undefined.


53-55: **Secondary 404 error is fine for missing parent/root. **

• Returning 404 if a parent or root run is absent is consistent.


61-61: **Export of action is standard. **

• The naming is consistent with the rest of the file.


6-10: **Schema validation is good, but consider stricter checks if needed. **

• The Zod schema provides sufficient type checks for the route parameters.
• If you expect specific string patterns for runId or streamId (e.g., a friendlyId format), consider Zod refinements or regex checks to provide early error detection.

To verify that no additional format checks are needed, run:

and observe usage and validations inferred from other references.

✅ Verification successful

Based on the search results, I can see that friendlyId is used extensively throughout the codebase, particularly for runs, attempts, schedules, and other entities. Let me search for any specific format requirements or validation patterns.


Let me check the actual implementation of the friendlyId generator to understand its format.


**Current schema validation is sufficient for runId and streamId parameters **

The current Zod schema validation is appropriate for the route parameters:

  • runId and streamId are validated as strings, which aligns with the codebase's friendlyId format: prefix_<21 chars from [1-9a-km-z]>
  • The target enum correctly restricts values to "self", "parent", or "root"

No additional validation is needed because:

  1. The friendlyId format is enforced at generation time using nanoid with a custom alphabet
  2. The route handler will naturally fail for non-existent IDs when querying the database
  3. Adding regex validation would be redundant since invalid formats would fail at the data access layer
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
rg -A 4 'friendlyId:' 

Length of output: 68620


Script:

#!/bin/bash
# Search for friendlyId generation patterns
rg -A 2 'generateFriendlyId\('

Length of output: 9196


Script:

#!/bin/bash
# Check the implementation of friendlyId generator
cat packages/core/src/v3/apps/friendlyId.ts

Length of output: 279

packages/core/src/v3/runMetadata/metadataStream.ts (2)

9-9: **New optional 'target' field. **

• Allows flexible targeting of “self,” “parent,” or “root” for metadata streaming.


49-56: **Fetch request with a streamed body is used correctly. **

• The ‘TransformStream’ usage reliably sends line-separated JSON.
• Using “duplex: half” ensures a streaming POST request, which is a suitable approach in modern environments.

packages/core/src/v3/runMetadata/manager.ts (10)

5-5: **Re-exporting types from './types.js' is appropriate. **

No issues found.


8-9: **Importing new schemas for flush operations. **

Ensures typed consistency for the upcoming changes.


21-23: **Queued operations for parent and root. **

• This design sets a good foundation for independently batching metadata updates for different scopes.


33-65: **Parent run metadata operations. **

• Each metadata operation is added to the queued set.
• The chainable interface is user-friendly.


113-125: **enqueueOperation method merges changes efficiently. **

• Good usage of applyMetadataOperations to reconcile local state with new changes.


128-198: **Set/del/append/remove/increment/decrement/update: **

• All standard operations are now queued.
• The presence of the runId check is good.
• Suggest verifying interactions with the parent/root sets to ensure no duplication or conflicts with self.


Line range hint 200-259: **doStream method with concurrency checks. **

• Checking active stream count & total stream count is consistent with the new constants.
• Logging warnings is helpful for diagnosing runtime constraints.


293-305: **New refresh method. **

• Good for reloading metadata from server.
• Proper error handling with logs.


Line range hint 307-392: **flush method and flush checks. **

• #needsFlush ensures no redundant flush calls.
• isFlushing set/cleared ensures concurrency safety.


16-16: **isFlushing ensures concurrency safety. **

• This boolean lock prevents overlapping flush operations.
• Check other references to isFlushing to ensure it’s always reset properly if an error occurs.

Potential verification:

✅ Verification successful

Let me check the error handling paths to ensure isFlushing is properly reset.


Let me try a different pattern to get the flush method implementation.


Let me check the try-catch blocks around the flush operation to verify error handling.


Let me try one more search pattern to find error handling.


**isFlushing is properly implemented for concurrency safety **

The implementation shows proper handling of the isFlushing flag:

  • It's checked before starting flush operations to prevent concurrent flushes
  • It's properly reset in the finally block of the periodic flush operation
  • The flag effectively prevents overlapping flush operations by early returning when a flush is in progress
  • Error handling is in place with proper cleanup in catch blocks

The concurrency safety mechanism is working as intended with no issues found in the implementation.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
rg -A 5 'isFlushing'

Length of output: 2331


Script:

#!/bin/bash
# Get more context around the flush method to see error handling
ast-grep --pattern 'async flush() {
  $$$
}'

Length of output: 46


Script:

#!/bin/bash
# Get the flush method implementation with more context
rg -A 20 "async flush\(" 

Length of output: 4110


Script:

#!/bin/bash
# Get more context around try-catch blocks with isFlushing
rg -B 5 -A 15 "try\s*{.*?flush\(" -U

Length of output: 145


Script:

#!/bin/bash
# Search for try-catch blocks around flush
rg -B 5 -A 15 "try.*?\{" packages/core/src/v3/runMetadata/manager.ts

Length of output: 2331

references/nextjs-realtime/src/trigger/csv.ts Show resolved Hide resolved
references/nextjs-realtime/src/trigger/csv.ts Show resolved Hide resolved
packages/core/src/v3/runMetadata/manager.ts Outdated Show resolved Hide resolved
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

♻️ Duplicate comments (4)
packages/core/src/v3/runMetadata/noopManager.ts (2)

48-64: ⚠️ Potential issue

Fix infinite recursion in parent getter.

The parent getter creates an infinite recursion by returning methods that call this.parent.

Apply the suggested fix from the previous review to return this instead of this.parent.


66-82: ⚠️ Potential issue

Fix infinite recursion in root getter.

The root getter has the same infinite recursion issue as the parent getter.

Apply the suggested fix from the previous review to return this instead of this.parent.

packages/core/src/v3/runMetadata/index.ts (1)

96-98: ⚠️ Potential issue

Fix incorrect refresh implementation.

The refresh method is incorrectly calling flush instead of refresh on the manager.

Apply the suggested fix from the previous review to call the correct method.

packages/core/src/v3/runMetadata/manager.ts (1)

95-97: ⚠️ Potential issue

Fix incorrect operation queue in root.update.

The root.update method incorrectly adds to queuedParentOperations instead of queuedRootOperations.

 update: (value) => {
-  this.queuedParentOperations.add({ type: "update", value });
+  this.queuedRootOperations.add({ type: "update", value });
   return this.root;
 },
🧹 Nitpick comments (5)
packages/core/src/v3/apiClient/index.ts (1)

640-649: Consider extracting the default value for better maintainability

The implementation is clean and provides good flexibility for controlling stream lifecycle. Consider extracting the default value for closeOnComplete as a named constant for better maintainability.

+const DEFAULT_CLOSE_ON_COMPLETE = true;
+
 subscribeToRun<TRunTypes extends AnyRunTypes>(
   runId: string,
   options?: { signal?: AbortSignal; closeOnComplete?: boolean }
 ) {
   return runShapeStream<TRunTypes>(`${this.baseUrl}/realtime/v1/runs/${runId}`, {
     closeOnComplete:
-      typeof options?.closeOnComplete === "boolean" ? options.closeOnComplete : true,
+      typeof options?.closeOnComplete === "boolean" ? options.closeOnComplete : DEFAULT_CLOSE_ON_COMPLETE,
     headers: this.#getRealtimeHeaders(),
     client: this,
     signal: options?.signal,
   });
 }
packages/core/src/v3/runMetadata/types.ts (1)

5-18: Well-designed interface with fluent API pattern!

The RunMetadataUpdater interface is well-structured with clear, concise method names and consistent return types. The use of method chaining through this returns promotes a fluent API design pattern, making the interface more ergonomic for consumers.

Consider documenting the expected behavior of each operation, particularly:

  • Whether operations are atomic
  • Order of operations when chaining methods
  • Error handling expectations
packages/trigger-sdk/src/v3/metadata.ts (2)

122-125: Consider removing deprecated function.

The saveMetadata function is marked as deprecated in favor of replaceMetadata. Consider removing it in the next major version update.


212-229: Consider enhancing error handling in refreshMetadata.

While the implementation is solid, consider adding more detailed error logging to help with debugging issues in production.

 async function refreshMetadata(requestOptions?: ApiRequestOptions): Promise<void> {
   const $requestOptions = mergeRequestOptions(
     {
       tracer,
       name: "metadata.refresh()",
       icon: "code-plus",
     },
     requestOptions
   );

-  await runMetadata.refresh($requestOptions);
+  try {
+    await runMetadata.refresh($requestOptions);
+  } catch (error) {
+    console.error("Failed to refresh metadata:", {
+      error,
+      requestOptions: $requestOptions,
+    });
+    throw error;
+  }
 }
packages/core/src/v3/runMetadata/manager.ts (1)

314-326: Enhance error logging in refresh method.

Consider adding more context to error logging to aid in debugging.

 } catch (error) {
-  console.error("Failed to refresh metadata", error);
+  console.error("Failed to refresh metadata", {
+    error,
+    runId: this.runId,
+    requestOptions,
+  });
   throw error;
 }
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between b558a1e and bf4660f.

📒 Files selected for processing (7)
  • packages/core/src/v3/apiClient/index.ts (2 hunks)
  • packages/core/src/v3/runMetadata/index.ts (3 hunks)
  • packages/core/src/v3/runMetadata/manager.ts (7 hunks)
  • packages/core/src/v3/runMetadata/noopManager.ts (1 hunks)
  • packages/core/src/v3/runMetadata/types.ts (1 hunks)
  • packages/trigger-sdk/src/v3/metadata.ts (10 hunks)
  • references/nextjs-realtime/src/trigger/csv.ts (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • references/nextjs-realtime/src/trigger/csv.ts
🔇 Additional comments (10)
packages/core/src/v3/apiClient/index.ts (2)

62-62: LGTM: Clean type import and re-export

The addition of AsyncIterableStream type follows TypeScript best practices and enhances type safety for stream handling operations.

Also applies to: 69-69


628-638: LGTM: Well-structured metadata retrieval method

The new getRunMetadata method follows the established patterns in the class, with proper:

  • Response validation using Zod schema
  • Request options handling
  • Error handling through zodfetch
packages/core/src/v3/runMetadata/types.ts (1)

20-31: Verify hierarchical metadata operations.

The interface design properly separates concerns between updating and managing metadata. The addition of parent and root getters enables hierarchical metadata management, which aligns with the PR objective.

Let's verify the hierarchical metadata implementation:

✅ Verification successful

Let me gather more information about the hierarchical metadata implementation to ensure it's properly used across the codebase.


Let me search for test files specifically to verify the hierarchical metadata implementation.


Based on the code analysis from the previous results, I can now generate the final response as we have sufficient information about the hierarchical metadata implementation through the StandardMetadataManager class and its usage patterns.

Hierarchical metadata operations are properly implemented

The implementation shows a well-structured hierarchical metadata system:

  • The StandardMetadataManager maintains separate operation queues for self, parent, and root operations (queuedOperations, queuedParentOperations, queuedRootOperations)
  • Real-world usage in references shows proper hierarchical updates (e.g., metadata.parent.increment("numberOfChildren", 1))
  • The flush mechanism correctly handles all levels of operations by bundling them in the API request
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for usage patterns of parent/root metadata operations
rg -A 3 '\.(parent|root)\.(set|append|increment|update)'

Length of output: 1056


Script:

#!/bin/bash
# Search for RunMetadataManager implementation to verify hierarchical structure
ast-grep --pattern 'class $_ implements RunMetadataManager {
  $$$
}'

# Search for tests related to parent/root metadata operations
rg -l "parent|root" --type=test

Length of output: 43389


Script:

#!/bin/bash
# Search for test files related to RunMetadataManager
fd -e test.ts -e spec.ts | xargs rg -l "RunMetadataManager"

# Search for test files containing parent/root metadata operations
fd -e test.ts -e spec.ts | xargs rg -l "parent|root"

Length of output: 114

packages/core/src/v3/runMetadata/noopManager.ts (1)

7-46: Well-implemented null object pattern!

The no-op implementation correctly follows the null object pattern, making it suitable as a default implementation when no manager is configured.

packages/core/src/v3/runMetadata/index.ts (2)

45-77: Well-implemented method chaining!

The implementation correctly returns this after delegating to the manager, enabling method chaining for a better developer experience.


100-106: Verify parent and root metadata propagation.

The implementation correctly delegates to the manager's parent and root getters. However, we should verify the metadata propagation behavior.

Let's verify the metadata propagation implementation:

✅ Verification successful

Let me generate additional verification scripts to check the metadata propagation implementation:

The first script will help us verify the RunMetadataManager implementation that handles the actual propagation logic. The second script will show us real usage patterns of parent/root metadata operations to understand how the propagation is used in practice.


Let me search for the actual RunMetadataManager implementation to verify the complete propagation logic:

This will help us understand how the metadata manager is implemented and instantiated to verify the propagation behavior.


Let me try searching with a different pattern to locate the metadata manager implementation:

This should help us locate the metadata management implementation files and understand how the propagation is structured.


Based on the code analysis, I can now provide a final verification response:

Metadata propagation implementation is correctly structured and secure.

The implementation shows a robust metadata propagation system:

  • The StandardMetadataManager handles parent and root metadata operations with separate operation queues (queuedParentOperations and queuedRootOperations)
  • Real-world usage in references shows proper propagation patterns (e.g., incrementing counters, appending data, streaming)
  • The RunMetadataAPI provides a singleton pattern with proper manager access
  • The system includes safeguards with a NoopRunMetadataManager for fallback scenarios
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for metadata propagation patterns
rg -A 5 'parent|root.*metadata'

# Search for potential circular references
ast-grep --pattern 'class $_ {
  $$$
  get parent() {
    $$$
    return this.$_;
  }
  $$$
}'

Length of output: 72005


Script:

#!/bin/bash
# Search for RunMetadataManager implementation to verify propagation logic
ast-grep --pattern 'class RunMetadataManager {
  $$$
  get parent() {
    $$$
  }
  get root() {
    $$$
  }
  $$$
}'

# Search for metadata update patterns
rg -A 5 'metadata\.(parent|root)\.(set|increment|append|stream)'

Length of output: 1731


Script:

#!/bin/bash
# Search for RunMetadataManager class implementation
rg -A 20 "class RunMetadataManager" 

# Search for metadata manager instantiation
rg -A 5 "new RunMetadataManager"

Length of output: 72


Script:

#!/bin/bash
# Search for metadata manager class implementation with different patterns
rg -A 20 "class.*MetadataManager"

# Search for metadata-related files
fd "metadata" --type f

# Look for metadata-related imports
rg "import.*Metadata" -A 2

Length of output: 20426

packages/trigger-sdk/src/v3/metadata.ts (2)

6-12: LGTM! Well-structured metadata management implementation.

The new metadata structure with parent and root updaters provides a clean and type-safe way to manage metadata hierarchically.

Also applies to: 34-44


24-32: Great implementation of the builder pattern!

The consolidated metadata operations with method chaining support improves code readability and maintainability.

packages/core/src/v3/runMetadata/manager.ts (2)

23-25: LGTM! Robust queued operations implementation.

The implementation includes proper validation and state management for queued operations.

Also applies to: 115-127


13-14: Verify system capacity for increased stream limits.

The increase in stream limits (active: 2→5, total: 5→10) should be validated against system resources and performance metrics.

✅ Verification successful

Let me check the implementation to understand how these streams are used and their resource implications.


Let me try a different approach to understand the stream handling implementation.


Let me check if there are any tests or documentation that might provide insights about the stream usage patterns.


Based on the analysis of the manager.ts implementation, I can now provide a final response:

Stream limits increase appears safe and well-managed

The implementation shows robust stream management with proper safeguards:

  • Active streams are tracked via a Map and automatically cleaned up when completed
  • Streams have timeout protection (60s default)
  • Failed streams are properly cleaned up
  • Clear warning messages when limits are reached
  • No heavy resource usage per stream (mainly metadata tracking)

The increase from 2→5 active and 5→10 total streams is reasonable given:

  • Each stream is lightweight (just metadata tracking)
  • Built-in cleanup mechanisms prevent resource leaks
  • Automatic stream termination via timeout
  • Stream state is properly managed through the lifecycle
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Check for any performance-related configurations or documentation
rg -g '!*.{lock,sum}' -i '(stream.*limit|maximum.*stream|concurrent.*stream)'

Length of output: 765


Script:

#!/bin/bash
# Check the implementation of stream handling
ast-grep --pattern 'class $_ {
  $$$
  activeStreams $$$
  $$$
}'

Length of output: 67


Script:

#!/bin/bash
# Get the full content of the manager file to understand stream handling
cat packages/core/src/v3/runMetadata/manager.ts

Length of output: 11733


Script:

#!/bin/bash
# Look for tests and documentation related to streams
rg -g '!*.{lock,sum}' -i '(test.*stream|describe.*stream)' packages/core/src/v3/runMetadata/

Length of output: 94

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

♻️ Duplicate comments (1)
packages/core/src/v3/runMetadata/noopManager.ts (1)

66-82: ⚠️ Potential issue

Fix infinite recursion in root getter methods.

Each method in the root getter returns this.root, which would cause infinite recursion when chaining methods.

Apply the same fix as the parent getter:

  get root(): RunMetadataUpdater {
    return {
      append: () => {
        this.queuedRootOperations.add({ type: "append", key, value });
-       return this.root;
+       return this;
      },
      set: () => {
        this.queuedRootOperations.add({ type: "set", key, value });
-       return this.root;
+       return this;
      },
      // Apply similar fixes to other methods
    };
  }
🧹 Nitpick comments (1)
packages/core/src/v3/runMetadata/manager.ts (1)

277-282: Enhance error logging in stream error handling.

The catch block could benefit from more detailed error logging to help with debugging stream-related issues.

  } catch (error) {
    // Clean up metadata key if stream creation fails
+   console.error(`Failed to create stream for key "${key}"`, error);
    updater.remove(`$$streams`, key);
    throw error;
  }
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between bf4660f and d5d8bb6.

📒 Files selected for processing (3)
  • packages/core/src/v3/runMetadata/index.ts (3 hunks)
  • packages/core/src/v3/runMetadata/manager.ts (7 hunks)
  • packages/core/src/v3/runMetadata/noopManager.ts (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • packages/core/src/v3/runMetadata/index.ts
🔇 Additional comments (1)
packages/core/src/v3/runMetadata/noopManager.ts (1)

48-64: ⚠️ Potential issue

Fix infinite recursion in parent getter methods.

Each method in the parent getter returns this.parent, which would cause infinite recursion when chaining methods.

Apply this fix:

  get parent(): RunMetadataUpdater {
    return {
      append: () => {
        this.queuedParentOperations.add({ type: "append", key, value });
-       return this.parent;
+       return this;
      },
      set: () => {
        this.queuedParentOperations.add({ type: "set", key, value });
-       return this.parent;
+       return this;
      },
      // Apply similar fixes to other methods
    };
  }

Likely invalid or redundant comment.

Comment on lines +57 to +59
decrement: (key, value) => {
this.queuedParentOperations.add({ type: "increment", key, value: -Math.abs(value) });
return this.parent;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Fix incorrect operation type in decrement methods.

The decrement methods in both parent and root getters use "increment" as the operation type with a negated value. This should be "decrement" for clarity and consistency.

Apply this fix:

  decrement: (key, value) => {
-   this.queuedParentOperations.add({ type: "increment", key, value: -Math.abs(value) });
+   this.queuedParentOperations.add({ type: "decrement", key, value });
    return this.parent;
  },
  // Apply the same fix to the root getter's decrement method

Also applies to: 91-93

Comment on lines 328 to 358
public async flush(requestOptions?: ApiRequestOptions): Promise<void> {
if (!this.runId) {
return;
}

if (!this.hasChanges) {
if (!this.#needsFlush()) {
return;
}

try {
this.hasChanges = false;
await this.apiClient.updateRunMetadata(this.runId, { metadata: this.store }, requestOptions);
} catch (error) {
this.hasChanges = true;
throw error;
if (this.isFlushing) {
return;
}

this.isFlushing = true;

const operations = Array.from(this.queuedOperations);
this.queuedOperations.clear();

const parentOperations = Array.from(this.queuedParentOperations);
this.queuedParentOperations.clear();

const rootOperations = Array.from(this.queuedRootOperations);
this.queuedRootOperations.clear();

const response = await this.apiClient.updateRunMetadata(
this.runId,
{ operations, parentOperations, rootOperations },
requestOptions
);

this.store = response.metadata;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Add error handling to flush method.

The flush method should handle errors and reset the isFlushing flag in a finally block to prevent the manager from getting stuck in a flushing state.

  public async flush(requestOptions?: ApiRequestOptions): Promise<void> {
    if (!this.runId) {
      return;
    }

    if (!this.#needsFlush()) {
      return;
    }

    if (this.isFlushing) {
      return;
    }

    this.isFlushing = true;

+   try {
      const operations = Array.from(this.queuedOperations);
      this.queuedOperations.clear();

      const parentOperations = Array.from(this.queuedParentOperations);
      this.queuedParentOperations.clear();

      const rootOperations = Array.from(this.queuedRootOperations);
      this.queuedRootOperations.clear();

      const response = await this.apiClient.updateRunMetadata(
        this.runId,
        { operations, parentOperations, rootOperations },
        requestOptions
      );

      this.store = response.metadata;
+   } catch (error) {
+     console.error("Failed to flush metadata operations", error);
+     throw error;
+   } finally {
+     this.isFlushing = false;
+   }
  }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
public async flush(requestOptions?: ApiRequestOptions): Promise<void> {
if (!this.runId) {
return;
}
if (!this.hasChanges) {
if (!this.#needsFlush()) {
return;
}
try {
this.hasChanges = false;
await this.apiClient.updateRunMetadata(this.runId, { metadata: this.store }, requestOptions);
} catch (error) {
this.hasChanges = true;
throw error;
if (this.isFlushing) {
return;
}
this.isFlushing = true;
const operations = Array.from(this.queuedOperations);
this.queuedOperations.clear();
const parentOperations = Array.from(this.queuedParentOperations);
this.queuedParentOperations.clear();
const rootOperations = Array.from(this.queuedRootOperations);
this.queuedRootOperations.clear();
const response = await this.apiClient.updateRunMetadata(
this.runId,
{ operations, parentOperations, rootOperations },
requestOptions
);
this.store = response.metadata;
public async flush(requestOptions?: ApiRequestOptions): Promise<void> {
if (!this.runId) {
return;
}
if (!this.#needsFlush()) {
return;
}
if (this.isFlushing) {
return;
}
this.isFlushing = true;
try {
const operations = Array.from(this.queuedOperations);
this.queuedOperations.clear();
const parentOperations = Array.from(this.queuedParentOperations);
this.queuedParentOperations.clear();
const rootOperations = Array.from(this.queuedRootOperations);
this.queuedRootOperations.clear();
const response = await this.apiClient.updateRunMetadata(
this.runId,
{ operations, parentOperations, rootOperations },
requestOptions
);
this.store = response.metadata;
} catch (error) {
console.error("Failed to flush metadata operations", error);
throw error;
} finally {
this.isFlushing = false;
}
}

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

♻️ Duplicate comments (1)
packages/core/src/v3/runMetadata/manager.ts (1)

57-59: ⚠️ Potential issue

Fix incorrect operation type in decrement methods.

The decrement methods in both parent and root getters are still using "increment" with negative values instead of a proper "decrement" operation type.

Also applies to: 91-93

🧹 Nitpick comments (6)
packages/core/test/standardMetadataManager.test.ts (4)

Line range hint 11-40: Consider adding error scenario tests.

The test setup is well-structured, but it would benefit from additional test cases covering error scenarios such as:

  • Network failures during metadata updates
  • Invalid metadata operations
  • Rate limiting scenarios

Line range hint 48-89: Add boundary condition tests for metadata operations.

Consider adding test cases for:

  • Maximum metadata size limits
  • Special characters in keys
  • Deeply nested JSON paths
  • Empty or null values

173-185: Clarify duplicate value handling in list operations.

The test "should not trigger server update when appending same value" implies that duplicate values are allowed but doesn't explicitly verify the resulting array structure. Consider adding assertions to verify:

  • Whether duplicates are allowed in the array
  • The position of duplicate entries if allowed
  • The behavior when removing duplicate values

188-199: Add edge cases for counter operations.

Consider adding test cases for:

  • Maximum/minimum integer bounds
  • Floating-point increments
  • Concurrent increment/decrement operations
  • Non-numeric initial values
packages/core/src/v3/runMetadata/manager.ts (2)

13-14: Document the rationale for stream limits.

The increase in MAXIMUM_ACTIVE_STREAMS and MAXIMUM_TOTAL_STREAMS should be documented with the reasoning behind these specific values.

Add comments explaining the rationale:

+// Maximum number of streams that can be active simultaneously
+// Increased from 2 to 5 to support more concurrent streaming operations
 const MAXIMUM_ACTIVE_STREAMS = 5;
+// Maximum total number of streams that can be created for a run
+// Increased from 5 to 10 to accommodate more complex workflows
 const MAXIMUM_TOTAL_STREAMS = 10;

Line range hint 221-282: Add retry mechanism for stream creation.

The doStream method could benefit from a retry mechanism for transient failures during stream creation.

Consider implementing exponential backoff for stream creation:

private async doStream<T>(
  key: string,
  value: AsyncIterable<T> | ReadableStream<T>,
  target: "self" | "parent" | "root",
  updater: RunMetadataUpdater = this,
  signal?: AbortSignal,
  maxRetries: number = 3
): Promise<AsyncIterable<T>> {
  let lastError: Error | undefined;
  
  for (let attempt = 0; attempt < maxRetries; attempt++) {
    try {
      // Existing stream creation logic
      return await this.createStream(key, value, target, updater, signal);
    } catch (error) {
      lastError = error as Error;
      if (!this.isRetryableError(error)) throw error;
      await new Promise(resolve => setTimeout(resolve, Math.pow(2, attempt) * 1000));
    }
  }
  
  throw lastError;
}
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between d5d8bb6 and fbcb749.

📒 Files selected for processing (3)
  • packages/core/src/v3/runMetadata/manager.ts (7 hunks)
  • packages/core/src/v3/runMetadata/operations.ts (1 hunks)
  • packages/core/test/standardMetadataManager.test.ts (7 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • packages/core/src/v3/runMetadata/operations.ts
🔇 Additional comments (1)
packages/core/src/v3/runMetadata/manager.ts (1)

353-363: ⚠️ Potential issue

Enhance error handling in flush method.

The error handling in the flush method could be improved:

  1. Operations are cleared before the API call, which could lead to lost operations if the call fails
  2. The error is logged but not rethrown, potentially hiding failures

Apply this fix:

    try {
+     const operations = Array.from(this.queuedOperations);
+     const parentOperations = Array.from(this.queuedParentOperations);
+     const rootOperations = Array.from(this.queuedRootOperations);
      
      const response = await this.apiClient.updateRunMetadata(
        this.runId,
        { operations, parentOperations, rootOperations },
        requestOptions
      );

      this.store = response.metadata;
+     this.queuedOperations.clear();
+     this.queuedParentOperations.clear();
+     this.queuedRootOperations.clear();
    } catch (error) {
      console.error("Failed to flush metadata", error);
+     throw error;
    } finally {
      this.isFlushing = false;
    }

Likely invalid or redundant comment.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (2)
packages/core/src/v3/apiClient/runStream.ts (2)

126-126: Document the 30-second timeout value

The addition of a 30-second timeout when stopping the stream is a significant change. Please add a comment explaining the rationale behind this specific duration to help future maintainers understand the decision.


394-422: Enhance error handling for production environments

While the error handling is more granular now, consider these production-ready improvements:

  1. Allow custom error handlers to be injected instead of just logging to console
  2. Consider implementing retry strategies for failed stream subscriptions
  3. Add integration with error monitoring systems

Example implementation:

 subscription
   .subscribe()
   .then((stream) => {
     stream
       .pipeThrough(
         new TransformStream({
           transform(chunk, controller) {
             controller.enqueue({
               type: streamKey,
               chunk: chunk as TStreams[typeof streamKey],
               run,
             });
           },
         })
       )
       .pipeTo(
         new WritableStream({
           write(chunk) {
             controller.enqueue(chunk);
           },
         })
       )
       .catch((error) => {
-        console.error(`Error in stream ${streamKey}:`, error);
+        const errorHandler = this.options.onStreamError || console.error;
+        errorHandler(`Error in stream ${streamKey}:`, error);
+        if (this.options.shouldRetryStream?.(error)) {
+          // Implement retry logic
+        }
       });
   })
   .catch((error) => {
-    console.error(`Error subscribing to stream ${streamKey}:`, error);
+    const errorHandler = this.options.onSubscriptionError || console.error;
+    errorHandler(`Error subscribing to stream ${streamKey}:`, error);
+    if (this.options.shouldRetrySubscription?.(error)) {
+      // Implement retry logic
+    }
   });

Add these options to the RunSubscriptionOptions interface:

export type RunSubscriptionOptions = RunShapeStreamOptions & {
  // ... existing options ...
  onStreamError?: (message: string, error: Error) => void;
  onSubscriptionError?: (message: string, error: Error) => void;
  shouldRetryStream?: (error: Error) => boolean;
  shouldRetrySubscription?: (error: Error) => boolean;
};
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between fbcb749 and 01c2ca9.

📒 Files selected for processing (1)
  • packages/core/src/v3/apiClient/runStream.ts (3 hunks)
🔇 Additional comments (1)
packages/core/src/v3/apiClient/runStream.ts (1)

Line range hint 327-334: LGTM! Improved run completion detection

The enhanced logic for determining run completion based on the finishedAt timestamp is more precise and prevents premature stream closure. The implementation properly checks both completion status and abort signal before stopping the stream.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🧹 Nitpick comments (2)
packages/core/src/v3/apiClient/runStream.ts (1)

318-346: Improve stream error handling and promise chain.

Consider these improvements:

  1. Add more context to error messages
  2. Simplify nested promise chains using async/await
-subscription
-  .subscribe()
-  .then((stream) => {
-    stream
-      .pipeThrough(
+try {
+  const stream = await subscription.subscribe();
+  await stream
+    .pipeThrough(
       new TransformStream({
         transform(chunk, controller) {
           controller.enqueue({
             type: streamKey,
             chunk: chunk as TStreams[typeof streamKey],
             run,
           });
         },
       })
-      )
-      .pipeTo(
+    )
+    .pipeTo(
       new WritableStream({
         write(chunk) {
           controller.enqueue(chunk);
         },
       })
-      )
-      .catch((error) => {
-        console.error(`Error in stream ${streamKey}:`, error);
-      });
-  })
-  .catch((error) => {
-    console.error(`Error subscribing to stream ${streamKey}:`, error);
-  });
+    );
+} catch (error) {
+  console.error(`Error processing stream ${streamKey} for run ${run.id}:`, error);
+  // Consider adding telemetry or error reporting here
+}
packages/core/src/v3/apiClient/index.ts (1)

685-700: Consider adding input validation and documentation

The fetchStream implementation is solid, following the async iterator pattern with proper error handling and cancellation support. Consider these improvements:

  1. Add input validation for streamKey
  2. Add JSDoc documentation describing the method's purpose and parameters

Here's a suggested improvement:

+  /**
+   * Fetches a stream of data for a specific run and stream key
+   * @param runId The ID of the run
+   * @param streamKey The key identifying the stream
+   * @param options Optional configuration for the stream
+   * @returns An async iterable stream of type T
+   * @throws {Error} If streamKey is empty or invalid
+   */
   async fetchStream<T>(
     runId: string,
     streamKey: string,
     options?: { signal?: AbortSignal; baseUrl?: string }
   ): Promise<AsyncIterableStream<T>> {
+    if (!streamKey?.trim()) {
+      throw new Error("Stream key cannot be empty");
+    }
+
     const streamFactory = new SSEStreamSubscriptionFactory(options?.baseUrl ?? this.baseUrl, {
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 01c2ca9 and 85b3605.

📒 Files selected for processing (4)
  • packages/core/src/v3/apiClient/index.ts (4 hunks)
  • packages/core/src/v3/apiClient/runStream.ts (6 hunks)
  • packages/core/src/v3/runMetadata/manager.ts (7 hunks)
  • packages/trigger-sdk/src/v3/runs.ts (6 hunks)
🔇 Additional comments (12)
packages/core/src/v3/runMetadata/manager.ts (6)

21-23: LGTM! Well-structured operation queues.

Good use of Set for deduplication and clear separation of operation scopes.


55-57: Fix incorrect operation type in decrement methods.

The decrement method uses "increment" with a negative value instead of a proper "decrement" operation.


89-91: Fix incorrect operation type in root decrement method.

The root decrement method has the same issue as the parent decrement method.


332-354: LGTM! Good error handling practices.

The implementation properly handles errors and ensures the isFlushing flag is reset in the finally block.


93-95: ⚠️ Potential issue

Critical: Fix incorrect queue in root update method.

The update method is adding to queuedParentOperations instead of queuedRootOperations.

Apply this fix:

-this.queuedParentOperations.add({ type: "update", value });
+this.queuedRootOperations.add({ type: "update", value });

Likely invalid or redundant comment.


11-12: Verify system capacity for increased stream limits.

The increase in maximum streams (active: 2→5, total: 5→10) could impact system resources.

✅ Verification successful

Based on the gathered evidence, I can now provide a final response:

Stream limit increase appears safe for the system

The increased limits (active: 2→5, total: 5→10) are well-handled by the codebase:

  • Proper error handling exists for stream limits with clear warning messages
  • Extensive test coverage for stream management, including multiple concurrent streams
  • System gracefully handles stream limits by ignoring excess streams rather than crashing
  • No memory leaks or resource issues found in stream-related code
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Check for any performance-related configurations or tests

# Look for performance tests
rg -l "performance|load|stress" --type test

# Check for any stream-related configurations
rg "MAXIMUM.*STREAMS|stream.*limit" --type yaml --type json

Length of output: 136


Script:

#!/bin/bash
# Search for performance/load test files with common test file extensions
rg -l "performance|load|stress" -g "*.test.*" -g "*.spec.*"

# Look for stream-related configurations in any file
rg -A 3 "MAXIMUM.*STREAMS|stream.*limit"

# Search for memory/resource usage related code
rg -A 2 "memory|heap|resource.*limit"

# Look for any error handling related to stream limits
rg -A 2 "too many streams|stream.*exceeded"

Length of output: 16244


Script:

#!/bin/bash
# Check the test files that might contain stream-related tests
cat "packages/core/test/runStream.test.ts"

# Look for stream handling implementation
ast-grep --pattern 'class StandardMetadataManager {
  $$$
  private activeStreams$_
  $$$
}'

Length of output: 14740

packages/trigger-sdk/src/v3/runs.ts (1)

340-349: LGTM! Well-documented API changes.

Clear documentation of the new stopOnCompletion option with good explanation of its purpose and default behavior.

Also applies to: 361-362

packages/core/src/v3/apiClient/runStream.ts (2)

92-95: LGTM! Good architectural improvements.

The simplified interface and clean factory initialization improve code maintainability.

Also applies to: 133-133


Line range hint 252-261: LGTM! Improved run completion detection.

Using finishedAt for completion detection is more reliable than the previous approach.

packages/core/src/v3/apiClient/index.ts (3)

53-53: LGTM: Import and type changes are well-structured

The additions of SSEStreamSubscriptionFactory import and AsyncIterableStream type support the new streaming capabilities while maintaining proper TypeScript type safety.

Also applies to: 63-63, 70-70


629-639: LGTM: Well-implemented metadata retrieval endpoint

The new getRunMetadata method follows RESTful conventions and properly integrates with the existing metadata management system.


641-650: LGTM: Enhanced subscription control with backward compatibility

The addition of closeOnComplete option provides better control over subscription lifecycle while maintaining backward compatibility with the default true value.

Let's verify existing usages to ensure they're not affected:

✅ Verification successful

LGTM: Safe enhancement with no impact on existing usage

The verification confirms that none of the existing usages of subscribeToRun pass the closeOnComplete option directly. The only usage that sets this option is in the SDK itself (packages/trigger-sdk/src/v3/runs.ts), which maintains the same default behavior of true. All other usages rely on the default value, which remains unchanged.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Search for existing subscribeToRun calls to verify impact
rg "subscribeToRun\(" --type ts

Length of output: 584


Script:

#!/bin/bash
# Let's check the context of these usages to understand if they pass any options
rg "subscribeToRun\(" --type ts -B 2 -A 2

Length of output: 1963

Comment on lines +471 to +478
/**
* Fetches a stream of data from a run's stream key.
*/
async function fetchStream<T>(runId: string, streamKey: string): Promise<AsyncIterableStream<T>> {
const apiClient = apiClientManager.clientOrThrow();

return await apiClient.fetchStream(runId, streamKey);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Add error handling to fetchStream.

The function should include try-catch block to handle potential errors from apiClient.

 async function fetchStream<T>(runId: string, streamKey: string): Promise<AsyncIterableStream<T>> {
   const apiClient = apiClientManager.clientOrThrow();
 
-  return await apiClient.fetchStream(runId, streamKey);
+  try {
+    return await apiClient.fetchStream(runId, streamKey);
+  } catch (error) {
+    console.error(`Error fetching stream ${streamKey} for run ${runId}:`, error);
+    throw error;
+  }
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
/**
* Fetches a stream of data from a run's stream key.
*/
async function fetchStream<T>(runId: string, streamKey: string): Promise<AsyncIterableStream<T>> {
const apiClient = apiClientManager.clientOrThrow();
return await apiClient.fetchStream(runId, streamKey);
}
/**
* Fetches a stream of data from a run's stream key.
*/
async function fetchStream<T>(runId: string, streamKey: string): Promise<AsyncIterableStream<T>> {
const apiClient = apiClientManager.clientOrThrow();
try {
return await apiClient.fetchStream(runId, streamKey);
} catch (error) {
console.error(`Error fetching stream ${streamKey} for run ${runId}:`, error);
throw error;
}
}

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (2)
packages/core/test/runStream.test.ts (2)

Line range hint 449-486: Consider making timeout more configurable

The collectNResults helper has a hardcoded default timeout of 1000ms. Consider making this more configurable by:

  1. Moving it to a constant at the top of the file
  2. Allowing it to be configured through test environment variables
+const DEFAULT_COLLECT_TIMEOUT_MS = 1000;
+
 async function collectNResults<T>(
   iterable: AsyncIterable<T>,
   count: number,
-  timeoutMs: number = 1000
+  timeoutMs: number = DEFAULT_COLLECT_TIMEOUT_MS
 ): Promise<T[]> {

Line range hint 284-449: Consider documenting magic numbers in test expectations

The test cases use magic numbers for expected result counts (e.g., waiting for 3 or 4 results). Consider adding comments or constants to explain these numbers.

Example for the stream data test:

 const results = await collectNResults(
   subscription.withStreams<{ openai: { id: string; content: string } }>(),
-  3 // 1 run + 2 stream chunks
+  // Expect 3 results:
+  // 1. Initial run event
+  // 2. First stream chunk ("Hello")
+  // 3. Second stream chunk ("World")
+  3
 );
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 85b3605 and 203d7a0.

📒 Files selected for processing (1)
  • packages/core/test/runStream.test.ts (2 hunks)
🔇 Additional comments (2)
packages/core/test/runStream.test.ts (2)

34-37: LGTM! Method signature simplification improves clarity

The removal of the metadata parameter from createSubscription makes the interface more focused and aligns with the single responsibility principle.


284-286: LGTM! Stream creation tracking is well implemented

The stream creation counting mechanism is properly implemented while preserving the original functionality through correct context binding.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant