Skip to content

Commit

Permalink
Keep source locked until the end of the pipe
Browse files Browse the repository at this point in the history
  • Loading branch information
MattiasBuelens committed Jan 19, 2022
1 parent 15a9768 commit 7008ee5
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 3 deletions.
13 changes: 12 additions & 1 deletion index.bs
Original file line number Diff line number Diff line change
Expand Up @@ -2201,6 +2201,14 @@ The following abstract operations operate on {{ReadableStream}} instances at a h
1. If |reader| [=implements=] {{ReadableStreamBYOBReader}}, perform
! [$ReadableStreamBYOBReaderRelease$](|reader|).
1. Otherwise, perform ! [$ReadableStreamDefaultReaderRelease$](|reader|).
1. Set |reader| to ! [$AcquireReadableStreamDefaultReader$](|source|).
<p class="note">The initial reader is released to ensure that any pending read requests
are immediately aborted, and no more chunks are pulled from |source|. A new reader is
acquired in order to keep |source| locked until the shutdown is [=finalized=], for example
to [=cancel a readable stream|cancel=] |source| if necessary.
This exchange of readers is not observable to author code and the user agent is free to
implement this differently, for example by keeping the same reader and internally aborting
its pending read requests.
1. If |dest|.[=WritableStream/[[state]]=] is "`writable`" and !
[$WritableStreamCloseQueuedOrInFlight$](|dest|) is false,
1. If any [=chunks=] have been read but not yet written, write them to |dest|.
Expand All @@ -2216,6 +2224,7 @@ The following abstract operations operate on {{ReadableStream}} instances at a h
1. If |reader| [=implements=] {{ReadableStreamBYOBReader}}, perform
! [$ReadableStreamBYOBReaderRelease$](|reader|).
1. Otherwise, perform ! [$ReadableStreamDefaultReaderRelease$](|reader|).
1. Set |reader| to ! [$AcquireReadableStreamDefaultReader$](|source|).
1. If |dest|.[=WritableStream/[[state]]=] is "`writable`" and !
[$WritableStreamCloseQueuedOrInFlight$](|dest|) is false,
1. If any [=chunks=] have been read but not yet written, write them to |dest|.
Expand All @@ -2224,8 +2233,10 @@ The following abstract operations operate on {{ReadableStream}} instances at a h
1. [=Finalize=], passing along |error| if it was given.
* <dfn id="rs-pipeTo-finalize"><i>Finalize</i></dfn>: both forms of shutdown will eventually ask
to finalize, optionally with an error |error|, which means to perform the following steps:
1. Assert: |reader|.[=ReadableStreamGenericReader/[[stream]]=] is undefined.
1. Perform ! [$WritableStreamDefaultWriterRelease$](|writer|).
1. If |reader| [=implements=] {{ReadableStreamBYOBReader}}, perform
! [$ReadableStreamBYOBReaderRelease$](|reader|).
1. Otherwise, perform ! [$ReadableStreamDefaultReaderRelease$](|reader|).
1. If |signal| is not undefined, [=AbortSignal/remove=] |abortAlgorithm| from |signal|.
1. If |error| was given, [=reject=] |promise| with |error|.
1. Otherwise, [=resolve=] |promise| with undefined.
Expand Down
6 changes: 4 additions & 2 deletions reference-implementation/lib/abstract-ops/readable-streams.js
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ function ReadableStreamPipeTo(source, dest, preventClose, preventAbort, preventC
assert(IsReadableStreamLocked(source) === false);
assert(IsWritableStreamLocked(dest) === false);

const reader = AcquireReadableStreamDefaultReader(source);
let reader = AcquireReadableStreamDefaultReader(source);
const writer = AcquireWritableStreamDefaultWriter(dest);

source._disturbed = true;
Expand Down Expand Up @@ -293,6 +293,7 @@ function ReadableStreamPipeTo(source, dest, preventClose, preventAbort, preventC
}
shuttingDown = true;
ReadableStreamDefaultReaderRelease(reader);
reader = AcquireReadableStreamDefaultReader(source);

if (dest._state === 'writable' && WritableStreamCloseQueuedOrInFlight(dest) === false) {
uponFulfillment(waitForWritesToFinish(), doTheRest);
Expand All @@ -315,6 +316,7 @@ function ReadableStreamPipeTo(source, dest, preventClose, preventAbort, preventC
}
shuttingDown = true;
ReadableStreamDefaultReaderRelease(reader);
reader = AcquireReadableStreamDefaultReader(source);

if (dest._state === 'writable' && WritableStreamCloseQueuedOrInFlight(dest) === false) {
uponFulfillment(waitForWritesToFinish(), () => finalize(isError, error));
Expand All @@ -324,8 +326,8 @@ function ReadableStreamPipeTo(source, dest, preventClose, preventAbort, preventC
}

function finalize(isError, error) {
assert(reader._stream === undefined);
WritableStreamDefaultWriterRelease(writer);
ReadableStreamDefaultReaderRelease(reader);

if (signal !== undefined) {
signal.removeEventListener('abort', abortAlgorithm);
Expand Down

0 comments on commit 7008ee5

Please sign in to comment.