Skip to content

Commit

Permalink
Improved the relay realtime cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
ericallam committed Dec 12, 2024
1 parent 4dd27b4 commit cab69db
Showing 1 changed file with 32 additions and 2 deletions.
34 changes: 32 additions & 2 deletions apps/webapp/app/services/realtime/relayRealtimeStreams.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { singleton } from "~/utils/singleton";

export type RelayRealtimeStreamsOptions = {
ttl: number;
cleanupInterval: number;
fallbackIngestor: StreamIngestor;
fallbackResponder: StreamResponder;
waitForBufferTimeout?: number; // Time to wait for buffer in ms (default: 500ms)
Expand All @@ -17,6 +18,7 @@ interface RelayedStreamRecord {
stream: ReadableStream<Uint8Array>;
createdAt: number;
lastAccessed: number;
locked: boolean;
finalized: boolean;
}

Expand All @@ -33,7 +35,7 @@ export class RelayRealtimeStreams implements StreamIngestor, StreamResponder {
// Periodic cleanup
this.cleanupInterval = setInterval(() => {
this.cleanup();
}, this.options.ttl).unref();
}, this.options.cleanupInterval).unref();
}

async streamResponse(
Expand Down Expand Up @@ -76,6 +78,23 @@ export class RelayRealtimeStreams implements StreamIngestor, StreamResponder {
}
}

// Only 1 reader of the stream can use the relayed stream, the rest should use the fallback
if (record.locked) {
logger.debug("[RelayRealtimeStreams][streamResponse] Stream already locked, using fallback", {
streamId,
runId,
});

return this.options.fallbackResponder.streamResponse(
request,
runId,
streamId,
environment,
signal
);
}

record.locked = true;
record.lastAccessed = Date.now();

logger.debug("[RelayRealtimeStreams][streamResponse] Streaming from ephemeral record", {
Expand Down Expand Up @@ -106,7 +125,7 @@ export class RelayRealtimeStreams implements StreamIngestor, StreamResponder {
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache",
Connection: "keep-alive",
"x-relay-realtime-streams": "true",
"x-trigger-relay-realtime-streams": "true",
},
});
}
Expand Down Expand Up @@ -157,6 +176,7 @@ export class RelayRealtimeStreams implements StreamIngestor, StreamResponder {
createdAt: Date.now(),
lastAccessed: Date.now(),
finalized: false,
locked: false,
};
this._buffers.set(bufferKey, record);
} else {
Expand All @@ -167,12 +187,21 @@ export class RelayRealtimeStreams implements StreamIngestor, StreamResponder {

private cleanup() {
const now = Date.now();

logger.debug("[RelayRealtimeStreams][cleanup] Cleaning up old buffers", {
bufferCount: this._buffers.size,
});

for (const [key, record] of this._buffers.entries()) {
// If last accessed is older than ttl, clean up
if (now - record.lastAccessed > this.options.ttl) {
this.deleteBuffer(key);
}
}

logger.debug("[RelayRealtimeStreams][cleanup] Cleaned up old buffers", {
bufferCount: this._buffers.size,
});
}

private deleteBuffer(bufferKey: string) {
Expand Down Expand Up @@ -216,6 +245,7 @@ export class RelayRealtimeStreams implements StreamIngestor, StreamResponder {
function initializeRelayRealtimeStreams() {
return new RelayRealtimeStreams({
ttl: 1000 * 60 * 5, // 5 minutes
cleanupInterval: 1000 * 60, // 1 minute
fallbackIngestor: v1RealtimeStreams,
fallbackResponder: v1RealtimeStreams,
});
Expand Down

0 comments on commit cab69db

Please sign in to comment.