-
Notifications
You must be signed in to change notification settings - Fork 161
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Release reader immediately when shutting down a pipe #1208
base: main
Are you sure you want to change the base?
Changes from 7 commits
eb22e7e
15a9768
7008ee5
7b446d5
36a9ad9
256e70f
0eb6177
8cfaed4
01a4f49
3ee5c8f
a0364b9
0c23857
e333293
285bfb7
817293c
124582a
f6d8b73
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -9,8 +9,9 @@ const { CanTransferArrayBuffer, CopyDataBlockBytes, CreateArrayFromList, IsDetac | |
const { CloneAsUint8Array, IsNonNegativeNumber } = require('./miscellaneous.js'); | ||
const { EnqueueValueWithSize, ResetQueue } = require('./queue-with-sizes.js'); | ||
const { AcquireWritableStreamDefaultWriter, IsWritableStreamLocked, WritableStreamAbort, | ||
WritableStreamDefaultWriterCloseWithErrorPropagation, WritableStreamDefaultWriterRelease, | ||
WritableStreamDefaultWriterWrite, WritableStreamCloseQueuedOrInFlight } = require('./writable-streams.js'); | ||
WritableStreamDefaultWriterCloseWithErrorPropagation, WritableStreamDefaultWriterIsOrBecomesErrored, | ||
WritableStreamDefaultWriterRelease, WritableStreamDefaultWriterWrite, WritableStreamCloseQueuedOrInFlight } = | ||
require('./writable-streams.js'); | ||
const { CancelSteps, PullSteps, ReleaseSteps } = require('./internal-methods.js'); | ||
|
||
const ReadableByteStreamController = require('../../generated/ReadableByteStreamController.js'); | ||
|
@@ -134,7 +135,7 @@ function ReadableStreamPipeTo(source, dest, preventClose, preventAbort, preventC | |
assert(IsReadableStreamLocked(source) === false); | ||
assert(IsWritableStreamLocked(dest) === false); | ||
|
||
const reader = AcquireReadableStreamDefaultReader(source); | ||
let reader = AcquireReadableStreamDefaultReader(source); | ||
const writer = AcquireWritableStreamDefaultWriter(dest); | ||
|
||
source._disturbed = true; | ||
|
@@ -200,6 +201,12 @@ function ReadableStreamPipeTo(source, dest, preventClose, preventAbort, preventC | |
} | ||
|
||
return transformPromiseWith(writer._readyPromise, () => { | ||
if (shuttingDown === true) { | ||
return promiseResolvedWith(true); | ||
} | ||
if (dest._state !== 'writable' || WritableStreamCloseQueuedOrInFlight(dest) === true) { | ||
return promiseResolvedWith(true); | ||
} | ||
Comment on lines
+207
to
+209
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This implements @domenic's suggestion from #1207 (comment). I don't know if we need to update the spec text for this. It already specifies that these checks must happen before performing any reads and writes:
We should still add a test for this particular case (although that might not be easy looking at the discussion in #1207). There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I personally don't think we need to update the spec text. |
||
return new Promise((resolveRead, rejectRead) => { | ||
ReadableStreamDefaultReaderRead( | ||
reader, | ||
|
@@ -228,7 +235,8 @@ function ReadableStreamPipeTo(source, dest, preventClose, preventAbort, preventC | |
}); | ||
|
||
// Errors must be propagated backward | ||
isOrBecomesErrored(dest, writer._closedPromise, storedError => { | ||
WritableStreamDefaultWriterIsOrBecomesErrored(writer, () => { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This new helper allows attaching a synchronous callback for when dest becomes |
||
const storedError = dest._storedError; | ||
if (preventCancel === false) { | ||
shutdownWithAction(() => ReadableStreamCancel(source, storedError), true, storedError); | ||
} else { | ||
|
@@ -289,6 +297,8 @@ function ReadableStreamPipeTo(source, dest, preventClose, preventAbort, preventC | |
return; | ||
} | ||
shuttingDown = true; | ||
ReadableStreamDefaultReaderRelease(reader); | ||
reader = AcquireReadableStreamDefaultReader(source); | ||
|
||
if (dest._state === 'writable' && WritableStreamCloseQueuedOrInFlight(dest) === false) { | ||
uponFulfillment(waitForWritesToFinish(), doTheRest); | ||
|
@@ -310,6 +320,8 @@ function ReadableStreamPipeTo(source, dest, preventClose, preventAbort, preventC | |
return; | ||
} | ||
shuttingDown = true; | ||
ReadableStreamDefaultReaderRelease(reader); | ||
reader = AcquireReadableStreamDefaultReader(source); | ||
|
||
if (dest._state === 'writable' && WritableStreamCloseQueuedOrInFlight(dest) === false) { | ||
uponFulfillment(waitForWritesToFinish(), () => finalize(isError, error)); | ||
|
Original file line number | Diff line number | Diff line change | ||
---|---|---|---|---|
|
@@ -31,6 +31,7 @@ Object.assign(exports, { | |||
WritableStreamDefaultWriterClose, | ||||
WritableStreamDefaultWriterCloseWithErrorPropagation, | ||||
WritableStreamDefaultWriterGetDesiredSize, | ||||
WritableStreamDefaultWriterIsOrBecomesErrored, | ||||
WritableStreamDefaultWriterRelease, | ||||
WritableStreamDefaultWriterWrite | ||||
}); | ||||
|
@@ -143,6 +144,8 @@ function SetUpWritableStreamDefaultWriter(writer, stream) { | |||
writer._stream = stream; | ||||
stream._writer = writer; | ||||
|
||||
writer._errorListeners = []; | ||||
|
||||
const state = stream._state; | ||||
|
||||
if (state === 'writable') { | ||||
|
@@ -378,6 +381,11 @@ function WritableStreamStartErroring(stream, reason) { | |||
const writer = stream._writer; | ||||
if (writer !== undefined) { | ||||
WritableStreamDefaultWriterEnsureReadyPromiseRejected(writer, reason); | ||||
const errorListeners = writer._errorListeners; | ||||
writer._errorListeners = []; | ||||
for (const errorListener of errorListeners) { | ||||
errorListener(); | ||||
} | ||||
} | ||||
|
||||
if (WritableStreamHasOperationMarkedInFlight(stream) === false && controller._started === true) { | ||||
|
@@ -475,6 +483,20 @@ function WritableStreamDefaultWriterGetDesiredSize(writer) { | |||
return WritableStreamDefaultControllerGetDesiredSize(stream._controller); | ||||
} | ||||
|
||||
function WritableStreamDefaultWriterIsOrBecomesErrored(writer, errorListener) { | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe since this is not part of the standard, it should start with a lower-case letter? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sure. 👍 What do you suggest we do put in the spec text?
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this very specific to the one change from "writable" to "erroring"? In the reference implementation you've added a much more general listener setup. I would suggest a general note saying "for all the 'becomes' conditions in the above, they must be processed synchronously as part of the [[state]] update, before any other web developer code can run." And then, if we anticipate that only being impactful in the one transition, we could append the extra note: "NOTE: Currently this requirement only has observable consequences for [the transition for writable stream states from from "writable" to "erroring"], and others could be done as asynchronous listeners". Or, if we think we might expand this listener usage in the future, then we should probably omit that note. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
That's the most noticeable case, since it determines whether or not we may drop a chunk (i.e. we accidentally read a chunk that we can no longer write). I'm not sure whether it matters for the state transitions of the readable end. There might be an edge case where two shutdown conditions become true at the same time, and then it matters which condition is handled first. For example: readableController.error(error1); // pipeTo() should immediately call writer.abort(error1)
writableController.error(error2); // should be ignored, since writable is already erroring
// => pipeTo() rejects with error1 versus: writableController.error(error2); // pipeTo() should immediately call reader.cancel(error2)
readableController.error(error1); // should be ignored, since readable is already closed
// => pipeTo() rejects with error2 If we were to use a synchronous reaction for the I'll try to whip up some more WPTs to double check. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Here are two possible tests for this: promise_test(async t => {
const rs = recordingReadableStream();
const ws = recordingWritableStream();
const pipeToPromise = rs.pipeTo(ws);
await flushAsyncEvents();
rs.controller.error(error1);
ws.controller.error(error2);
await promise_rejects_exactly(t, error1, pipeToPromise, 'pipeTo must reject with readable\'s error');
assert_array_equals(rs.eventsWithoutPulls, []);
assert_array_equals(ws.events, []);
await promise_rejects_exactly(t, error1, rs.getReader().closed);
await promise_rejects_exactly(t, error2, ws.getWriter().closed);
}, 'Piping: error the readable stream right before erroring the writable stream');
promise_test(async t => {
const rs = recordingReadableStream();
const ws = recordingWritableStream();
const pipeToPromise = rs.pipeTo(ws);
await flushAsyncEvents();
ws.controller.error(error1);
rs.controller.error(error2);
await promise_rejects_exactly(t, error1, pipeToPromise, 'pipeTo must reject with writable\'s error');
assert_array_equals(rs.eventsWithoutPulls, ['cancel', error1]);
assert_array_equals(ws.events, []);
await promise_rejects_exactly(t, error1, ws.getWriter().closed);
await rs.getReader().closed;
}, 'Piping: error the writable stream right before erroring the readable stream'); The behavior might be a bit surprising though. In the first test, ws is still writable when we call
This adds at least one microtask of delay (even if there are no pending writes), so we will not yet call However, in the second test, because ws immediately becomes errored, we don't wait for pending writes to complete and instead we synchronously call The specification is a bit vague about this. It says:
It doesn't say how long this step can take. We may want to require that if there are no pending writes (i.e. we've never started any writes, or all writes have already settled), then this step must complete synchronously. Then, in the first test, we would call There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I suggest limiting the sync part to as small as possible to fix the issue. This still leaves the problem of how to spec it. We've tried to give latitude for implementations to optimise in their own way, but we're increasingly constraining their behaviour. Transparent thread offloading etc. may become impossible. I'm worried about it but I don't have an answer. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
The sync part is already minimal. We have to go from "if source becomes errored" all the way to "perform WritableStreamAbort" in order to avoid Anyway, I found another way to fix it. We keep track of how many However, this test still fails. We do call Adding
I agree, the reference implementation is becoming increasingly complicated in order to deal with these edge cases. 😞 I'm wondering if it's even worth trying to spec these edge cases, or instead allow some wiggle room in how quickly There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I suspect most of the testable constraints we're imposing are in cases where the web developer controls one or both ends of the pipe, right? I'm not sure those are the ones we were planning to feasibly optimize, so starting to constrain them still seems like the right thing to do to me. But I might be missing something so please let me know. On the larger problem, the root of the issue seems to be how imprecise "[[state]] is or becomes" is. Does that mean: (1) synchronously after the algorithm step which sets [[state]], probably interrupting other streams-implementation code; (2) synchronously after any streams-implementation code runs; (3) synchronously after any browser code runs; (4) asynchronously is OK to some degree? My preference would be to try to resolve things like so:
As an example of how to apply this process,
My preference would be that, if we decide to constrain all observable behavior, we have both variants of the test, with the version without flushAsyncEvents() having the assert for the other order. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Correct. Streams created by the user agent will use the exported algorithms, and I think it's safe to assume that those will be called in a separate task, outside of web author code.
(2) may be ill-specified, since there are cases where streams code calls into author code, which can then call back into streams code. We've even had cases in the past where streams code calls back into itself, e.g. #1172. I still prefer (1), and that's what I've been implementing. Yes, we need to be very careful when speccing, but at least any problems that arise can be fixed within the streams implementation.
That seems reasonable. 👍 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Coming back to this:
It seems the main difference is that, when you call IIRC the reason for this difference is so you can do e.g. an async loop in new ReadableStream({
async start(c) {
for (let i = 0; i < 10; i++) {
await new Promise(r => setTimeout(r, 1000));
c.enqueue("chunk");
}
c.close();
}
}) whereas for I guess, if we really wanted to, we could have the test check when |
||||
const stream = writer._stream; | ||||
if (stream === undefined) { | ||||
return; | ||||
} | ||||
|
||||
const state = stream._state; | ||||
if (state === 'writable') { | ||||
writer._errorListeners.push(errorListener); | ||||
} else if (state === 'erroring' || state === 'errored') { | ||||
errorListener(); | ||||
} | ||||
} | ||||
|
||||
function WritableStreamDefaultWriterRelease(writer) { | ||||
const stream = writer._stream; | ||||
assert(stream !== undefined); | ||||
|
@@ -491,6 +513,8 @@ function WritableStreamDefaultWriterRelease(writer) { | |||
|
||||
stream._writer = undefined; | ||||
writer._stream = undefined; | ||||
|
||||
stream._errorListeners = []; | ||||
} | ||||
|
||||
function WritableStreamDefaultWriterWrite(writer, chunk) { | ||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think it's useful to keep the pipe going when dest has already become
"erroring"
? Any new writes will just error immediately, as per step 9 of WritableStreamDefaultWriterWrite.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agreed.