Upgrades and fixes to Realtime and Realtime streams #1549
Conversation
🦋 Changeset detected. Latest commit: b58a646. The changes in this PR will be included in the next version bump. This PR includes changesets to release 12 packages.
**Walkthrough**

This pull request introduces significant updates across various components of the project, primarily focusing on enhancing the functionality of the streaming SDK and improving data ingestion methods.
Actionable comments posted: 1
🧹 Outside diff range and nitpick comments (3)
apps/webapp/app/services/realtime/databaseRealtimeStreams.server.ts (1)
`57-63`: **Consider batch processing for better performance**

The switch from batch processing to individual inserts might impact performance when dealing with high-volume streams. Consider using a buffering strategy to batch inserts while maintaining real-time characteristics.

```diff
-await this.options.prisma.realtimeStreamChunk.create({
-  data: {
-    runId,
-    key: streamId,
-    sequence: sequence++,
-    value,
-  },
-});
+const BATCH_SIZE = 100;
+let buffer = [];
+
+buffer.push({
+  runId,
+  key: streamId,
+  sequence: sequence++,
+  value,
+});
+
+if (buffer.length >= BATCH_SIZE) {
+  await this.options.prisma.realtimeStreamChunk.createMany({
+    data: buffer,
+  });
+  buffer = [];
+}
```

packages/core/src/v3/apiClient/stream.ts (1)
`225-231`: **Consider reducing debug logging in production**

The extensive console logging might impact performance in production. Consider using a debug flag or environment variable to control logging.

```diff
-console.log("LineTransformStream", {
-  chunk,
-  lines,
-  fullLines,
-  buffer: this.buffer,
-  streamId,
-});
+if (process.env.DEBUG_STREAMS === 'true') {
+  console.log("LineTransformStream", {
+    chunk,
+    lines,
+    fullLines,
+    buffer: this.buffer,
+    streamId,
+  });
+}
```

packages/core/src/v3/apiClient/runStream.ts (1)
`217-238`: **Consider optimizing the transform pipeline**

The current implementation has multiple transform stages with logging, which might impact performance. Consider:

- Combining transforms where possible
- Making logging conditional

```diff
 return zodShapeStream(SubscribeRealtimeStreamChunkRawShape, this.url, this.options)
   .pipeThrough(
     new TransformStream({
       transform(chunk, controller) {
-        console.log("ElectricStreamSubscription chunk.value", chunk.value);
+        if (process.env.DEBUG_STREAMS === 'true') {
+          console.log("ElectricStreamSubscription chunk.value", chunk.value);
+        }
         controller.enqueue(chunk.value);
       },
     })
   )
   .pipeThrough(new LineTransformStream(this.url))
   .pipeThrough(
     new TransformStream({
       transform(chunk, controller) {
         for (const line of chunk) {
-          console.log("ElectricStreamSubscription line", line);
+          if (process.env.DEBUG_STREAMS === 'true') {
+            console.log("ElectricStreamSubscription line", line);
+          }
           controller.enqueue(safeParseJSON(line));
         }
       },
     })
   );
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (4)
- `.changeset/rude-walls-help.md` (1 hunks)
- `apps/webapp/app/services/realtime/databaseRealtimeStreams.server.ts` (2 hunks)
- `packages/core/src/v3/apiClient/runStream.ts` (2 hunks)
- `packages/core/src/v3/apiClient/stream.ts` (1 hunks)
✅ Files skipped from review due to trivial changes (1)
- .changeset/rude-walls-help.md
🔇 Additional comments (2)
apps/webapp/app/services/realtime/databaseRealtimeStreams.server.ts (1)
`47-49`: **LGTM: Stream termination condition is now more explicit**

The simplified condition `if (done || !value)` properly handles both stream completion and empty chunks.
packages/core/src/v3/apiClient/stream.ts (1)
`238-244`: **LGTM: Proper handling of incomplete data**

The flush implementation correctly handles any remaining data in the buffer when the stream ends.

```ts
for (const line of chunk) {
  console.log("ElectricStreamSubscription line", line);

  controller.enqueue(safeParseJSON(line));
```
🛠️ Refactor suggestion
**Consider handling JSON parse errors explicitly**

The `safeParseJSON` function silently returns the raw string on parse errors. Consider adding explicit error handling for malformed JSON.
```diff
-controller.enqueue(safeParseJSON(line));
+try {
+  const parsed = safeParseJSON(line);
+  if (typeof parsed === 'string' && line !== parsed) {
+    console.warn(`Failed to parse JSON for line: ${line}`);
+  }
+  controller.enqueue(parsed);
+} catch (error) {
+  console.error(`Error processing line: ${line}`, error);
+}
```
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```ts
try {
  const parsed = safeParseJSON(line);
  if (typeof parsed === 'string' && line !== parsed) {
    console.warn(`Failed to parse JSON for line: ${line}`);
  }
  controller.enqueue(parsed);
} catch (error) {
  console.error(`Error processing line: ${line}`, error);
}
```
- `@trigger.dev/build`
- `@trigger.dev/react-hooks`
- `@trigger.dev/sdk`
- `@trigger.dev/rsc`
- `trigger.dev`
- `@trigger.dev/core`

commit:
Actionable comments posted: 4
🧹 Outside diff range and nitpick comments (7)
apps/webapp/server.ts (1)
`84-95`: **Consider adding request logging for blocked endpoints**

The middleware implementation correctly restricts access to realtime endpoints while maintaining the healthcheck endpoint. However, adding logging for blocked requests would help with monitoring and debugging.

Consider adding logging like this:

```diff
 if (!req.url.startsWith("/realtime") && req.url !== "/healthcheck") {
+  console.log(`Blocked request to: ${req.method} ${req.url}`);
   res.status(404).send("Not Found");
   return;
 }
```
apps/webapp/test/authorizationRateLimitMiddleware.test.ts (2)
Line range hint `3-5`: **Consider documenting test timeout rationale**

The 30-second timeout is set globally for all tests. Consider:

- Adding a comment explaining why 30 seconds was chosen
- Using per-test timeouts for long-running tests
- Documenting which tests might approach this timeout

Example documentation:

```diff
+// Set a longer timeout for rate limiting tests as they include
+// multiple sleep operations to test time-based behaviors
 vi.setConfig({ testTimeout: 30_000 }); // 30 seconds timeout
```
Line range hint `171-315`: **Consider adding tests for concurrent scenarios**

While the test suite is comprehensive, consider adding these scenarios:

- Concurrent requests hitting the rate limiter simultaneously
- Redis connection errors/timeouts
- Race conditions in token bucket refill

Example test structure:

```ts
redisTest("should handle concurrent requests correctly", async ({ redis }) => {
  // Setup rate limiter with small window
  // Fire multiple requests simultaneously using Promise.all
  // Verify correct number of requests succeeded/failed
});

redisTest("should handle Redis errors gracefully", async ({ redis }) => {
  // Setup rate limiter
  // Simulate Redis connection issues
  // Verify fallback behavior
});
```

packages/core/src/v3/runMetadata/metadataStream.ts (2)
`68-69`: **Handle errors in the async iterator returned by `[Symbol.asyncIterator]`**

Errors occurring in the `consumerStream` may not be propagated to the consumer of the iterator. Ensure that errors from the stream are properly caught and forwarded.

`72-81`: **Manage cancellation in `streamToAsyncIterator`**

When the consumer stops iteration early, the underlying stream reader should be canceled to prevent resource leaks. Modify the function to handle cancellation:

```diff
 async function* streamToAsyncIterator<T>(stream: ReadableStream<T>): AsyncIterableIterator<T> {
   const reader = stream.getReader();
   try {
     while (true) {
       const { done, value } = await reader.read();
       if (done) return;
       yield value;
     }
   } catch (error) {
     // Optionally handle errors from reader
     throw error;
   } finally {
+    await reader.cancel();
     reader.releaseLock();
   }
 }
```
packages/core/src/v3/runMetadata/manager.ts (2)
Line range hint `232-243`: **Inconsistent parameter naming in `stream` method**

The parameter `value` was renamed to `source` in the method signature, but `value` is still used within the method. This inconsistency can lead to errors. Apply this diff to ensure consistent naming:

```diff
 public async stream<T>(
   key: string,
-  value: AsyncIterable<T> | ReadableStream<T>,
+  source: AsyncIterable<T> | ReadableStream<T>,
   signal?: AbortSignal
 ): Promise<AsyncIterable<T>> {
-  const $value = value as AsyncIterable<T>;
+  const $source = source as AsyncIterable<T>;

   if (!this.runId) {
-    return $value;
+    return $source;
   }

   // ... existing code ...

   const streamInstance = new MetadataStream({
     key,
     runId: this.runId,
-    source: $value,
+    source: $source,
     baseUrl: this.streamsBaseUrl,
     headers: this.apiClient.getHeaders(),
     signal,
     version: this.streamsVersion,
   });
```
`236-236`: **Redundant setting of `$$streamsBaseUrl` in metadata**

Setting `$$streamsBaseUrl` every time a new stream is created might be unnecessary if it doesn't change between streams. Consider setting it once during initialization rather than on every stream creation.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (7)
- `apps/webapp/server.ts` (1 hunks)
- `apps/webapp/test/authorizationRateLimitMiddleware.test.ts` (1 hunks)
- `packages/core/src/v3/runMetadata/manager.ts` (1 hunks)
- `packages/core/src/v3/runMetadata/metadataStream.ts` (2 hunks)
- `references/nextjs-realtime/package.json` (1 hunks)
- `references/nextjs-realtime/src/components/TriggerButtonWithStreaming.tsx` (0 hunks)
- `references/nextjs-realtime/src/trigger/ai.ts` (0 hunks)
💤 Files with no reviewable changes (2)
- references/nextjs-realtime/src/trigger/ai.ts
- references/nextjs-realtime/src/components/TriggerButtonWithStreaming.tsx
🧰 Additional context used
🪛 Biome (1.9.4)
packages/core/src/v3/runMetadata/metadataStream.ts
[error] 15-15: void is confusing inside a union type.
Unsafe fix: Use undefined instead.
(lint/suspicious/noConfusingVoidType)
🔇 Additional comments (2)
references/nextjs-realtime/package.json (1)
10-11
: LGTM! Deployment script added correctly.
The new deploy
script is properly added and aligns with the existing trigger.dev CLI usage pattern.
Let's verify the trigger.dev CLI installation and version compatibility:
✅ Verification successful
**Trigger.dev CLI and SDK versions are properly aligned**

The verification confirms that both the CLI (`trigger.dev`) and SDK (`@trigger.dev/sdk`) packages use the same workspace version `^3`, ensuring compatibility for the deployment script to work correctly.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
```shell
#!/bin/bash
# Description: Verify trigger.dev CLI installation and version compatibility
# Expected: The CLI package should be listed in devDependencies with a matching version

# Check if trigger.dev CLI package exists in package.json
jq '.devDependencies["trigger.dev"]' references/nextjs-realtime/package.json

# Check for any potential version mismatches between SDK and CLI
jq '[.dependencies["@trigger.dev/sdk"], .devDependencies["trigger.dev"]]' references/nextjs-realtime/package.json
```
Length of output: 248
apps/webapp/test/authorizationRateLimitMiddleware.test.ts (1)
`18-18`: **Consider alternatives to skipping tests in CI**
Skipping these tests in GitHub Actions could lead to reduced test coverage in CI. Consider these alternatives:
- Set up Redis in GitHub Actions using service containers
- Mock Redis for CI environment
- Create a separate test suite for Redis-dependent tests
Let's check if other test files have similar patterns:
✅ Verification successful
Tests are intentionally skipped in CI due to Redis dependency
The skip pattern is consistent across Redis-dependent test files. The test file shows comprehensive Redis integration tests that:
- Use `@internal/testcontainers` for Redis setup
- Test various rate limiting scenarios including token bucket, fixed window, and sliding window
- Verify edge cases and caching behavior
- Verify edge cases and caching behavior
Since these tests require a Redis instance and are properly tested in a containerized environment locally, skipping them in CI is a valid approach. No changes needed.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
```shell
#!/bin/bash
# Search for other test files that might be skipped in CI
rg -l "skipIf.*GITHUB_ACTIONS" "test/"

# Check if Redis is used in other test files
rg -l "redisTest" "test/"
```
Length of output: 154
Script:
```shell
#!/bin/bash
# Search for test files with skipIf and GITHUB_ACTIONS from repository root
fd -e test.ts -e test.js | xargs rg -l "skipIf.*GITHUB_ACTIONS"

# Check Redis usage in test files
fd -e test.ts -e test.js | xargs rg -l "redis"

# Look at the actual test file to understand Redis setup
fd "authorizationRateLimitMiddleware.test.ts" -x cat
```
Length of output: 13980
```ts
private initializeServerStream(): Promise<Response> {
  const serverStream = this.serverStream.pipeThrough(
    new TransformStream<T, string>({
      async transform(chunk, controller) {
        controller.enqueue(JSON.stringify(chunk) + "\n");
      },
    })
  );
```
🛠️ Refactor suggestion
**Ensure proper error handling in `initializeServerStream`**

The `fetch` request may fail or throw an exception, which isn't currently handled. This could lead to unhandled promise rejections. Consider wrapping the `fetch` call in a `try-catch` block or handling errors appropriately.
```diff
+try {
   return fetch(
     `${this.options.baseUrl}/realtime/${this.options.version ?? "v1"}/streams/${
       this.options.runId
     }/${this.options.key}`,
     {
       method: "POST",
       headers: this.options.headers ?? {},
       body: serverStream,
       // @ts-expect-error
       duplex: "half",
       signal: this.controller.signal,
     }
   );
+} catch (error) {
+  // Handle error, possibly by rejecting the promise or logging
+  console.error('Failed to initialize server stream:', error);
+  throw error;
+}
```
Committable suggestion skipped: line range outside the PR's diff.
```ts
private serverIterator: AsyncIterator<T>;
private consumerIterator: AsyncIterator<T>;
private serverStream: ReadableStream<T>;
private consumerStream: ReadableStream<T>;
private streamPromise: Promise<void | Response>;
```
🛠️ Refactor suggestion
**Avoid using `void` in union types for clarity**

Using `void` in a union type can be confusing; it is better to use `undefined` instead to represent the absence of a value. Apply this diff to improve type clarity:

```diff
-private streamPromise: Promise<void | Response>;
+private streamPromise: Promise<Response | undefined>;
```

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.

```ts
private streamPromise: Promise<Response | undefined>;
```
🧰 Tools
🪛 Biome (1.9.4)
[error] 15-15: void is confusing inside a union type.
Unsafe fix: Use undefined instead.
(lint/suspicious/noConfusingVoidType)
```diff
 private createTeeStreams() {
   const readableSource = new ReadableStream<T>({
     start: async (controller) => {
       for await (const value of this.options.source) {
         controller.enqueue(value);
       }
-      return queue.shift()!;
+      controller.close();
     },
   });

-  return {
-    serverIterator: teeIterator(this.serverQueue),
-    consumerIterator: teeIterator(this.consumerQueue),
-  };
+  return readableSource.tee();
```
🛠️ Refactor suggestion
**Handle errors during stream creation in `createTeeStreams`**

If an error occurs while iterating over `this.options.source`, it may not be properly propagated, leading to unhandled exceptions. Consider adding error handling to manage exceptions during iteration. You can modify the `start` method to catch and forward errors:

```diff
 const readableSource = new ReadableStream<T>({
   start: async (controller) => {
+    try {
       for await (const value of this.options.source) {
         controller.enqueue(value);
       }
-
-      controller.close();
+    } catch (error) {
+      controller.error(error);
+    } finally {
+      controller.close();
+    }
   },
 });
```
Committable suggestion skipped: line range outside the PR's diff.
Actionable comments posted: 0
🧹 Outside diff range and nitpick comments (2)
apps/webapp/app/services/realtime/relayRealtimeStreams.server.ts (2)
`186-207`: **Consider using promises for the buffer waiting mechanism**

Instead of using `setTimeout` with polling in the `waitForBuffer` method, consider utilizing promises or `async/await` with event-based notification to wait for the buffer's availability. This can improve efficiency by reducing unnecessary polling and can make the code more readable.
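As an illustration of the event-based approach, here is a minimal sketch. The `BufferRegistry` class and its `set`/`waitForBuffer` methods are illustrative names, not code from the PR: waiters register a promise resolver that `set` fulfills as soon as the buffer arrives, so no polling interval is needed.

```typescript
// Hypothetical sketch: event-based waiting instead of setTimeout polling.
class BufferRegistry<T> {
  private buffers = new Map<string, T>();
  private waiters = new Map<string, Array<(value: T) => void>>();

  set(key: string, value: T): void {
    this.buffers.set(key, value);
    // Wake any pending waiters immediately instead of making them poll.
    const pending = this.waiters.get(key) ?? [];
    this.waiters.delete(key);
    for (const resolve of pending) resolve(value);
  }

  // Resolves with the buffer as soon as it is registered, or with
  // undefined once the timeout elapses.
  waitForBuffer(key: string, timeoutMs = 5000): Promise<T | undefined> {
    const existing = this.buffers.get(key);
    if (existing !== undefined) return Promise.resolve(existing);

    return new Promise((resolve) => {
      const timer = setTimeout(() => resolve(undefined), timeoutMs);
      const waiters = this.waiters.get(key) ?? [];
      waiters.push((value) => {
        clearTimeout(timer);
        resolve(value);
      });
      this.waiters.set(key, waiters);
    });
  }
}
```

The waiter resolves on the same tick the buffer is registered, which removes the `waitForBufferInterval` knob entirely.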
`215-221`: **Explicitly set default values for buffer waiting options**

In the `initializeRelayRealtimeStreams` function, the `waitForBufferTimeout` and `waitForBufferInterval` options are not specified. Explicitly setting these default values can improve clarity and maintainability. Apply this diff:

```diff
 function initializeRelayRealtimeStreams() {
   return new RelayRealtimeStreams({
     ttl: 1000 * 60 * 5, // 5 minutes
     fallbackIngestor: v1RealtimeStreams,
     fallbackResponder: v1RealtimeStreams,
+    waitForBufferTimeout: 5000, // Default timeout
+    waitForBufferInterval: 50, // Default interval
   });
 }
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
- `pnpm-lock.yaml` is excluded by `!**/pnpm-lock.yaml`

📒 Files selected for processing (12)

- `apps/webapp/app/presenters/v3/ApiRunListPresenter.server.ts` (1 hunks)
- `apps/webapp/app/presenters/v3/SpanPresenter.server.ts` (1 hunks)
- `apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts` (5 hunks)
- `apps/webapp/app/services/realtime/relayRealtimeStreams.server.ts` (1 hunks)
- `apps/webapp/app/services/realtime/utils.server.ts` (1 hunks)
- `docker/docker-compose.yml` (1 hunks)
- `internal-packages/testcontainers/src/utils.ts` (1 hunks)
- `packages/core/package.json` (1 hunks)
- `packages/core/src/v3/apiClient/runStream.ts` (3 hunks)
- `packages/core/src/v3/apiClient/stream.ts` (2 hunks)
- `packages/core/src/v3/runMetadata/manager.ts` (2 hunks)
- `packages/core/src/v3/utils/ioSerialization.ts` (2 hunks)
✅ Files skipped from review due to trivial changes (1)
- apps/webapp/app/presenters/v3/SpanPresenter.server.ts
🚧 Files skipped from review as they are similar to previous changes (1)
- packages/core/src/v3/runMetadata/manager.ts
👮 Files not reviewed due to content moderation or server errors (3)
- apps/webapp/app/presenters/v3/ApiRunListPresenter.server.ts
- packages/core/src/v3/utils/ioSerialization.ts
- packages/core/src/v3/apiClient/runStream.ts
🔇 Additional comments (10)
packages/core/package.json (1)
`185-185`: **Verify beta version stability and breaking changes**

Upgrading to a beta version (`1.0.0-beta.1`) of `@electric-sql/client` may introduce breaking changes. Please ensure:

- This is the latest beta version available
- Breaking changes are documented and addressed
- The beta version is stable enough for production use
docker/docker-compose.yml (2)
`68-68`: **LGTM! Good security practice with digest pinning**

The Electric SQL version upgrade matches the package dependency, and using a SHA256 digest ensures image immutability and security.

Line range hint `77-89`: **Verify impact of removing otel-collector service**

The OpenTelemetry collector service has been commented out. Please verify that this won't impact observability requirements.
✅ Verification successful
Let me gather more information about the OpenTelemetry configuration and usage.
Let me check the actual OpenTelemetry configuration and related code to understand the setup better.
Let me check how the application is actually consuming the OpenTelemetry data to understand if removing the collector impacts the system.
Based on the gathered information, I can now provide a final response:
The OpenTelemetry collector service can be safely commented out
The collector is configured as a pass-through component that forwards telemetry data to `http://host.docker.internal:3030/otel`. The application already has a direct OTLP exporter (`otlpExporter.server.ts`) that can handle telemetry data without the collector. The commented-out configuration appears to be an optional intermediate collector that's not critical to the system's observability:

- The application uses direct OTLP export capabilities through `@trigger.dev/otlp-importer`
- The collector was only acting as a proxy to forward data to the host machine
- No services are explicitly dependent on the collector's ports (4317/4318)
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
```shell
#!/bin/bash
# Check for OpenTelemetry dependencies and usage
rg -l "opentelemetry" --type ts --type js
```
Length of output: 2542
Script:
```shell
#!/bin/bash
# Check for OTLP endpoint configurations and usage
rg "otlp" -A 3 --type ts --type js --type yaml --type yml

# Check for any other telemetry collector configurations
fd -e yaml -e yml -x grep -l "otel-collector"
```
Length of output: 159
Script:
```shell
#!/bin/bash
# Check the OpenTelemetry configuration file
cat docker/otel-collector-config.yaml

# Look for OTLP endpoint configurations in code
rg "otlp|opentelemetry" -A 3 apps/webapp/app/v3/otlpExporter.server.ts

# Check for environment variables or configuration related to telemetry
rg "OTEL_" -A 3 --type ts --type js --type env
```
Length of output: 1043
Script:
```shell
#!/bin/bash
# Check how the application handles telemetry data
rg -A 5 "endpoint.*otel" apps/webapp

# Check for direct OTLP endpoint usage in core packages
rg -A 5 "endpoint.*otel" packages/core

# Look for alternative telemetry configurations
rg -A 5 "telemetry" packages/core/src/v3/config.ts
```
Length of output: 413
internal-packages/testcontainers/src/utils.ts (1)
`58-60`: **LGTM! Consistent version upgrade**

The Electric SQL container version and digest match the docker-compose.yml configuration, maintaining consistency across the codebase.
apps/webapp/app/services/realtime/relayRealtimeStreams.server.ts (1)
`39-111`: **`streamResponse` method implementation is sound**

The `streamResponse` method correctly handles the retrieval of buffered streams and falls back to the `fallbackResponder` when necessary. The use of `LineTransformStream` for processing streamed data enhances readability and efficiency.
apps/webapp/app/services/realtime/utils.server.ts (1)
`1-33`: **`LineTransformStream` implementation is correct and efficient**

The `LineTransformStream` class effectively handles incoming string data, buffering incomplete lines and emitting arrays of complete lines. This ensures proper processing of streamed text data and improves the reliability of data handling in real-time applications.
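The buffering behavior described above can be sketched as a small `TransformStream` subclass. This is a minimal sketch assuming newline-delimited input; it mirrors the behavior the review describes, not the PR's exact code:

```typescript
// Sketch: buffer a trailing partial line and emit arrays of complete lines.
class LineTransformStream extends TransformStream<string, string[]> {
  private buffer = "";

  constructor() {
    super({
      transform: (chunk, controller) => {
        this.buffer += chunk;
        const lines = this.buffer.split("\n");
        // The last element is either "" (chunk ended on a newline) or a
        // partial line; keep it buffered for the next chunk.
        this.buffer = lines.pop() ?? "";
        const fullLines = lines.filter((line) => line.length > 0);
        if (fullLines.length > 0) controller.enqueue(fullLines);
      },
      flush: (controller) => {
        // Emit whatever remains when the stream ends.
        if (this.buffer.length > 0) controller.enqueue([this.buffer]);
      },
    });
  }
}
```

A chunk like `'{"a":1}\n{"b":'` therefore emits `['{"a":1}']` and holds `'{"b":'` until the closing part of that line arrives, which is what keeps JSON lines intact across chunk boundaries.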
apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts (2)
`93-103`: **Enhanced stream processing with `LineTransformStream`**

The addition of `LineTransformStream` improves the handling of incoming data by properly processing and formatting each line before sending it in the response. This enhances the reliability and readability of the streamed data.
`145-146`: **Correct the condition to handle empty string values**

The condition `if (done || !value)` may inadvertently skip over empty string values (`""`), which could be valid data. Modify the condition to ensure all values are correctly processed. Apply this diff:

```diff
-if (done || !value) {
+if (done) {
   break;
 }
```
Likely invalid or redundant comment.
packages/core/src/v3/apiClient/stream.ts (2)
`28-28`: **Updated version header to `1.0.0-beta.1`**

The `x-trigger-electric-version` header has been updated to `1.0.0-beta.1`, ensuring compatibility with the expected API version.
`206-238`: **`LineTransformStream` implementation is effective**

The `LineTransformStream` class effectively processes incoming string data into arrays of complete lines, improving data handling in streaming operations. Its inclusion enhances the robustness of the API client.
Actionable comments posted: 0
🧹 Outside diff range and nitpick comments (1)
apps/webapp/app/services/realtime/relayRealtimeStreams.server.ts (1)
`137-141`: **Unnecessary `async` keyword in `handleLocalIngestion` method**

The method `handleLocalIngestion` is marked as `async` but does not contain any `await` statements or return a `Promise`. Consider removing the `async` keyword unless you plan to add asynchronous operations within this method.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
- `apps/webapp/app/routes/realtime.v1.streams.$runId.$streamId.ts` (3 hunks)
- `apps/webapp/app/services/realtime/relayRealtimeStreams.server.ts` (1 hunks)
🔇 Additional comments (5)
apps/webapp/app/services/realtime/relayRealtimeStreams.server.ts (2)
`159-160`: **Unused `finalized` property in `RelayedStreamRecord`**

The `finalized` property is defined in `RelayedStreamRecord` but is not used elsewhere in the code. Verify if this property is necessary or if additional logic should be implemented to utilize it.

`86-99`: **Stream transformation pipeline appears correct**

The stream transformation pipeline effectively decodes, processes lines, and encodes the stream for the response. This implementation aligns with the expected functionality for handling real-time data streams.
apps/webapp/app/routes/realtime.v1.streams.$runId.$streamId.ts (3)
`4-4`: **Import statement updated appropriately**

The import of `relayRealtimeStreams` is correctly added to replace `v1RealtimeStreams` for the updated streaming functionality.

`20-20`: **Switch to `relayRealtimeStreams` in `action` function**

The `action` function now uses `relayRealtimeStreams.ingestData`, which aligns with the new streaming implementation.

Line range hint `55-59`: **Update `loader` function to use `relayRealtimeStreams`**

The `loader` function correctly utilizes `relayRealtimeStreams.streamResponse` for handling stream responses.
Force-pushed from `cab69db` to `bb54d09`.
**Caution:** Inline review comments failed to post. This is likely due to GitHub's limits when posting large numbers of comments.
Actionable comments posted: 2
🧹 Outside diff range and nitpick comments (1)
apps/webapp/app/services/realtime/relayRealtimeStreams.server.ts (1)
`178-178`: **Remove unused `finalized` property or implement its logic**

The `finalized` property in `RelayedStreamRecord` is initialized to `false` (line 178) but is never updated or utilized elsewhere in the code. Additionally, the comment at lines 121-123 references relying on `finalized`, but there is no implementation that sets this property to `true` when the stream is completed. Consider implementing logic to set `finalized` to `true` when the stream has finished processing. If the property is not needed, remove it to avoid confusion.

Also applies to: 121-123
🛑 Comments failed to post (2)
apps/webapp/app/services/realtime/relayRealtimeStreams.server.ts (2)
81-95: ⚠️ **Potential issue: race condition when locking the stream record**

In the `streamResponse` method, the check and update of `record.locked` are not atomic operations. If multiple requests access the same record simultaneously, there's a possibility that both see `record.locked` as `false` and proceed to set it to `true`, leading to multiple readers accessing the stream. Consider introducing a synchronization mechanism, such as a mutex or a promise-based lock, to ensure that only one reader can lock and access the stream at a time.
207-209: ⚠️ **Potential issue: streams are not closed when deleting buffers**

In the `deleteBuffer` method, when a buffer is deleted, the associated stream is not being canceled or closed. This can lead to resource leaks, since the stream may remain open even after the buffer is removed from the `_buffers` map. Apply this diff to fix the issue:

```diff
 private deleteBuffer(bufferKey: string) {
+  const record = this._buffers.get(bufferKey);
+  if (record) {
+    // Cancel the stream to release resources
+    record.stream.cancel();
+  }
   this._buffers.delete(bufferKey);
 }
```

Committable suggestion skipped: line range outside the PR's diff.
Actionable comments posted: 0
🧹 Outside diff range and nitpick comments (4)
packages/core/src/v3/runMetadata/metadataStream.ts (1)
`18-22`: **Add error handling in the constructor**

While the initialization is clean, consider adding error handling to catch potential stream creation failures.

```diff
 constructor(private options: MetadataOptions<T>) {
+  try {
     const [serverStream, consumerStream] = this.createTeeStreams();
     this.serverStream = serverStream;
     this.consumerStream = consumerStream;

     this.streamPromise = this.initializeServerStream();
+  } catch (error) {
+    throw new Error(`Failed to initialize streams: ${error.message}`);
+  }
 }
```

apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts (1)
`145-155`: **Consider implementing batch processing for better performance**

While the current implementation is clean, consider batching multiple values before writing to Redis to improve performance under high load.

```diff
+const batchSize = 100;
+const batch: string[] = [];
 while (true) {
   const { done, value } = await reader.read();

   if (done || !value) {
     break;
   }

+  batch.push(value);
+  if (batch.length >= batchSize) {
+    const pipeline = redis.pipeline();
+    for (const item of batch) {
+      pipeline.xadd(streamKey, "MAXLEN", "~", "1000", "*", "data", item);
+    }
+    await pipeline.exec();
+    batch.length = 0;
+  }
-  await redis.xadd(streamKey, "MAXLEN", "~", "1000", "*", "data", value);
 }
+if (batch.length > 0) {
+  const pipeline = redis.pipeline();
+  for (const item of batch) {
+    pipeline.xadd(streamKey, "MAXLEN", "~", "1000", "*", "data", item);
+  }
+  await pipeline.exec();
+}
```

packages/core/src/v3/apiClient/runStream.ts (2)
217-234
: Well-structured stream processing pipeline

The stream transformation pipeline is well-organized with clear separation of concerns:
- Shape validation using zodShapeStream
- Value extraction
- Line-by-line processing
- JSON parsing
However, consider adding error handling for the pipeline to gracefully handle stream processing failures.
Note that the Streams API `TransformStream` transformer has no `catch` property, so the error handling belongs inside `transform` itself:

```diff
 async subscribe(): Promise<ReadableStream<unknown>> {
   return zodShapeStream(SubscribeRealtimeStreamChunkRawShape, this.url, this.options)
     .pipeThrough(
       new TransformStream({
         transform(chunk, controller) {
+          try {
             controller.enqueue(chunk.value);
+          } catch (error) {
+            console.error("Error transforming chunk:", error);
+          }
         },
       })
     )
     .pipeThrough(new LineTransformStream())
     .pipeThrough(
       new TransformStream({
         transform(chunk, controller) {
+          try {
             for (const line of chunk) {
               controller.enqueue(safeParseJSON(line));
             }
+          } catch (error) {
+            console.error("Error processing lines:", error);
+          }
         },
       })
     );
 }
```
280-282
: Consider using type guards for metadata fields

The string type checking for metadata fields could be more type-safe using custom type guards.

```diff
+interface StreamMetadata extends Record<string, unknown> {
+  $$streamsVersion?: string;
+  $$streamsBaseUrl?: string;
+}
+
+function isStreamMetadata(metadata: Record<string, unknown>): metadata is StreamMetadata {
+  return true;
+}
+
 const $baseUrl =
-  typeof metadata.$$streamsBaseUrl === "string" ? metadata.$$streamsBaseUrl : baseUrl;
+  isStreamMetadata(metadata) && metadata.$$streamsBaseUrl ? metadata.$$streamsBaseUrl : baseUrl;
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
`pnpm-lock.yaml` is excluded by `!**/pnpm-lock.yaml`
📒 Files selected for processing (21)
- `.changeset/rude-walls-help.md` (1 hunks)
- `apps/webapp/app/presenters/v3/ApiRunListPresenter.server.ts` (1 hunks)
- `apps/webapp/app/presenters/v3/SpanPresenter.server.ts` (1 hunks)
- `apps/webapp/app/routes/realtime.v1.streams.$runId.$streamId.ts` (3 hunks)
- `apps/webapp/app/services/realtime/databaseRealtimeStreams.server.ts` (2 hunks)
- `apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts` (5 hunks)
- `apps/webapp/app/services/realtime/relayRealtimeStreams.server.ts` (1 hunks)
- `apps/webapp/app/services/realtime/utils.server.ts` (1 hunks)
- `apps/webapp/server.ts` (1 hunks)
- `apps/webapp/test/authorizationRateLimitMiddleware.test.ts` (1 hunks)
- `docker/docker-compose.yml` (1 hunks)
- `internal-packages/testcontainers/src/utils.ts` (1 hunks)
- `packages/core/package.json` (1 hunks)
- `packages/core/src/v3/apiClient/runStream.ts` (3 hunks)
- `packages/core/src/v3/apiClient/stream.ts` (2 hunks)
- `packages/core/src/v3/runMetadata/manager.ts` (2 hunks)
- `packages/core/src/v3/runMetadata/metadataStream.ts` (2 hunks)
- `packages/core/src/v3/utils/ioSerialization.ts` (2 hunks)
- `references/nextjs-realtime/package.json` (1 hunks)
- `references/nextjs-realtime/src/components/TriggerButtonWithStreaming.tsx` (0 hunks)
- `references/nextjs-realtime/src/trigger/ai.ts` (0 hunks)
💤 Files with no reviewable changes (2)
- references/nextjs-realtime/src/components/TriggerButtonWithStreaming.tsx
- references/nextjs-realtime/src/trigger/ai.ts
🚧 Files skipped from review as they are similar to previous changes (15)
- .changeset/rude-walls-help.md
- apps/webapp/server.ts
- apps/webapp/app/services/realtime/utils.server.ts
- apps/webapp/test/authorizationRateLimitMiddleware.test.ts
- packages/core/package.json
- apps/webapp/app/routes/realtime.v1.streams.$runId.$streamId.ts
- references/nextjs-realtime/package.json
- packages/core/src/v3/utils/ioSerialization.ts
- packages/core/src/v3/apiClient/stream.ts
- internal-packages/testcontainers/src/utils.ts
- apps/webapp/app/services/realtime/databaseRealtimeStreams.server.ts
- apps/webapp/app/presenters/v3/SpanPresenter.server.ts
- docker/docker-compose.yml
- apps/webapp/app/presenters/v3/ApiRunListPresenter.server.ts
- apps/webapp/app/services/realtime/relayRealtimeStreams.server.ts
🧰 Additional context used
🪛 Biome (1.9.4)
packages/core/src/v3/runMetadata/metadataStream.ts
[error] 15-15: void is confusing inside a union type.
Unsafe fix: Use undefined instead.
(lint/suspicious/noConfusingVoidType)
🔇 Additional comments (7)
packages/core/src/v3/runMetadata/metadataStream.ts (3)
5-5
: LGTM: Improved type definitions for better stream handling.
The change from `AsyncIterator` to `AsyncIterable` and the use of `ReadableStream` properties align better with modern streaming patterns and provide more flexibility in handling different stream types.
Also applies to: 13-14
72-82
: LGTM: Well-implemented stream to iterator conversion.
The implementation properly handles stream reading and cleanup, with appropriate error handling and reader lock release.
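A conversion along these lines can be written as an async generator that releases the reader lock on exit. This is a sketch of the pattern the review praises, not the package's actual code:

```typescript
async function* streamToAsyncIterator<T>(stream: ReadableStream<T>): AsyncIterableIterator<T> {
  const reader = stream.getReader();
  try {
    while (true) {
      const { done, value } = await reader.read();
      if (done) return;
      yield value;
    }
  } finally {
    // Always release the lock, even if the consumer stops iterating early
    reader.releaseLock();
  }
}
```

The `finally` block is what makes early `break` or thrown errors safe: the generator's cleanup path runs and the stream becomes usable again.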
25-36
: 🛠️ Refactor suggestion
Enhance error handling in stream creation.
The stream creation logic should handle potential errors during iteration.
```diff
 private createTeeStreams() {
   const readableSource = new ReadableStream<T>({
     start: async (controller) => {
+      try {
         for await (const value of this.options.source) {
           controller.enqueue(value);
         }
         controller.close();
+      } catch (error) {
+        controller.error(error);
+      }
     },
   });

   return readableSource.tee();
 }
```
Likely invalid or redundant comment.
apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts (1)
92-103
: LGTM: Well-structured stream transformation pipeline.
The stream processing pipeline is well-organized with clear separation of concerns:
- Line transformation
- SSE format conversion
- Text encoding
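The three stages above can be sketched as a single pipeline, assuming the WHATWG streams globals available in Node 18+ (names here are illustrative, not the webapp's actual helpers): each line is wrapped in SSE `data:` framing and then byte-encoded for the HTTP response.

```typescript
function sseStream(source: ReadableStream<string>): ReadableStream<Uint8Array> {
  return source
    .pipeThrough(
      new TransformStream<string, string>({
        // SSE format conversion: frame each line as a `data:` event
        transform(line, controller) {
          controller.enqueue(`data: ${line}\n\n`);
        },
      })
    )
    // Text encoding: convert strings to bytes
    .pipeThrough(new TextEncoderStream());
}
```

Keeping each concern in its own `TransformStream` is what makes the pipeline easy to test and recompose.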
packages/core/src/v3/runMetadata/manager.ts (1)
Line range hint 236-254
: LGTM: Comprehensive stream metadata management.
The implementation properly:
- Handles stream initialization
- Manages metadata
- Includes proper cleanup
- Sets all required stream-related metadata
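To make the metadata shape concrete, here is a hypothetical sketch: only the `$$streamsVersion` and `$$streamsBaseUrl` field names come from the review; the `$$streams` key, the `"v2"` value, and the `registerStream` helper are assumptions for illustration.

```typescript
// Hypothetical helper: records a stream key plus the version/baseUrl
// fields discussed in the review ($$streams and "v2" are assumed).
function registerStream(
  metadata: Record<string, unknown>,
  key: string,
  baseUrl: string
): Record<string, unknown> {
  const keys = Array.isArray(metadata.$$streams) ? (metadata.$$streams as string[]) : [];
  if (!keys.includes(key)) keys.push(key); // idempotent registration
  metadata.$$streams = keys;
  metadata.$$streamsVersion = "v2";
  metadata.$$streamsBaseUrl = baseUrl;
  return metadata;
}
```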
packages/core/src/v3/apiClient/runStream.ts (2)
19-24
: LGTM: Clean import organization
The new imports are well-structured and appropriately scoped for the streaming functionality.
217-234
: Consider handling JSON parse errors explicitly
The `safeParseJSON` function silently returns the raw string on parse errors. Consider adding explicit error handling for malformed JSON.
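To make the concern concrete, a fallback-style `safeParseJSON` typically looks like the following sketch (assumed shape, not necessarily the package's exact code); the silent pass-through in the `catch` branch is what the comment flags:

```typescript
function safeParseJSON(line: string): unknown {
  try {
    return JSON.parse(line);
  } catch {
    // On malformed JSON the raw string is returned unchanged, so
    // downstream consumers cannot tell parse failures from string data.
    return line;
  }
}
```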
Force-pushed from `bb54d09` to `7014057` (Compare)
Actionable comments posted: 2
🧹 Outside diff range and nitpick comments (10)
packages/core/src/v3/utils/ioSerialization.ts (1)
407-420
: Consider performance optimizations for key filtering

The implementation is secure and correctly handles the filtering logic. However, for better performance with large datasets or frequent parsing operations, consider these optimizations:
- Use Set for more efficient key lookups
- Implement memoization when the same options are used frequently
```diff
 function makeSafeReviver(options?: ReplacerOptions) {
   if (!options) {
     return undefined;
   }

+  // Convert array to Set for O(1) lookups
+  const filteredKeysSet = new Set(options.filteredKeys);
+
   return function reviver(key: string, value: any) {
     // Check if the key should be filtered out
-    if (options?.filteredKeys?.includes(key)) {
+    if (filteredKeysSet.has(key)) {
       return undefined;
     }

     return value;
   };
 }
```

references/nextjs-realtime/src/app/realtime/[id]/page.tsx (1)
13-17
: Consider error boundary for RunRealtimeComparison

While the implementation is correct, consider wrapping the `RunRealtimeComparison` component with an error boundary to gracefully handle potential streaming failures.

```diff
 return (
   <main className="flex min-h-screen items-center justify-center p-4 bg-gray-900">
+    <ErrorBoundary fallback={<div>Something went wrong with the stream</div>}>
       <RunRealtimeComparison accessToken={accessToken} runId={params.id} />
+    </ErrorBoundary>
   </main>
 );
```

references/nextjs-realtime/src/components/RunRealtimeComparison.tsx (2)
17-22
: Remove console.log statements

Remove development `console.log` statements before production deployment.

```diff
-  onComplete: (...args) => {
-    console.log("Run completed!", args);
-  },
+  onComplete: (...args) => {
+    // Handle completion if needed
+  },
 });
-
-console.log("run", run);
```
27-33
: Clarify disabled debug button purpose

The debug button is permanently disabled. If it's not needed, consider removing it. If it's for future use, add a TODO comment explaining its purpose.
packages/react-hooks/src/hooks/useRealtime.ts (1)
267-273
: Consider extracting the onComplete logic into a custom hook

The `onComplete` handling logic is duplicated between `useRealtimeRun` and `useRealtimeRunWithStreams`. Consider extracting this into a reusable custom hook to follow DRY principles.

```diff
+function useOnComplete<TTask extends AnyTask>(
+  isComplete: boolean,
+  run: RealtimeRun<TTask> | undefined,
+  error: Error | undefined,
+  onComplete?: (run: RealtimeRun<TTask>, err?: Error) => void
+) {
+  const hasCalledOnCompleteRef = useRef(false);
+
+  useEffect(() => {
+    if (isComplete && run && onComplete && !hasCalledOnCompleteRef.current) {
+      onComplete(run, error);
+      hasCalledOnCompleteRef.current = true;
+    }
+  }, [isComplete, run, error, onComplete]);
+}

 export function useRealtimeRun<TTask extends AnyTask>(
   runId?: string,
   options?: UseRealtimeSingleRunOptions<TTask>
 ): UseRealtimeRunInstance<TTask> {
   // ... existing code ...
-  const hasCalledOnCompleteRef = useRef(false);
-
-  useEffect(() => {
-    if (isComplete && run && options?.onComplete && !hasCalledOnCompleteRef.current) {
-      options.onComplete(run, error);
-      hasCalledOnCompleteRef.current = true;
-    }
-  }, [isComplete, run, error, options?.onComplete]);
+  useOnComplete(isComplete, run, error, options?.onComplete);
   // ... rest of the code ...
 }

 export function useRealtimeRunWithStreams<
   TTask extends AnyTask = AnyTask,
   TStreams extends Record<string, any> = Record<string, any>
 >(
   // ... existing code ...
 ) {
   // ... existing code ...
-  const hasCalledOnCompleteRef = useRef(false);
-
-  useEffect(() => {
-    if (isComplete && run && options?.onComplete && !hasCalledOnCompleteRef.current) {
-      options.onComplete(run, error);
-      hasCalledOnCompleteRef.current = true;
-    }
-  }, [isComplete, run, error, options?.onComplete]);
+  useOnComplete(isComplete, run, error, options?.onComplete);
   // ... rest of the code ...
 }
```

packages/core/src/v3/apiClient/runStream.ts (1)
284-285
: Handle undefined `$$streamsBaseUrl` in metadata

When accessing `metadata.$$streamsBaseUrl`, ensure that it is safely handled in case it's undefined or not a string to prevent potential runtime errors.

packages/core/src/v3/runMetadata/metadataStream.ts (2)
68-69
: Avoid using console logs in production code

The use of `console.log` in the `stop()` method may not be appropriate for production environments. Consider using a proper logging mechanism or removing the log statement altogether.

Apply this diff to remove the console log:

```diff
 stop: () => {
-  console.log("Stopping zodShapeStream with abortController.abort()");
   abortController.abort();
 },
```
72-81
: Consider using existing utilities for stream-to-iterator conversion

The `streamToAsyncIterator` function converts a `ReadableStream` into an `AsyncIterableIterator`. Check if similar functionality already exists in standard libraries or utility packages to avoid duplicating code.

packages/core/src/v3/apiClient/stream.ts (2)
19-23
: Add documentation for the new `ZodShapeStreamInstance` type

The introduction of `ZodShapeStreamInstance` is significant. Adding documentation comments would help other developers understand its purpose and usage.
: Handle Windows-style line endings inLineTransformStream
The
LineTransformStream
class splits chunks by\n
, which may not correctly handle Windows-style line endings (\r\n
). Consider updating the splitting logic to handle both\n
and\r\n
to ensure compatibility across different platforms.Apply this diff to improve line splitting:
- const lines = this.buffer.split("\n"); + const lines = this.buffer.split(/\r?\n/);
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
`pnpm-lock.yaml` is excluded by `!**/pnpm-lock.yaml`
📒 Files selected for processing (25)
- `.changeset/rude-walls-help.md` (1 hunks)
- `apps/webapp/app/presenters/v3/ApiRunListPresenter.server.ts` (1 hunks)
- `apps/webapp/app/presenters/v3/SpanPresenter.server.ts` (1 hunks)
- `apps/webapp/app/routes/realtime.v1.streams.$runId.$streamId.ts` (3 hunks)
- `apps/webapp/app/services/realtime/databaseRealtimeStreams.server.ts` (2 hunks)
- `apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts` (5 hunks)
- `apps/webapp/app/services/realtime/relayRealtimeStreams.server.ts` (1 hunks)
- `apps/webapp/app/services/realtime/utils.server.ts` (1 hunks)
- `apps/webapp/server.ts` (1 hunks)
- `apps/webapp/test/authorizationRateLimitMiddleware.test.ts` (1 hunks)
- `docker/docker-compose.yml` (1 hunks)
- `internal-packages/testcontainers/src/utils.ts` (1 hunks)
- `packages/core/package.json` (1 hunks)
- `packages/core/src/v3/apiClient/runStream.ts` (7 hunks)
- `packages/core/src/v3/apiClient/stream.ts` (5 hunks)
- `packages/core/src/v3/runMetadata/manager.ts` (2 hunks)
- `packages/core/src/v3/runMetadata/metadataStream.ts` (2 hunks)
- `packages/core/src/v3/utils/ioSerialization.ts` (2 hunks)
- `packages/react-hooks/src/hooks/useRealtime.ts` (3 hunks)
- `references/nextjs-realtime/package.json` (2 hunks)
- `references/nextjs-realtime/src/app/realtime/[id]/page.tsx` (1 hunks)
- `references/nextjs-realtime/src/components/RunRealtimeComparison.tsx` (1 hunks)
- `references/nextjs-realtime/src/components/TriggerButtonWithStreaming.tsx` (0 hunks)
- `references/nextjs-realtime/src/components/ui/tabs.tsx` (1 hunks)
- `references/nextjs-realtime/src/trigger/ai.ts` (0 hunks)
💤 Files with no reviewable changes (2)
- references/nextjs-realtime/src/components/TriggerButtonWithStreaming.tsx
- references/nextjs-realtime/src/trigger/ai.ts
🚧 Files skipped from review as they are similar to previous changes (14)
- .changeset/rude-walls-help.md
- apps/webapp/server.ts
- internal-packages/testcontainers/src/utils.ts
- apps/webapp/app/services/realtime/utils.server.ts
- apps/webapp/test/authorizationRateLimitMiddleware.test.ts
- apps/webapp/app/services/realtime/databaseRealtimeStreams.server.ts
- apps/webapp/app/routes/realtime.v1.streams.$runId.$streamId.ts
- apps/webapp/app/presenters/v3/SpanPresenter.server.ts
- docker/docker-compose.yml
- apps/webapp/app/presenters/v3/ApiRunListPresenter.server.ts
- apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts
- packages/core/package.json
- references/nextjs-realtime/package.json
- apps/webapp/app/services/realtime/relayRealtimeStreams.server.ts
🧰 Additional context used
🪛 Biome (1.9.4)
packages/core/src/v3/runMetadata/metadataStream.ts
[error] 15-15: void is confusing inside a union type.
Unsafe fix: Use undefined instead.
(lint/suspicious/noConfusingVoidType)
🔇 Additional comments (14)
packages/core/src/v3/utils/ioSerialization.ts (4)
17-19
: LGTM: Clean type definition
The `ParsePacketOptions` type is well-defined and maintains backward compatibility with its optional property.
21-21
: LGTM: Backward-compatible signature update
The function signature change properly introduces the optional options parameter while maintaining backward compatibility.
28-28
: LGTM: Safe JSON parsing with optional filtering
The modification correctly integrates the filtering capability while maintaining the default parsing behavior when no options are provided.
17-28
: Verify impact on existing parsePacket usage
The new filtering capability is well-implemented, but let's verify its impact on existing code.
✅ Verification successful
Based on the search results, I can now provide a final response regarding the impact of the new filtering capability in parsePacket:
No breaking changes detected in parsePacket usage patterns
The codebase scan shows that all existing uses of parsePacket are simple invocations without any options parameter, making the new filtering capability a fully backward-compatible addition. The key observations:
- All 16 current usages of parsePacket() are basic calls without options
- No existing code relies on specific data being present in the parsed output
- The filtering is opt-in through the new options parameter
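The opt-in filtering can be illustrated with a `JSON.parse` reviver. This is a sketch; `parseWithFilteredKeys` is not the actual `parsePacket` API, but returning `undefined` from a reviver is exactly how a key gets dropped during parsing.

```typescript
function parseWithFilteredKeys(json: string, filteredKeys: string[]): unknown {
  // Set gives O(1) key lookups, mirroring the optimization suggested above
  const filtered = new Set(filteredKeys);
  return JSON.parse(json, (key, value) => (filtered.has(key) ? undefined : value));
}
```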
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
```shell
#!/bin/bash
# Search for parsePacket usage patterns to ensure compatibility
echo "Searching for parsePacket usage patterns..."
rg --type typescript "parsePacket\(" -A 2 -B 2

# Look for potential sensitive data handling patterns
echo "Checking for potential sensitive data patterns..."
rg --type typescript -e "password" -e "secret" -e "token" -e "key" --glob "**/*.ts" -A 2 -B 2
```
Length of output: 412
Script:
```shell
#!/bin/bash
# Search for parsePacket usage patterns without typescript type
echo "Searching for parsePacket usage patterns..."
rg "parsePacket\(" -A 2 -B 2

# Look for potential sensitive data handling patterns
echo "Checking for potential sensitive data patterns..."
rg -e "password" -e "secret" -e "token" -e "key" --glob "*.ts" -A 2 -B 2
```
Length of output: 98300
references/nextjs-realtime/src/app/realtime/[id]/page.tsx (1)
5-11
: LGTM: Properly scoped authentication token
The token generation is well-implemented with appropriate scope restrictions, limiting access to only the specific run.
references/nextjs-realtime/src/components/ui/tabs.tsx (2)
10-13
: LGTM: Well-typed component implementations
The components are properly typed with React.ElementRef and ComponentPropsWithoutRef, ensuring type safety and proper prop handling.
Also applies to: 25-28, 40-43
31-33
: LGTM: Accessible and well-styled components
The implementation maintains accessibility through Radix primitives and proper focus management. The styling is well-organized using the cn utility.
Also applies to: 46-48
packages/react-hooks/src/hooks/useRealtime.ts (2)
112-118
: LGTM: Good fix to prevent multiple onComplete calls
The addition of `hasCalledOnCompleteRef` ensures that the `onComplete` callback is called exactly once when the subscription completes. This prevents potential issues that could arise from multiple invocations of the callback.
602-602
: LGTM: Verify no race conditions with non-awaited stream updates
Removing `await` from `mutateStreamData` can improve performance by not blocking the event loop. The throttle queue and ref-based state management should handle concurrent updates correctly.
✅ Verification successful
Let me gather more context about the `mutateStreamData` usage and implementation to better understand the potential impact.
Let me check the broader context of the streaming implementation to understand the data flow and state management.
Removing `await` from `mutateStreamData` is safe in this context

The implementation is well-protected against race conditions because:

- Updates are processed through a throttled queue that batches stream updates
- State is managed via a ref (`existingDataRef`) that always contains the latest state
- Updates are merged using the spread operator, preserving all data
- The throttling mechanism (`createThrottledQueue`) ensures ordered processing of batches
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
```shell
#!/bin/bash
# Description: Check for any race condition handling or ordering requirements in the codebase

# Look for comments or documentation about stream ordering requirements
rg -i "stream.*order|order.*stream" --type ts

# Look for any explicit stream synchronization or sequential processing
ast-grep --pattern 'await $_.stream'

# Look for any race condition related comments
rg -i "race.*condition|concurrent.*update" --type ts
```
Length of output: 3358
Script:
#!/bin/bash
# Check the implementation and usage of mutateStreamData
ast-grep --pattern 'function mutateStreamData'
# Look for other usages of mutateStreamData
rg 'mutateStreamData' -A 3 --type ts
# Check the implementation of processRealtimeRunWithStreams
ast-grep --pattern 'function processRealtimeRunWithStreams'
Length of output: 865
Script:
#!/bin/bash
# Check the implementation of withStreams and stream data handling
ast-grep --pattern 'withStreams<$_>($$$)'
# Look for stream state management patterns
rg 'StreamResults|nextStreamData' -B 2 -A 2 --type ts
# Check for any throttling or batching logic
rg 'throttle.*stream|batch.*stream' -B 2 -A 2 --type ts
Length of output: 4112
packages/core/src/v3/apiClient/runStream.ts (2)
119-123
: Ensure proper cleanup of resources when aborting streams
When creating `runStreamInstance` through `zodShapeStream`, you should ensure that resources are properly cleaned up when the stream is aborted. Verify that aborting the `abortController` in the `runShapeStream` method correctly stops the underlying stream without any resource leaks.
223-236
:
Handle JSON parse errors explicitly in stream transformation
In the `ElectricStreamSubscription`'s `subscribe` method, the `safeParseJSON` function may return the raw string if parsing fails. This could silently propagate malformed data downstream. Consider adding explicit error handling to manage JSON parse errors.
This comment is similar to a previous one.
packages/core/src/v3/runMetadata/metadataStream.ts (2)
40-46
: 🛠️ Refactor suggestion
Ensure proper error handling in `initializeServerStream`

The `fetch` request may fail or throw an exception, which isn't currently handled. This could lead to unhandled promise rejections. Consider wrapping the `fetch` call in a `try-catch` block or handling errors appropriately.
This comment is similar to a previous one.
27-33
:
Add error handling when reading from the source iterable
If an error occurs while iterating over `this.options.source`, it may not be properly propagated, leading to unhandled exceptions. Consider wrapping the `for await` loop in a `try-catch` block to handle any errors during iteration.
Apply this diff to handle errors:
```diff
 start: async (controller) => {
+  try {
     for await (const value of this.options.source) {
       controller.enqueue(value);
     }
+  } catch (error) {
+    controller.error(error);
+  } finally {
+    controller.close();
+  }
 },
```
Likely invalid or redundant comment.
packages/core/src/v3/runMetadata/manager.ts (1)
249-252
:
Verify potential security implications of including `$$streamsBaseUrl` in metadata

Storing the `$$streamsBaseUrl` in metadata may expose internal URLs or sensitive information if the metadata is accessible externally. Ensure that including `$$streamsBaseUrl` does not pose any security risks.
```tsx
const { streams, stop, run } = useRealtimeRunWithStreams<typeof openaiStreaming, STREAMS>(runId, {
  accessToken: accessToken,
  baseURL: process.env.NEXT_PUBLIC_TRIGGER_API_URL,
  onComplete: (...args) => {
    console.log("Run completed!", args);
  },
});
```
Add cleanup for stream subscription
Ensure the stream subscription is properly cleaned up when the component unmounts.
```diff
+import { useEffect } from 'react';
+
 export default function RealtimeComparison({
   accessToken,
   runId,
 }) {
   const { streams, stop, run } = useRealtimeRunWithStreams<typeof openaiStreaming, STREAMS>(runId, {
     accessToken: accessToken,
     baseURL: process.env.NEXT_PUBLIC_TRIGGER_API_URL,
     onComplete: (...args) => {
-      console.log("Run completed!", args);
     },
   });
+
+  useEffect(() => {
+    return () => {
+      // Cleanup subscription on unmount
+      stop();
+    };
+  }, [stop]);
```
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```tsx
import { useEffect } from 'react';

const { streams, stop, run } = useRealtimeRunWithStreams<typeof openaiStreaming, STREAMS>(runId, {
  accessToken: accessToken,
  baseURL: process.env.NEXT_PUBLIC_TRIGGER_API_URL,
  onComplete: (...args) => {
  },
});

useEffect(() => {
  return () => {
    // Cleanup subscription on unmount
    stop();
  };
}, [stop]);
```
```tsx
<div className="flex-grow flex overflow-hidden">
  <div className="w-1/2 border-r border-gray-700 overflow-auto">
    <table className="w-full table-fixed">
      <thead>
        <tr className="bg-gray-800">
          <th className="w-16 p-2 text-left">ID</th>
          <th className="p-2 text-left">Data</th>
        </tr>
      </thead>
      <tbody>
        {(streams.openai ?? []).map((part, i) => (
          <tr key={i} className="border-b border-gray-700">
            <td className="w-16 p-2 truncate">{i + 1}</td>
            <td className="p-2">
              <div className="font-mono whitespace-nowrap overflow-x-auto">
                {JSON.stringify(part)}
              </div>
            </td>
          </tr>
        ))}
      </tbody>
    </table>
  </div>
  <div className="w-1/2 overflow-auto">
    <table className="w-full table-fixed">
      <thead>
        <tr className="bg-gray-800">
          <th className="w-16 p-2 text-left">ID</th>
          <th className="p-2 text-left">Data</th>
        </tr>
      </thead>
      <tbody>
        {(streams.openaiText ?? []).map((text, i) => (
          <tr key={i} className="border-b border-gray-700">
            <td className="w-16 p-2 truncate">{i + 1}</td>
            <td className="p-2">
              <div className="font-mono whitespace-nowrap overflow-x-auto">{text}</div>
            </td>
          </tr>
        ))}
      </tbody>
    </table>
  </div>
</div>
```
🛠️ Refactor suggestion
Add loading and error states, implement virtualization
Several improvements needed for production readiness:
- Add loading state while streams are initializing
- Add error handling for stream failures
- Implement virtualization for large stream arrays to prevent memory issues
Consider using a virtualized table component like `react-window` for better performance with large streams:
```diff
+import { FixedSizeList } from 'react-window';
+
 <div className="flex-grow flex overflow-hidden">
   <div className="w-1/2 border-r border-gray-700 overflow-auto">
+    {streams.openai === undefined ? (
+      <div className="p-4">Loading...</div>
+    ) : streams.openai.length === 0 ? (
+      <div className="p-4">No data yet</div>
+    ) : (
     <table className="w-full table-fixed">
       {/* ... table header ... */}
-      <tbody>
-        {(streams.openai ?? []).map((part, i) => (
+      <FixedSizeList
+        height={400}
+        width="100%"
+        itemCount={streams.openai.length}
+        itemSize={35}
+      >
+        {({ index, style }) => (
           <tr key={i} className="border-b border-gray-700">
             {/* ... row content ... */}
           </tr>
-        ))}
-      </tbody>
+        )}
+      </FixedSizeList>
     </table>
+    )}
   </div>
   {/* Similar changes for the second table */}
 </div>
```
Committable suggestion skipped: line range outside the PR's diff.
Actionable comments posted: 0
🧹 Outside diff range and nitpick comments (1)
packages/core/src/v3/apiClient/runStream.ts (1)
283-291
: Consider adding URL validation

While the baseUrl handling is good, consider validating the URL format to prevent potential issues.

```diff
+const isValidUrl = (url: string) => {
+  try {
+    new URL(url);
+    return true;
+  } catch {
+    return false;
+  }
+};
+
 const $baseUrl =
-  typeof metadata.$$streamsBaseUrl === "string" ? metadata.$$streamsBaseUrl : baseUrl;
+  typeof metadata.$$streamsBaseUrl === "string" && isValidUrl(metadata.$$streamsBaseUrl)
+    ? metadata.$$streamsBaseUrl
+    : baseUrl;
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
- `packages/core/src/v3/apiClient/runStream.ts` (7 hunks)
- `packages/core/src/v3/apiClient/stream.ts` (5 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- packages/core/src/v3/apiClient/stream.ts
🔇 Additional comments (4)
packages/core/src/v3/apiClient/runStream.ts (4)
119-126
: LGTM: Improved stream lifecycle management
The addition of `stopRunShapeStream` ensures proper cleanup of stream resources.
304-304
: LGTM: Proper stream cleanup implementation
The implementation ensures proper cleanup of stream resources both on completion and unsubscription.
Also applies to: 334-334, 346-346
220-237
:
Consider handling JSON parse errors explicitly
The JSON parsing implementation could be improved with explicit error handling.
Apply this diff to improve error handling:
```diff
-controller.enqueue(safeParseJSON(line));
+try {
+  const parsed = safeParseJSON(line);
+  if (typeof parsed === 'string' && line !== parsed) {
+    console.warn(`Failed to parse JSON for line: ${line}`);
+  }
+  controller.enqueue(parsed);
+} catch (error) {
+  console.error(`Error processing line: ${line}`, error);
+}
```
19-24
: Verify stream utility imports
The new imports enhance stream processing capabilities. Let's verify their implementation.
✅ Verification successful
Stream utility imports are correctly implemented
All imported utilities are properly defined in `packages/core/src/v3/apiClient/stream.ts`:

- `AsyncIterableStream` is defined as a type combining AsyncIterable and ReadableStream
- `createAsyncIterableReadable` is implemented as a function that creates an AsyncIterableStream
- `LineTransformStream` is implemented as a class extending TransformStream
- `zodShapeStream` is implemented as a function for schema-based stream processing
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
```shell
#!/bin/bash
# Description: Verify the implementation of stream utilities
# Check if the stream.js file exists and contains the expected exports
rg -A 5 "export.*(?:AsyncIterableStream|createAsyncIterableReadable|LineTransformStream|zodShapeStream)"
```
Length of output: 2561
This PR includes a bunch of fixes and improvements to Realtime & Realtime streams:

- `onComplete` callback added to `useRealtimeRun` and `useRealtimeRunWithStreams`