From eb22e7e1062a71b07b8643608e5a3343d7bd0d3c Mon Sep 17 00:00:00 2001 From: Mattias Buelens Date: Sat, 15 Jan 2022 23:45:39 +0100 Subject: [PATCH 01/17] Release reader immediately when shutting down a pipe --- index.bs | 10 +++++++--- .../lib/abstract-ops/readable-streams.js | 7 ++++++- 2 files changed, 13 insertions(+), 4 deletions(-) diff --git a/index.bs b/index.bs index 568dfaf0e..931577114 100644 --- a/index.bs +++ b/index.bs @@ -2198,6 +2198,9 @@ The following abstract operations operate on {{ReadableStream}} instances at a h |originalError|, then: 1. If |shuttingDown| is true, abort these substeps. 1. Set |shuttingDown| to true. + 1. If |reader| [=implements=] {{ReadableStreamBYOBReader}}, perform + ! [$ReadableStreamBYOBReaderRelease$](|reader|). + 1. Otherwise, perform ! [$ReadableStreamDefaultReaderRelease$](|reader|). 1. If |dest|.[=WritableStream/[[state]]=] is "`writable`" and ! [$WritableStreamCloseQueuedOrInFlight$](|dest|) is false, 1. If any [=chunks=] have been read but not yet written, write them to |dest|. @@ -2210,6 +2213,9 @@ The following abstract operations operate on {{ReadableStream}} instances at a h ask to shutdown, optionally with an error |error|, then: 1. If |shuttingDown| is true, abort these substeps. 1. Set |shuttingDown| to true. + 1. If |reader| [=implements=] {{ReadableStreamBYOBReader}}, perform + ! [$ReadableStreamBYOBReaderRelease$](|reader|). + 1. Otherwise, perform ! [$ReadableStreamDefaultReaderRelease$](|reader|). 1. If |dest|.[=WritableStream/[[state]]=] is "`writable`" and ! [$WritableStreamCloseQueuedOrInFlight$](|dest|) is false, 1. If any [=chunks=] have been read but not yet written, write them to |dest|. @@ -2218,10 +2224,8 @@ The following abstract operations operate on {{ReadableStream}} instances at a h 1. [=Finalize=], passing along |error| if it was given. * Finalize: both forms of shutdown will eventually ask to finalize, optionally with an error |error|, which means to perform the following steps: + 1. Assert: |reader|.[=ReadableStreamGenericReader/[[stream]]=] is undefined. 1. Perform ! [$WritableStreamDefaultWriterRelease$](|writer|). - 1. If |reader| [=implements=] {{ReadableStreamBYOBReader}}, perform - ! [$ReadableStreamBYOBReaderRelease$](|reader|). - 1. Otherwise, perform ! [$ReadableStreamDefaultReaderRelease$](|reader|). 1. If |signal| is not undefined, [=AbortSignal/remove=] |abortAlgorithm| from |signal|. 1. If |error| was given, [=reject=] |promise| with |error|. 1. Otherwise, [=resolve=] |promise| with undefined. diff --git a/reference-implementation/lib/abstract-ops/readable-streams.js b/reference-implementation/lib/abstract-ops/readable-streams.js index db1da4c73..0b8613092 100644 --- a/reference-implementation/lib/abstract-ops/readable-streams.js +++ b/reference-implementation/lib/abstract-ops/readable-streams.js @@ -200,6 +200,9 @@ function ReadableStreamPipeTo(source, dest, preventClose, preventAbort, preventC } return transformPromiseWith(writer._readyPromise, () => { + if (shuttingDown === true) { + return promiseResolvedWith(true); + } return new Promise((resolveRead, rejectRead) => { ReadableStreamDefaultReaderRead( reader, @@ -289,6 +292,7 @@ function ReadableStreamPipeTo(source, dest, preventClose, preventAbort, preventC return; } shuttingDown = true; + ReadableStreamDefaultReaderRelease(reader); if (dest._state === 'writable' && WritableStreamCloseQueuedOrInFlight(dest) === false) { uponFulfillment(waitForWritesToFinish(), doTheRest); @@ -310,6 +314,7 @@ function ReadableStreamPipeTo(source, dest, preventClose, preventAbort, preventC return; } shuttingDown = true; + ReadableStreamDefaultReaderRelease(reader); if (dest._state === 'writable' && WritableStreamCloseQueuedOrInFlight(dest) === false) { uponFulfillment(waitForWritesToFinish(), () => finalize(isError, error)); @@ -319,8 +324,8 @@ function ReadableStreamPipeTo(source, dest, preventClose, preventAbort, preventC } function finalize(isError, error) { + assert(reader._stream === undefined); WritableStreamDefaultWriterRelease(writer); - ReadableStreamDefaultReaderRelease(reader); if (signal !== undefined) { signal.removeEventListener('abort', abortAlgorithm); From 15a9768dfeeb4ea94e11bf848ca85a2b4dc23d4c Mon Sep 17 00:00:00 2001 From: Mattias Buelens Date: Sun, 16 Jan 2022 01:29:32 +0100 Subject: [PATCH 02/17] Roll WPT --- reference-implementation/web-platform-tests | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/reference-implementation/web-platform-tests b/reference-implementation/web-platform-tests index 99d74f952..9c259c332 160000 --- a/reference-implementation/web-platform-tests +++ b/reference-implementation/web-platform-tests @@ -1 +1 @@ -Subproject commit 99d74f9529e16ec0722ef11136ab29b9e80fff26 +Subproject commit 9c259c33236adb4464bbe57383064e8fb22a0a6d From 7008ee512b0eea8ac33406ba4892402019a2bce1 Mon Sep 17 00:00:00 2001 From: Mattias Buelens Date: Wed, 19 Jan 2022 23:22:18 +0100 Subject: [PATCH 03/17] Keep source locked until the end of the pipe --- index.bs | 13 ++++++++++++- .../lib/abstract-ops/readable-streams.js | 6 ++++-- 2 files changed, 16 insertions(+), 3 deletions(-) diff --git a/index.bs b/index.bs index 931577114..1fb17decd 100644 --- a/index.bs +++ b/index.bs @@ -2201,6 +2201,14 @@ The following abstract operations operate on {{ReadableStream}} instances at a h 1. If |reader| [=implements=] {{ReadableStreamBYOBReader}}, perform ! [$ReadableStreamBYOBReaderRelease$](|reader|). 1. Otherwise, perform ! [$ReadableStreamDefaultReaderRelease$](|reader|). + 1. Set |reader| to ! [$AcquireReadableStreamDefaultReader$](|source|). +

The initial reader is released to ensure that any pending read requests + are immediately aborted, and no more chunks are pulled from |source|. A new reader is + acquired in order to keep |source| locked until the shutdown is [=finalized=], for example + to [=cancel a readable stream|cancel=] |source| if necessary. + This exchange of readers is not observable to author code and the user agent is free to + implement this differently, for example by keeping the same reader and internally aborting + its pending read requests. 1. If |dest|.[=WritableStream/[[state]]=] is "`writable`" and ! [$WritableStreamCloseQueuedOrInFlight$](|dest|) is false, 1. If any [=chunks=] have been read but not yet written, write them to |dest|. @@ -2216,6 +2224,7 @@ The following abstract operations operate on {{ReadableStream}} instances at a h 1. If |reader| [=implements=] {{ReadableStreamBYOBReader}}, perform ! [$ReadableStreamBYOBReaderRelease$](|reader|). 1. Otherwise, perform ! [$ReadableStreamDefaultReaderRelease$](|reader|). + 1. Set |reader| to ! [$AcquireReadableStreamDefaultReader$](|source|). 1. If |dest|.[=WritableStream/[[state]]=] is "`writable`" and ! [$WritableStreamCloseQueuedOrInFlight$](|dest|) is false, 1. If any [=chunks=] have been read but not yet written, write them to |dest|. @@ -2224,8 +2233,10 @@ The following abstract operations operate on {{ReadableStream}} instances at a h 1. [=Finalize=], passing along |error| if it was given. * Finalize: both forms of shutdown will eventually ask to finalize, optionally with an error |error|, which means to perform the following steps: - 1. Assert: |reader|.[=ReadableStreamGenericReader/[[stream]]=] is undefined. 1. Perform ! [$WritableStreamDefaultWriterRelease$](|writer|). + 1. If |reader| [=implements=] {{ReadableStreamBYOBReader}}, perform + ! [$ReadableStreamBYOBReaderRelease$](|reader|). + 1. Otherwise, perform ! [$ReadableStreamDefaultReaderRelease$](|reader|). 1. If |signal| is not undefined, [=AbortSignal/remove=] |abortAlgorithm| from |signal|. 1. If |error| was given, [=reject=] |promise| with |error|. 1. Otherwise, [=resolve=] |promise| with undefined. diff --git a/reference-implementation/lib/abstract-ops/readable-streams.js b/reference-implementation/lib/abstract-ops/readable-streams.js index 0b8613092..74f34a369 100644 --- a/reference-implementation/lib/abstract-ops/readable-streams.js +++ b/reference-implementation/lib/abstract-ops/readable-streams.js @@ -134,7 +134,7 @@ function ReadableStreamPipeTo(source, dest, preventClose, preventAbort, preventC assert(IsReadableStreamLocked(source) === false); assert(IsWritableStreamLocked(dest) === false); - const reader = AcquireReadableStreamDefaultReader(source); + let reader = AcquireReadableStreamDefaultReader(source); const writer = AcquireWritableStreamDefaultWriter(dest); source._disturbed = true; @@ -293,6 +293,7 @@ function ReadableStreamPipeTo(source, dest, preventClose, preventAbort, preventC } shuttingDown = true; ReadableStreamDefaultReaderRelease(reader); + reader = AcquireReadableStreamDefaultReader(source); if (dest._state === 'writable' && WritableStreamCloseQueuedOrInFlight(dest) === false) { uponFulfillment(waitForWritesToFinish(), doTheRest); @@ -315,6 +316,7 @@ function ReadableStreamPipeTo(source, dest, preventClose, preventAbort, preventC } shuttingDown = true; ReadableStreamDefaultReaderRelease(reader); + reader = AcquireReadableStreamDefaultReader(source); if (dest._state === 'writable' && WritableStreamCloseQueuedOrInFlight(dest) === false) { uponFulfillment(waitForWritesToFinish(), () => finalize(isError, error)); @@ -324,8 +326,8 @@ function ReadableStreamPipeTo(source, dest, preventClose, preventAbort, preventC } function finalize(isError, error) { - assert(reader._stream === undefined); WritableStreamDefaultWriterRelease(writer); + ReadableStreamDefaultReaderRelease(reader); if (signal !== undefined) { signal.removeEventListener('abort', abortAlgorithm); From 7b446d57b817975b94e68242e6a0f3405a0c1014 Mon Sep 17 00:00:00 2001 From: Mattias Buelens Date: Wed, 19 Jan 2022 23:31:27 +0100 Subject: [PATCH 04/17] Check whether destination is still writable before starting a new read --- reference-implementation/lib/abstract-ops/readable-streams.js | 3 +++ 1 file changed, 3 insertions(+) diff --git a/reference-implementation/lib/abstract-ops/readable-streams.js b/reference-implementation/lib/abstract-ops/readable-streams.js index 74f34a369..b27790077 100644 --- a/reference-implementation/lib/abstract-ops/readable-streams.js +++ b/reference-implementation/lib/abstract-ops/readable-streams.js @@ -203,6 +203,9 @@ function ReadableStreamPipeTo(source, dest, preventClose, preventAbort, preventC if (shuttingDown === true) { return promiseResolvedWith(true); } + if (dest._state !== 'writable' || WritableStreamCloseQueuedOrInFlight(dest) === true) { + return promiseResolvedWith(true); + } return new Promise((resolveRead, rejectRead) => { ReadableStreamDefaultReaderRead( reader, From 36a9ad9edeb9af7caa05fa50e0d48d9f0ac93a67 Mon Sep 17 00:00:00 2001 From: Mattias Buelens Date: Thu, 20 Jan 2022 23:15:02 +0100 Subject: [PATCH 05/17] Shutdown pipe if destination becomes "erroring" --- index.bs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/index.bs b/index.bs index 1fb17decd..5244a068a 100644 --- a/index.bs +++ b/index.bs @@ -2175,7 +2175,7 @@ The following abstract operations operate on {{ReadableStream}} instances at a h |source|.[=ReadableStream/[[storedError]]=]. 1. Otherwise, [=shutdown=] with |source|.[=ReadableStream/[[storedError]]=]. 1. Errors must be propagated backward: if |dest|.[=WritableStream/[[state]]=] - is or becomes "`errored`", then + is or becomes "`erroring`" or "`errored`", then 1. If |preventCancel| is false, [=shutdown with an action=] of ! [$ReadableStreamCancel$](|source|, |dest|.[=WritableStream/[[storedError]]=]) and with |dest|.[=WritableStream/[[storedError]]=]. From 256e70fcfa930b8258075ae9410975b5506ee66d Mon Sep 17 00:00:00 2001 From: Mattias Buelens Date: Thu, 20 Jan 2022 23:16:36 +0100 Subject: [PATCH 06/17] Use helper to synchronously detect writable becoming errored --- .../lib/abstract-ops/readable-streams.js | 8 ++++--- .../lib/abstract-ops/writable-streams.js | 24 +++++++++++++++++++ 2 files changed, 29 insertions(+), 3 deletions(-) diff --git a/reference-implementation/lib/abstract-ops/readable-streams.js b/reference-implementation/lib/abstract-ops/readable-streams.js index b27790077..bbda97185 100644 --- a/reference-implementation/lib/abstract-ops/readable-streams.js +++ b/reference-implementation/lib/abstract-ops/readable-streams.js @@ -9,8 +9,9 @@ const { CanTransferArrayBuffer, CopyDataBlockBytes, CreateArrayFromList, IsDetac const { CloneAsUint8Array, IsNonNegativeNumber } = require('./miscellaneous.js'); const { EnqueueValueWithSize, ResetQueue } = require('./queue-with-sizes.js'); const { AcquireWritableStreamDefaultWriter, IsWritableStreamLocked, WritableStreamAbort, - WritableStreamDefaultWriterCloseWithErrorPropagation, WritableStreamDefaultWriterRelease, - WritableStreamDefaultWriterWrite, WritableStreamCloseQueuedOrInFlight } = require('./writable-streams.js'); + WritableStreamDefaultWriterCloseWithErrorPropagation, WritableStreamDefaultWriterIsOrBecomesErrored, + WritableStreamDefaultWriterRelease, WritableStreamDefaultWriterWrite, WritableStreamCloseQueuedOrInFlight } = + require('./writable-streams.js'); const { CancelSteps, PullSteps, ReleaseSteps } = require('./internal-methods.js'); const ReadableByteStreamController = require('../../generated/ReadableByteStreamController.js'); @@ -234,7 +235,8 @@ function ReadableStreamPipeTo(source, dest, preventClose, preventAbort, preventC }); // Errors must be propagated backward - isOrBecomesErrored(dest, writer._closedPromise, storedError => { + WritableStreamDefaultWriterIsOrBecomesErrored(writer, () => { + const storedError = dest._storedError; if (preventCancel === false) { shutdownWithAction(() => ReadableStreamCancel(source, storedError), true, storedError); } else { diff --git a/reference-implementation/lib/abstract-ops/writable-streams.js b/reference-implementation/lib/abstract-ops/writable-streams.js index cf303bfe7..697259afd 100644 --- a/reference-implementation/lib/abstract-ops/writable-streams.js +++ b/reference-implementation/lib/abstract-ops/writable-streams.js @@ -31,6 +31,7 @@ Object.assign(exports, { WritableStreamDefaultWriterClose, WritableStreamDefaultWriterCloseWithErrorPropagation, WritableStreamDefaultWriterGetDesiredSize, + WritableStreamDefaultWriterIsOrBecomesErrored, WritableStreamDefaultWriterRelease, WritableStreamDefaultWriterWrite }); @@ -143,6 +144,8 @@ function SetUpWritableStreamDefaultWriter(writer, stream) { writer._stream = stream; stream._writer = writer; + writer._errorListeners = []; + const state = stream._state; if (state === 'writable') { @@ -378,6 +381,11 @@ function WritableStreamStartErroring(stream, reason) { const writer = stream._writer; if (writer !== undefined) { WritableStreamDefaultWriterEnsureReadyPromiseRejected(writer, reason); + const errorListeners = writer._errorListeners; + writer._errorListeners = []; + for (const errorListener of errorListeners) { + errorListener(); + } } if (WritableStreamHasOperationMarkedInFlight(stream) === false && controller._started === true) { @@ -475,6 +483,20 @@ function WritableStreamDefaultWriterGetDesiredSize(writer) { return WritableStreamDefaultControllerGetDesiredSize(stream._controller); } +function WritableStreamDefaultWriterIsOrBecomesErrored(writer, errorListener) { + const stream = writer._stream; + if (stream === undefined) { + return; + } + + const state = stream._state; + if (state === 'writable') { + writer._errorListeners.push(errorListener); + } else if (state === 'erroring' || state === 'errored') { + errorListener(); + } +} + function WritableStreamDefaultWriterRelease(writer) { const stream = writer._stream; assert(stream !== undefined); @@ -491,6 +513,8 @@ function WritableStreamDefaultWriterRelease(writer) { stream._writer = undefined; writer._stream = undefined; + + stream._errorListeners = []; } function WritableStreamDefaultWriterWrite(writer, chunk) { From 0eb6177b85c062ed21c5168d0dd6851562cd1f94 Mon Sep 17 00:00:00 2001 From: Mattias Buelens Date: Thu, 20 Jan 2022 23:31:33 +0100 Subject: [PATCH 07/17] Roll WPT --- reference-implementation/web-platform-tests | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/reference-implementation/web-platform-tests b/reference-implementation/web-platform-tests index 9c259c332..1646d657b 160000 --- a/reference-implementation/web-platform-tests +++ b/reference-implementation/web-platform-tests @@ -1 +1 @@ -Subproject commit 9c259c33236adb4464bbe57383064e8fb22a0a6d +Subproject commit 1646d657bf258f850afb5f22860ec02dccbc6b7a From 8cfaed4eb5ace5c0a6f166ff38ca8506080a42e5 Mon Sep 17 00:00:00 2001 From: Mattias Buelens Date: Tue, 25 Jan 2022 21:51:29 +0100 Subject: [PATCH 08/17] Move --- .../lib/abstract-ops/writable-streams.js | 28 +++++++++---------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/reference-implementation/lib/abstract-ops/writable-streams.js b/reference-implementation/lib/abstract-ops/writable-streams.js index 697259afd..cf51595c3 100644 --- a/reference-implementation/lib/abstract-ops/writable-streams.js +++ b/reference-implementation/lib/abstract-ops/writable-streams.js @@ -483,20 +483,6 @@ function WritableStreamDefaultWriterGetDesiredSize(writer) { return WritableStreamDefaultControllerGetDesiredSize(stream._controller); } -function WritableStreamDefaultWriterIsOrBecomesErrored(writer, errorListener) { - const stream = writer._stream; - if (stream === undefined) { - return; - } - - const state = stream._state; - if (state === 'writable') { - writer._errorListeners.push(errorListener); - } else if (state === 'erroring' || state === 'errored') { - errorListener(); - } -} - function WritableStreamDefaultWriterRelease(writer) { const stream = writer._stream; assert(stream !== undefined); @@ -550,6 +536,20 @@ function WritableStreamDefaultWriterWrite(writer, chunk) { return promise; } +function WritableStreamDefaultWriterIsOrBecomesErrored(writer, errorListener) { + const stream = writer._stream; + if (stream === undefined) { + return; + } + + const state = stream._state; + if (state === 'writable') { + writer._errorListeners.push(errorListener); + } else if (state === 'erroring' || state === 'errored') { + errorListener(); + } +} + // Default controllers function SetUpWritableStreamDefaultController(stream, controller, startAlgorithm, writeAlgorithm, closeAlgorithm, From 01a4f49320557b9a40009c43240f795f93b68fa3 Mon Sep 17 00:00:00 2001 From: Mattias Buelens Date: Tue, 25 Jan 2022 21:55:49 +0100 Subject: [PATCH 09/17] Rename helper --- .../lib/abstract-ops/readable-streams.js | 6 +++--- .../lib/abstract-ops/writable-streams.js | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/reference-implementation/lib/abstract-ops/readable-streams.js b/reference-implementation/lib/abstract-ops/readable-streams.js index bbda97185..31524fcb6 100644 --- a/reference-implementation/lib/abstract-ops/readable-streams.js +++ b/reference-implementation/lib/abstract-ops/readable-streams.js @@ -9,8 +9,8 @@ const { CanTransferArrayBuffer, CopyDataBlockBytes, CreateArrayFromList, IsDetac const { CloneAsUint8Array, IsNonNegativeNumber } = require('./miscellaneous.js'); const { EnqueueValueWithSize, ResetQueue } = require('./queue-with-sizes.js'); const { AcquireWritableStreamDefaultWriter, IsWritableStreamLocked, WritableStreamAbort, - WritableStreamDefaultWriterCloseWithErrorPropagation, WritableStreamDefaultWriterIsOrBecomesErrored, - WritableStreamDefaultWriterRelease, WritableStreamDefaultWriterWrite, WritableStreamCloseQueuedOrInFlight } = + WritableStreamDefaultWriterCloseWithErrorPropagation, WritableStreamDefaultWriterRelease, + WritableStreamDefaultWriterWrite, WritableStreamCloseQueuedOrInFlight, defaultWriterAddErrorListener } = require('./writable-streams.js'); const { CancelSteps, PullSteps, ReleaseSteps } = require('./internal-methods.js'); @@ -235,7 +235,7 @@ function ReadableStreamPipeTo(source, dest, preventClose, preventAbort, preventC }); // Errors must be propagated backward - WritableStreamDefaultWriterIsOrBecomesErrored(writer, () => { + defaultWriterAddErrorListener(writer, () => { const storedError = dest._storedError; if (preventCancel === false) { shutdownWithAction(() => ReadableStreamCancel(source, storedError), true, storedError); diff --git a/reference-implementation/lib/abstract-ops/writable-streams.js b/reference-implementation/lib/abstract-ops/writable-streams.js index cf51595c3..11a5706b2 100644 --- a/reference-implementation/lib/abstract-ops/writable-streams.js +++ b/reference-implementation/lib/abstract-ops/writable-streams.js @@ -31,9 +31,9 @@ Object.assign(exports, { WritableStreamDefaultWriterClose, WritableStreamDefaultWriterCloseWithErrorPropagation, WritableStreamDefaultWriterGetDesiredSize, - WritableStreamDefaultWriterIsOrBecomesErrored, WritableStreamDefaultWriterRelease, - WritableStreamDefaultWriterWrite + WritableStreamDefaultWriterWrite, + defaultWriterAddErrorListener }); // Working with writable streams @@ -536,7 +536,7 @@ function WritableStreamDefaultWriterWrite(writer, chunk) { return promise; } -function WritableStreamDefaultWriterIsOrBecomesErrored(writer, errorListener) { +function defaultWriterAddErrorListener(writer, errorListener) { const stream = writer._stream; if (stream === undefined) { return; From 3ee5c8fcf530a1390918ddfac37a0e31c2f3caa4 Mon Sep 17 00:00:00 2001 From: Mattias Buelens Date: Tue, 25 Jan 2022 21:56:01 +0100 Subject: [PATCH 10/17] Extract defaultWriterRunErrorListeners helper --- .../lib/abstract-ops/writable-streams.js | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/reference-implementation/lib/abstract-ops/writable-streams.js b/reference-implementation/lib/abstract-ops/writable-streams.js index 11a5706b2..dbb12c878 100644 --- a/reference-implementation/lib/abstract-ops/writable-streams.js +++ b/reference-implementation/lib/abstract-ops/writable-streams.js @@ -381,11 +381,7 @@ function WritableStreamStartErroring(stream, reason) { const writer = stream._writer; if (writer !== undefined) { WritableStreamDefaultWriterEnsureReadyPromiseRejected(writer, reason); - const errorListeners = writer._errorListeners; - writer._errorListeners = []; - for (const errorListener of errorListeners) { - errorListener(); - } + defaultWriterRunErrorListeners(writer); } if (WritableStreamHasOperationMarkedInFlight(stream) === false && controller._started === true) { @@ -550,6 +546,14 @@ function defaultWriterAddErrorListener(writer, errorListener) { } } +function defaultWriterRunErrorListeners(writer) { + const errorListeners = writer._errorListeners; + writer._errorListeners = []; + for (const errorListener of errorListeners) { + errorListener(); + } +} + // Default controllers function SetUpWritableStreamDefaultController(stream, controller, startAlgorithm, writeAlgorithm, closeAlgorithm, From a0364b96e615a705344b2373bfc16f2cc111cbb7 Mon Sep 17 00:00:00 2001 From: Mattias Buelens Date: Tue, 25 Jan 2022 22:09:51 +0100 Subject: [PATCH 11/17] Use specialized helpers for detecting when source/dest become closed/erroring/errored during pipe --- .../lib/abstract-ops/readable-streams.js | 33 ++++++++++++------- .../lib/abstract-ops/writable-streams.js | 19 ++++------- 2 files changed, 28 insertions(+), 24 deletions(-) diff --git a/reference-implementation/lib/abstract-ops/readable-streams.js b/reference-implementation/lib/abstract-ops/readable-streams.js index 31524fcb6..51cde8b01 100644 --- a/reference-implementation/lib/abstract-ops/readable-streams.js +++ b/reference-implementation/lib/abstract-ops/readable-streams.js @@ -226,7 +226,7 @@ function ReadableStreamPipeTo(source, dest, preventClose, preventAbort, preventC } // Errors must be propagated forward - isOrBecomesErrored(source, reader._closedPromise, storedError => { + sourceIsOrBecomesErrored(storedError => { if (preventAbort === false) { shutdownWithAction(() => WritableStreamAbort(dest, storedError), true, storedError); } else { @@ -235,8 +235,7 @@ function ReadableStreamPipeTo(source, dest, preventClose, preventAbort, preventC }); // Errors must be propagated backward - defaultWriterAddErrorListener(writer, () => { - const storedError = dest._storedError; + destIsOrBecomesErroringOrErrored(storedError => { if (preventCancel === false) { shutdownWithAction(() => ReadableStreamCancel(source, storedError), true, storedError); } else { @@ -245,7 +244,7 @@ function ReadableStreamPipeTo(source, dest, preventClose, preventAbort, preventC }); // Closing must be propagated forward - isOrBecomesClosed(source, reader._closedPromise, () => { + sourceIsOrBecomesClosed(() => { if (preventClose === false) { shutdownWithAction(() => WritableStreamDefaultWriterCloseWithErrorPropagation(writer)); } else { @@ -276,19 +275,31 @@ function ReadableStreamPipeTo(source, dest, preventClose, preventAbort, preventC ); } - function isOrBecomesErrored(stream, promise, action) { - if (stream._state === 'errored') { - action(stream._storedError); + function sourceIsOrBecomesErrored(action) { + if (source._state === 'errored') { + action(source._storedError); } else { - uponRejection(promise, action); + uponRejection(reader._closedPromise, action); } } - function isOrBecomesClosed(stream, promise, action) { - if (stream._state === 'closed') { + function sourceIsOrBecomesClosed(action) { + if (source._state === 'closed') { action(); } else { - uponFulfillment(promise, action); + uponFulfillment(reader._closedPromise, action); + } + } + + function destIsOrBecomesErroringOrErrored(action) { + const state = dest._state; + if (state === 'erroring' || state === 'errored') { + action(dest._storedError); + } else if (state === 'writable') { + defaultWriterAddErrorListener(writer, action); + } else { + assert(state === 'closed'); + // Handled in "closing must be propagated backward" } } diff --git a/reference-implementation/lib/abstract-ops/writable-streams.js b/reference-implementation/lib/abstract-ops/writable-streams.js index dbb12c878..49c974b6c 100644 --- a/reference-implementation/lib/abstract-ops/writable-streams.js +++ b/reference-implementation/lib/abstract-ops/writable-streams.js @@ -381,7 +381,7 @@ function WritableStreamStartErroring(stream, reason) { const writer = stream._writer; if (writer !== undefined) { WritableStreamDefaultWriterEnsureReadyPromiseRejected(writer, reason); - defaultWriterRunErrorListeners(writer); + defaultWriterRunErrorListeners(writer, reason); } if (WritableStreamHasOperationMarkedInFlight(stream) === false && controller._started === true) { @@ -534,23 +534,16 @@ function WritableStreamDefaultWriterWrite(writer, chunk) { function defaultWriterAddErrorListener(writer, errorListener) { const stream = writer._stream; - if (stream === undefined) { - return; - } - - const state = stream._state; - if (state === 'writable') { - writer._errorListeners.push(errorListener); - } else if (state === 'erroring' || state === 'errored') { - errorListener(); - } + assert(stream !== undefined); + assert(stream._state === 'writable'); + writer._errorListeners.push(errorListener); } -function defaultWriterRunErrorListeners(writer) { +function defaultWriterRunErrorListeners(writer, error) { const errorListeners = writer._errorListeners; writer._errorListeners = []; for (const errorListener of errorListeners) { - errorListener(); + errorListener(error); } } From 0c2385709daa0f66768b287f18ef88bb3113bb72 Mon Sep 17 00:00:00 2001 From: Mattias Buelens Date: Tue, 25 Jan 2022 22:30:46 +0100 Subject: [PATCH 12/17] Generalize to listen for *any* state change --- .../lib/abstract-ops/readable-streams.js | 49 +++++++++++++++---- .../lib/abstract-ops/writable-streams.js | 28 ++++++----- 2 files changed, 57 insertions(+), 20 deletions(-) diff --git a/reference-implementation/lib/abstract-ops/readable-streams.js b/reference-implementation/lib/abstract-ops/readable-streams.js index 51cde8b01..93ed92dae 100644 --- a/reference-implementation/lib/abstract-ops/readable-streams.js +++ b/reference-implementation/lib/abstract-ops/readable-streams.js @@ -9,8 +9,8 @@ const { CanTransferArrayBuffer, CopyDataBlockBytes, CreateArrayFromList, IsDetac const { CloneAsUint8Array, IsNonNegativeNumber } = require('./miscellaneous.js'); const { EnqueueValueWithSize, ResetQueue } = require('./queue-with-sizes.js'); const { AcquireWritableStreamDefaultWriter, IsWritableStreamLocked, WritableStreamAbort, - WritableStreamDefaultWriterCloseWithErrorPropagation, WritableStreamDefaultWriterRelease, - WritableStreamDefaultWriterWrite, WritableStreamCloseQueuedOrInFlight, defaultWriterAddErrorListener } = + WritableStreamDefaultWriterCloseWithErrorPropagation, WritableStreamDefaultWriterRelease, + WritableStreamDefaultWriterWrite, WritableStreamCloseQueuedOrInFlight, defaultWriterAddStateChangeListener } = require('./writable-streams.js'); const { CancelSteps, PullSteps, ReleaseSteps } = require('./internal-methods.js'); @@ -226,7 +226,8 @@ function ReadableStreamPipeTo(source, dest, preventClose, preventAbort, preventC } // Errors must be propagated forward - sourceIsOrBecomesErrored(storedError => { + sourceIsOrBecomesErrored(() => { + const storedError = source._storedError; if (preventAbort === false) { shutdownWithAction(() => WritableStreamAbort(dest, storedError), true, storedError); } else { @@ -235,7 +236,8 @@ function ReadableStreamPipeTo(source, dest, preventClose, preventAbort, preventC }); // Errors must be propagated backward - destIsOrBecomesErroringOrErrored(storedError => { + destIsOrBecomesErroringOrErrored(() => { + const storedError = dest._storedError; if (preventCancel === false) { shutdownWithAction(() => ReadableStreamCancel(source, storedError), true, storedError); } else { @@ -276,18 +278,26 @@ function ReadableStreamPipeTo(source, dest, preventClose, preventAbort, preventC } function sourceIsOrBecomesErrored(action) { - if (source._state === 'errored') { - action(source._storedError); + const state = source._state; + if (state === 'errored') { + action(); + } else if (state === 'readable') { + readerAddStateChangeListener(reader, () => sourceIsOrBecomesErrored(action)); } else { - uponRejection(reader._closedPromise, action); + assert(state === 'closed'); + // Handled in "closing must be propagated forward" } } function sourceIsOrBecomesClosed(action) { + const state = source._state; if (source._state === 'closed') { action(); + } else if (state === 'readable') { + readerAddStateChangeListener(reader, () => sourceIsOrBecomesClosed(action)); } else { - uponFulfillment(reader._closedPromise, action); + assert(state === 'errored'); + // Handled in "errors must be propagated forward" } } @@ -296,7 +306,7 @@ function ReadableStreamPipeTo(source, dest, preventClose, preventAbort, preventC if (state === 'erroring' || state === 'errored') { action(dest._storedError); } else if (state === 'writable') { - defaultWriterAddErrorListener(writer, action); + defaultWriterAddStateChangeListener(writer, () => destIsOrBecomesErroringOrErrored(action)); } else { assert(state === 'closed'); // Handled in "closing must be propagated backward" @@ -793,6 +803,7 @@ function ReadableStreamClose(stream) { } resolvePromise(reader._closedPromise, undefined); + readerRunStateChangeListeners(reader); if (ReadableStreamDefaultReader.isImpl(reader)) { const readRequests = reader._readRequests; @@ -817,6 +828,7 @@ function ReadableStreamError(stream, e) { rejectPromise(reader._closedPromise, e); setPromiseIsHandledToTrue(reader._closedPromise); + readerRunStateChangeListeners(reader); if (ReadableStreamDefaultReader.isImpl(reader)) { ReadableStreamDefaultReaderErrorReadRequests(reader, e); @@ -900,6 +912,8 @@ function ReadableStreamReaderGenericInitialize(reader, stream) { reader._stream = stream; stream._reader = reader; + reader._stateChangeListeners = []; + if (stream._state === 'readable') { reader._closedPromise = newPromise(); } else if (stream._state === 'closed') { @@ -933,6 +947,23 @@ function ReadableStreamReaderGenericRelease(reader) { stream._reader = undefined; reader._stream = undefined; + + reader._stateChangeListeners = []; +} + +function readerAddStateChangeListener(reader, stateChangeListener) { + const stream = reader._stream; + assert(stream !== undefined); + assert(stream._state === 'readable'); + reader._stateChangeListeners.push(stateChangeListener); +} + +function readerRunStateChangeListeners(reader) { + const stateChangeListeners = reader._stateChangeListeners; + reader._stateChangeListeners = []; + for (const stateChangeListener of stateChangeListeners) { + stateChangeListener(); + } } function ReadableStreamBYOBReaderRead(reader, view, readIntoRequest) { diff --git a/reference-implementation/lib/abstract-ops/writable-streams.js b/reference-implementation/lib/abstract-ops/writable-streams.js index 49c974b6c..6d1d1b7ef 100644 --- a/reference-implementation/lib/abstract-ops/writable-streams.js +++ b/reference-implementation/lib/abstract-ops/writable-streams.js @@ -33,7 +33,7 @@ Object.assign(exports, { WritableStreamDefaultWriterGetDesiredSize, WritableStreamDefaultWriterRelease, WritableStreamDefaultWriterWrite, - defaultWriterAddErrorListener + defaultWriterAddStateChangeListener }); // Working with writable streams @@ -144,7 +144,7 @@ function SetUpWritableStreamDefaultWriter(writer, stream) { writer._stream = stream; stream._writer = writer; - writer._errorListeners = []; + writer._stateChangeListeners = []; const state = stream._state; @@ -242,6 +242,11 @@ function WritableStreamFinishErroring(stream) { } stream._writeRequests = []; + const writer = stream._writer; + if (writer !== undefined) { + defaultWriterRunStateChangeListeners(writer); + } + if (stream._pendingAbortRequest === undefined) { WritableStreamRejectCloseAndClosedPromiseIfNeeded(stream); return; @@ -292,6 +297,7 @@ function WritableStreamFinishInFlightClose(stream) { const writer = stream._writer; if (writer !== undefined) { resolvePromise(writer._closedPromise, undefined); + defaultWriterRunStateChangeListeners(writer); } assert(stream._pendingAbortRequest === undefined); @@ -381,7 +387,7 @@ function WritableStreamStartErroring(stream, reason) { const writer = stream._writer; if (writer !== undefined) { WritableStreamDefaultWriterEnsureReadyPromiseRejected(writer, reason); - defaultWriterRunErrorListeners(writer, reason); + defaultWriterRunStateChangeListeners(writer, reason); } if (WritableStreamHasOperationMarkedInFlight(stream) === false && controller._started === true) { @@ -496,7 +502,7 @@ function WritableStreamDefaultWriterRelease(writer) { stream._writer = undefined; writer._stream = undefined; - stream._errorListeners = []; + writer._stateChangeListeners = []; } function WritableStreamDefaultWriterWrite(writer, chunk) { @@ -532,18 +538,18 @@ function WritableStreamDefaultWriterWrite(writer, chunk) { return promise; } -function defaultWriterAddErrorListener(writer, errorListener) { +function defaultWriterAddStateChangeListener(writer, stateChangeListener) { const stream = writer._stream; assert(stream !== undefined); assert(stream._state === 'writable'); - writer._errorListeners.push(errorListener); + writer._stateChangeListeners.push(stateChangeListener); } -function defaultWriterRunErrorListeners(writer, error) { - const errorListeners = writer._errorListeners; - writer._errorListeners = []; - for (const errorListener of errorListeners) { - errorListener(error); +function defaultWriterRunStateChangeListeners(writer) { + const stateChangeListeners = writer._stateChangeListeners; + writer._stateChangeListeners = []; + for (const stateChangeListener of stateChangeListeners) { + stateChangeListener(); } } From e333293d9ae319c423388471e6b986108e8cdde5 Mon Sep 17 00:00:00 2001 From: Mattias Buelens Date: Tue, 25 Jan 2022 22:42:29 +0100 Subject: [PATCH 13/17] Use a single state change listener to check all shutdown conditions --- .../lib/abstract-ops/readable-streams.js | 70 ++++++++----------- 1 file changed, 28 insertions(+), 42 deletions(-) diff --git a/reference-implementation/lib/abstract-ops/readable-streams.js b/reference-implementation/lib/abstract-ops/readable-streams.js index 93ed92dae..8348ba3ed 100644 --- a/reference-implementation/lib/abstract-ops/readable-streams.js +++ b/reference-implementation/lib/abstract-ops/readable-streams.js @@ -226,33 +226,50 @@ function ReadableStreamPipeTo(source, dest, preventClose, preventAbort, preventC } // Errors must be propagated forward - sourceIsOrBecomesErrored(() => { + function sourceIsOrBecomesErrored() { const storedError = source._storedError; if (preventAbort === false) { shutdownWithAction(() => WritableStreamAbort(dest, storedError), true, storedError); } else { shutdown(true, storedError); } - }); + } // Errors must be propagated backward - destIsOrBecomesErroringOrErrored(() => { + function destIsOrBecomesErroringOrErrored() { const storedError = dest._storedError; if (preventCancel === false) { shutdownWithAction(() => ReadableStreamCancel(source, storedError), true, storedError); } else { shutdown(true, storedError); } - }); + } // Closing must be propagated forward - sourceIsOrBecomesClosed(() => { + function sourceIsOrBecomesClosed() { if (preventClose === false) { shutdownWithAction(() => WritableStreamDefaultWriterCloseWithErrorPropagation(writer)); } else { shutdown(); } - }); + } + + function checkState() { + const sourceState = source._state; + const destState = dest._state; + if (sourceState === 'errored') { + // Errors must be propagated forward + sourceIsOrBecomesErrored(); + } else if (destState === 'erroring' || destState === 'errored') { + // Errors must be propagated backward + destIsOrBecomesErroringOrErrored(); + } else if (sourceState === 'closed') { + // Closing must be propagated forward + sourceIsOrBecomesClosed(); + } + } + + checkState(); // Closing must be propagated backward if (WritableStreamCloseQueuedOrInFlight(dest) === true || dest._state === 'closed') { @@ -265,6 +282,11 @@ function ReadableStreamPipeTo(source, dest, preventClose, preventAbort, preventC } } + if (!shuttingDown) { + readerAddStateChangeListener(reader, checkState); + defaultWriterAddStateChangeListener(writer, checkState); + } + setPromiseIsHandledToTrue(pipeLoop()); function waitForWritesToFinish() { @@ -277,42 +299,6 @@ function ReadableStreamPipeTo(source, dest, preventClose, preventAbort, preventC ); } - function sourceIsOrBecomesErrored(action) { - const state = source._state; - if (state === 'errored') { - action(); - } else if (state === 'readable') { - readerAddStateChangeListener(reader, () => sourceIsOrBecomesErrored(action)); - } else { - assert(state === 'closed'); - // Handled in "closing must be propagated forward" - } - } - - function sourceIsOrBecomesClosed(action) { - const state = source._state; - if (source._state === 'closed') { - action(); - } else if (state === 'readable') { - readerAddStateChangeListener(reader, () => sourceIsOrBecomesClosed(action)); - } else { - assert(state === 'errored'); - // Handled in "errors must be propagated forward" - } - } - - function destIsOrBecomesErroringOrErrored(action) { - const state = dest._state; - if (state === 'erroring' || state === 'errored') { - action(dest._storedError); - } else if (state === 'writable') { - defaultWriterAddStateChangeListener(writer, () => destIsOrBecomesErroringOrErrored(action)); - } else { - assert(state === 'closed'); - // Handled in "closing must be propagated backward" - } - } - function shutdownWithAction(action, originalIsError, originalError) { if (shuttingDown === true) { return; From 285bfb7241df0c6b2a3589cb5bc9cc150039178c Mon Sep 17 00:00:00 2001 From: Mattias Buelens Date: Tue, 25 Jan 2022 22:43:22 +0100 Subject: [PATCH 14/17] Avoid starting pipe loop if already shutting down --- reference-implementation/lib/abstract-ops/readable-streams.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/reference-implementation/lib/abstract-ops/readable-streams.js b/reference-implementation/lib/abstract-ops/readable-streams.js index 8348ba3ed..16d8d5af5 100644 --- a/reference-implementation/lib/abstract-ops/readable-streams.js +++ b/reference-implementation/lib/abstract-ops/readable-streams.js @@ -285,9 +285,9 @@ function ReadableStreamPipeTo(source, dest, preventClose, preventAbort, preventC if (!shuttingDown) { readerAddStateChangeListener(reader, checkState); defaultWriterAddStateChangeListener(writer, checkState); - } - setPromiseIsHandledToTrue(pipeLoop()); + setPromiseIsHandledToTrue(pipeLoop()); + } function waitForWritesToFinish() { // Another write may have started while we were waiting on this currentWrite, so we have to be sure to wait From 817293cd7691d420e31d61dd8f6a53f9248a850a Mon Sep 17 00:00:00 2001 From: Mattias Buelens Date: Tue, 25 Jan 2022 22:44:31 +0100 Subject: [PATCH 15/17] Rename --- .../lib/abstract-ops/readable-streams.js | 4 ++-- .../lib/abstract-ops/writable-streams.js | 12 ++++++------ 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/reference-implementation/lib/abstract-ops/readable-streams.js b/reference-implementation/lib/abstract-ops/readable-streams.js index 16d8d5af5..101781718 100644 --- a/reference-implementation/lib/abstract-ops/readable-streams.js +++ b/reference-implementation/lib/abstract-ops/readable-streams.js @@ -10,7 +10,7 @@ const { CloneAsUint8Array, IsNonNegativeNumber } = require('./miscellaneous.js') const { EnqueueValueWithSize, ResetQueue } = require('./queue-with-sizes.js'); const { AcquireWritableStreamDefaultWriter, IsWritableStreamLocked, WritableStreamAbort, WritableStreamDefaultWriterCloseWithErrorPropagation, WritableStreamDefaultWriterRelease, - WritableStreamDefaultWriterWrite, WritableStreamCloseQueuedOrInFlight, defaultWriterAddStateChangeListener } = + WritableStreamDefaultWriterWrite, WritableStreamCloseQueuedOrInFlight, writerAddStateChangeListener } = require('./writable-streams.js'); const { CancelSteps, PullSteps, ReleaseSteps } = require('./internal-methods.js'); @@ -284,7 +284,7 @@ function ReadableStreamPipeTo(source, dest, preventClose, preventAbort, preventC if (!shuttingDown) { readerAddStateChangeListener(reader, checkState); - defaultWriterAddStateChangeListener(writer, checkState); + writerAddStateChangeListener(writer, checkState); setPromiseIsHandledToTrue(pipeLoop()); } diff --git a/reference-implementation/lib/abstract-ops/writable-streams.js b/reference-implementation/lib/abstract-ops/writable-streams.js index 6d1d1b7ef..59d7f38ca 100644 --- a/reference-implementation/lib/abstract-ops/writable-streams.js +++ b/reference-implementation/lib/abstract-ops/writable-streams.js @@ -33,7 +33,7 @@ Object.assign(exports, { WritableStreamDefaultWriterGetDesiredSize, WritableStreamDefaultWriterRelease, WritableStreamDefaultWriterWrite, - defaultWriterAddStateChangeListener + writerAddStateChangeListener }); // Working with writable streams @@ -244,7 +244,7 @@ function WritableStreamFinishErroring(stream) { const writer = stream._writer; if (writer !== undefined) { - defaultWriterRunStateChangeListeners(writer); + writerRunStateChangeListeners(writer); } if (stream._pendingAbortRequest === undefined) { @@ -297,7 +297,7 @@ function WritableStreamFinishInFlightClose(stream) { const writer = stream._writer; if (writer !== undefined) { resolvePromise(writer._closedPromise, undefined); - defaultWriterRunStateChangeListeners(writer); + writerRunStateChangeListeners(writer); } assert(stream._pendingAbortRequest === undefined); @@ -387,7 +387,7 @@ function WritableStreamStartErroring(stream, reason) { const writer = stream._writer; if (writer !== undefined) { WritableStreamDefaultWriterEnsureReadyPromiseRejected(writer, reason); - defaultWriterRunStateChangeListeners(writer, reason); + writerRunStateChangeListeners(writer, reason); } if (WritableStreamHasOperationMarkedInFlight(stream) === false && controller._started === true) { @@ -538,14 +538,14 @@ function WritableStreamDefaultWriterWrite(writer, chunk) { return promise; } -function defaultWriterAddStateChangeListener(writer, stateChangeListener) { +function writerAddStateChangeListener(writer, stateChangeListener) { const stream = writer._stream; assert(stream !== undefined); assert(stream._state === 'writable'); writer._stateChangeListeners.push(stateChangeListener); } -function defaultWriterRunStateChangeListeners(writer) { +function writerRunStateChangeListeners(writer) { const stateChangeListeners = writer._stateChangeListeners; writer._stateChangeListeners = []; for (const stateChangeListener of stateChangeListeners) { From 124582a9c678538643e9106759f0aaba055a8d66 Mon Sep 17 00:00:00 2001 From: Mattias Buelens Date: Tue, 25 Jan 2022 23:00:15 +0100 Subject: [PATCH 16/17] Tweak asserts --- reference-implementation/lib/abstract-ops/readable-streams.js | 2 +- reference-implementation/lib/abstract-ops/writable-streams.js | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/reference-implementation/lib/abstract-ops/readable-streams.js b/reference-implementation/lib/abstract-ops/readable-streams.js index 101781718..8f14bde21 100644 --- a/reference-implementation/lib/abstract-ops/readable-streams.js +++ b/reference-implementation/lib/abstract-ops/readable-streams.js @@ -283,6 +283,7 @@ function ReadableStreamPipeTo(source, dest, preventClose, preventAbort, preventC } if (!shuttingDown) { + assert(source._state === 'readable' && dest._state === 'writable'); readerAddStateChangeListener(reader, checkState); writerAddStateChangeListener(writer, checkState); @@ -940,7 +941,6 @@ function ReadableStreamReaderGenericRelease(reader) { function readerAddStateChangeListener(reader, stateChangeListener) { const stream = reader._stream; assert(stream !== undefined); - assert(stream._state === 'readable'); reader._stateChangeListeners.push(stateChangeListener); } diff --git a/reference-implementation/lib/abstract-ops/writable-streams.js b/reference-implementation/lib/abstract-ops/writable-streams.js index 59d7f38ca..a83da61f0 100644 --- a/reference-implementation/lib/abstract-ops/writable-streams.js +++ b/reference-implementation/lib/abstract-ops/writable-streams.js @@ -541,7 +541,6 @@ function WritableStreamDefaultWriterWrite(writer, chunk) { function writerAddStateChangeListener(writer, stateChangeListener) { const stream = writer._stream; assert(stream !== undefined); - assert(stream._state === 'writable'); writer._stateChangeListeners.push(stateChangeListener); } From f6d8b7343a1bba0141917e39e6412dc5ef4642bb Mon Sep 17 00:00:00 2001 From: Mattias Buelens Date: Thu, 27 Jan 2022 00:08:20 +0100 Subject: [PATCH 17/17] Assert that reader is a default reader in finalize --- index.bs | 5 ++--- .../lib/abstract-ops/readable-streams.js | 1 + 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/index.bs b/index.bs index 5244a068a..54b791b71 100644 --- a/index.bs +++ b/index.bs @@ -2233,10 +2233,9 @@ The following abstract operations operate on {{ReadableStream}} instances at a h 1. [=Finalize=], passing along |error| if it was given. * Finalize: both forms of shutdown will eventually ask to finalize, optionally with an error |error|, which means to perform the following steps: + 1. Assert: |reader| [=implements=] {{ReadableStreamDefaultReader}}. 1. Perform ! [$WritableStreamDefaultWriterRelease$](|writer|). - 1. If |reader| [=implements=] {{ReadableStreamBYOBReader}}, perform - ! [$ReadableStreamBYOBReaderRelease$](|reader|). - 1. Otherwise, perform ! [$ReadableStreamDefaultReaderRelease$](|reader|). + 1. Perform ! [$ReadableStreamDefaultReaderRelease$](|reader|). 1. If |signal| is not undefined, [=AbortSignal/remove=] |abortAlgorithm| from |signal|. 1. If |error| was given, [=reject=] |promise| with |error|. 1. Otherwise, [=resolve=] |promise| with undefined. diff --git a/reference-implementation/lib/abstract-ops/readable-streams.js b/reference-implementation/lib/abstract-ops/readable-streams.js index 8f14bde21..4ca45d816 100644 --- a/reference-implementation/lib/abstract-ops/readable-streams.js +++ b/reference-implementation/lib/abstract-ops/readable-streams.js @@ -339,6 +339,7 @@ function ReadableStreamPipeTo(source, dest, preventClose, preventAbort, preventC } function finalize(isError, error) { + assert(ReadableStreamDefaultReader.isImpl(reader)); WritableStreamDefaultWriterRelease(writer); ReadableStreamDefaultReaderRelease(reader);