Skip to content

Commit 7a2a4d0

Browse files
authored
stream: implement TransformStream cleanup using "transformer.cancel"
Fixes: #49971 PR-URL: #50126 Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
1 parent 452d29c commit 7a2a4d0

File tree

72 files changed

+418
-94
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

72 files changed

+418
-94
lines changed

lib/internal/webstreams/readablestream.js

+3-2
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ const {
1414
ObjectCreate,
1515
ObjectDefineProperties,
1616
ObjectSetPrototypeOf,
17+
Promise,
1718
PromisePrototypeThen,
1819
PromiseResolve,
1920
PromiseReject,
@@ -2444,7 +2445,7 @@ function setupReadableStreamDefaultController(
24442445
const startResult = startAlgorithm();
24452446

24462447
PromisePrototypeThen(
2447-
PromiseResolve(startResult),
2448+
new Promise((r) => r(startResult)),
24482449
() => {
24492450
controller[kState].started = true;
24502451
assert(!controller[kState].pulling);
@@ -3243,7 +3244,7 @@ function setupReadableByteStreamController(
32433244
const startResult = startAlgorithm();
32443245

32453246
PromisePrototypeThen(
3246-
PromiseResolve(startResult),
3247+
new Promise((r) => r(startResult)),
32473248
() => {
32483249
controller[kState].started = true;
32493250
assert(!controller[kState].pulling);

lib/internal/webstreams/transformstream.js

+105-11
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ const {
66
ObjectDefineProperties,
77
ObjectSetPrototypeOf,
88
PromisePrototypeThen,
9-
PromiseResolve,
109
SymbolToStringTag,
1110
Symbol,
1211
} = primordials;
@@ -47,6 +46,7 @@ const {
4746
nonOpFlush,
4847
kType,
4948
kState,
49+
nonOpCancel,
5050
} = require('internal/webstreams/util');
5151

5252
const {
@@ -384,8 +384,7 @@ function initializeTransformStream(
384384
return transformStreamDefaultSourcePullAlgorithm(stream);
385385
},
386386
cancel(reason) {
387-
transformStreamErrorWritableAndUnblockWrite(stream, reason);
388-
return PromiseResolve();
387+
return transformStreamDefaultSourceCancelAlgorithm(stream, reason);
389388
},
390389
}, {
391390
highWaterMark: readableHighWaterMark,
@@ -427,6 +426,10 @@ function transformStreamErrorWritableAndUnblockWrite(stream, error) {
427426
writableStreamDefaultControllerErrorIfNeeded(
428427
writable[kState].controller,
429428
error);
429+
transformStreamUnblockWrite(stream);
430+
}
431+
432+
function transformStreamUnblockWrite(stream) {
430433
if (stream[kState].backpressure)
431434
transformStreamSetBackpressure(stream, false);
432435
}
@@ -443,13 +446,15 @@ function setupTransformStreamDefaultController(
443446
stream,
444447
controller,
445448
transformAlgorithm,
446-
flushAlgorithm) {
449+
flushAlgorithm,
450+
cancelAlgorithm) {
447451
assert(isTransformStream(stream));
448452
assert(stream[kState].controller === undefined);
449453
controller[kState] = {
450454
stream,
451455
transformAlgorithm,
452456
flushAlgorithm,
457+
cancelAlgorithm,
453458
};
454459
stream[kState].controller = controller;
455460
}
@@ -460,21 +465,26 @@ function setupTransformStreamDefaultControllerFromTransformer(
460465
const controller = new TransformStreamDefaultController(kSkipThrow);
461466
const transform = transformer?.transform || defaultTransformAlgorithm;
462467
const flush = transformer?.flush || nonOpFlush;
468+
const cancel = transformer?.cancel || nonOpCancel;
463469
const transformAlgorithm =
464470
FunctionPrototypeBind(transform, transformer);
465471
const flushAlgorithm =
466472
FunctionPrototypeBind(flush, transformer);
473+
const cancelAlgorithm =
474+
FunctionPrototypeBind(cancel, transformer);
467475

468476
setupTransformStreamDefaultController(
469477
stream,
470478
controller,
471479
transformAlgorithm,
472-
flushAlgorithm);
480+
flushAlgorithm,
481+
cancelAlgorithm);
473482
}
474483

475484
function transformStreamDefaultControllerClearAlgorithms(controller) {
476485
controller[kState].transformAlgorithm = undefined;
477486
controller[kState].flushAlgorithm = undefined;
487+
controller[kState].cancelAlgorithm = undefined;
478488
}
479489

480490
function transformStreamDefaultControllerEnqueue(controller, chunk) {
@@ -563,7 +573,40 @@ function transformStreamDefaultSinkWriteAlgorithm(stream, chunk) {
563573
}
564574

565575
async function transformStreamDefaultSinkAbortAlgorithm(stream, reason) {
566-
transformStreamError(stream, reason);
576+
const {
577+
controller,
578+
readable,
579+
} = stream[kState];
580+
581+
if (controller[kState].finishPromise !== undefined) {
582+
return controller[kState].finishPromise;
583+
}
584+
585+
const { promise, resolve, reject } = createDeferredPromise();
586+
controller[kState].finishPromise = promise;
587+
const cancelPromise = ensureIsPromise(
588+
controller[kState].cancelAlgorithm,
589+
controller,
590+
reason);
591+
transformStreamDefaultControllerClearAlgorithms(controller);
592+
593+
PromisePrototypeThen(
594+
cancelPromise,
595+
() => {
596+
if (readable[kState].state === 'errored')
597+
reject(readable[kState].storedError);
598+
else {
599+
readableStreamDefaultControllerError(readable[kState].controller, reason);
600+
resolve();
601+
}
602+
},
603+
(error) => {
604+
readableStreamDefaultControllerError(readable[kState].controller, error);
605+
reject(error);
606+
},
607+
);
608+
609+
return controller[kState].finishPromise;
567610
}
568611

569612
function transformStreamDefaultSinkCloseAlgorithm(stream) {
@@ -572,23 +615,32 @@ function transformStreamDefaultSinkCloseAlgorithm(stream) {
572615
controller,
573616
} = stream[kState];
574617

618+
if (controller[kState].finishPromise !== undefined) {
619+
return controller[kState].finishPromise;
620+
}
621+
const { promise, resolve, reject } = createDeferredPromise();
622+
controller[kState].finishPromise = promise;
575623
const flushPromise =
576624
ensureIsPromise(
577625
controller[kState].flushAlgorithm,
578626
controller,
579627
controller);
580628
transformStreamDefaultControllerClearAlgorithms(controller);
581-
return PromisePrototypeThen(
629+
PromisePrototypeThen(
582630
flushPromise,
583631
() => {
584632
if (readable[kState].state === 'errored')
585-
throw readable[kState].storedError;
586-
readableStreamDefaultControllerClose(readable[kState].controller);
633+
reject(readable[kState].storedError);
634+
else {
635+
readableStreamDefaultControllerClose(readable[kState].controller);
636+
resolve();
637+
}
587638
},
588639
(error) => {
589-
transformStreamError(stream, error);
590-
throw readable[kState].storedError;
640+
readableStreamDefaultControllerError(readable[kState].controller, error);
641+
reject(error);
591642
});
643+
return controller[kState].finishPromise;
592644
}
593645

594646
function transformStreamDefaultSourcePullAlgorithm(stream) {
@@ -598,6 +650,48 @@ function transformStreamDefaultSourcePullAlgorithm(stream) {
598650
return stream[kState].backpressureChange.promise;
599651
}
600652

653+
function transformStreamDefaultSourceCancelAlgorithm(stream, reason) {
654+
const {
655+
controller,
656+
writable,
657+
} = stream[kState];
658+
659+
if (controller[kState].finishPromise !== undefined) {
660+
return controller[kState].finishPromise;
661+
}
662+
663+
const { promise, resolve, reject } = createDeferredPromise();
664+
controller[kState].finishPromise = promise;
665+
const cancelPromise = ensureIsPromise(
666+
controller[kState].cancelAlgorithm,
667+
controller,
668+
reason);
669+
transformStreamDefaultControllerClearAlgorithms(controller);
670+
671+
PromisePrototypeThen(cancelPromise,
672+
() => {
673+
if (writable[kState].state === 'errored')
674+
reject(writable[kState].storedError);
675+
else {
676+
writableStreamDefaultControllerErrorIfNeeded(
677+
writable[kState].controller,
678+
reason);
679+
transformStreamUnblockWrite(stream);
680+
resolve();
681+
}
682+
},
683+
(error) => {
684+
writableStreamDefaultControllerErrorIfNeeded(
685+
writable[kState].controller,
686+
error);
687+
transformStreamUnblockWrite(stream);
688+
reject(error);
689+
},
690+
);
691+
692+
return controller[kState].finishPromise;
693+
}
694+
601695
module.exports = {
602696
TransformStream,
603697
TransformStreamDefaultController,

lib/internal/webstreams/writablestream.js

+2-1
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ const {
77
FunctionPrototypeCall,
88
ObjectDefineProperties,
99
ObjectSetPrototypeOf,
10+
Promise,
1011
PromisePrototypeThen,
1112
PromiseResolve,
1213
PromiseReject,
@@ -1295,7 +1296,7 @@ function setupWritableStreamDefaultController(
12951296
const startResult = startAlgorithm();
12961297

12971298
PromisePrototypeThen(
1298-
PromiseResolve(startResult),
1299+
new Promise((r) => r(startResult)),
12991300
() => {
13001301
assert(stream[kState].state === 'writable' ||
13011302
stream[kState].state === 'erroring');

test/fixtures/wpt/README.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ Last update:
2727
- performance-timeline: https://github.com/web-platform-tests/wpt/tree/17ebc3aea0/performance-timeline
2828
- resource-timing: https://github.com/web-platform-tests/wpt/tree/22d38586d0/resource-timing
2929
- resources: https://github.com/web-platform-tests/wpt/tree/1e140d63ec/resources
30-
- streams: https://github.com/web-platform-tests/wpt/tree/517e945bbf/streams
30+
- streams: https://github.com/web-platform-tests/wpt/tree/a8872d92b1/streams
3131
- url: https://github.com/web-platform-tests/wpt/tree/c2d7e70b52/url
3232
- user-timing: https://github.com/web-platform-tests/wpt/tree/5ae85bf826/user-timing
3333
- wasm/jsapi: https://github.com/web-platform-tests/wpt/tree/cde25e7e3c/wasm/jsapi

test/fixtures/wpt/streams/piping/abort.any.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// META: global=window,worker
1+
// META: global=window,worker,shadowrealm
22
// META: script=../resources/recording-streams.js
33
// META: script=../resources/test-utils.js
44
'use strict';

test/fixtures/wpt/streams/piping/close-propagation-backward.any.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// META: global=window,worker
1+
// META: global=window,worker,shadowrealm
22
// META: script=../resources/recording-streams.js
33
'use strict';
44

test/fixtures/wpt/streams/piping/close-propagation-forward.any.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// META: global=window,worker
1+
// META: global=window,worker,shadowrealm
22
// META: script=../resources/test-utils.js
33
// META: script=../resources/recording-streams.js
44
'use strict';
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
<!DOCTYPE html>
2+
<script type="module">
3+
let a = new ReadableStream();
4+
let b = self.open()
5+
let f = new b.WritableStream();
6+
a.pipeThrough(
7+
{ "readable": a, "writable": f },
8+
{ "signal": AbortSignal.abort() }
9+
)
10+
await new Promise(setTimeout);
11+
structuredClone(undefined, { "transfer": [f] })
12+
</script>

test/fixtures/wpt/streams/piping/error-propagation-backward.any.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// META: global=window,worker
1+
// META: global=window,worker,shadowrealm
22
// META: script=../resources/test-utils.js
33
// META: script=../resources/recording-streams.js
44
'use strict';

test/fixtures/wpt/streams/piping/error-propagation-forward.any.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// META: global=window,worker
1+
// META: global=window,worker,shadowrealm
22
// META: script=../resources/test-utils.js
33
// META: script=../resources/recording-streams.js
44
'use strict';

test/fixtures/wpt/streams/piping/flow-control.any.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// META: global=window,worker
1+
// META: global=window,worker,shadowrealm
22
// META: script=../resources/test-utils.js
33
// META: script=../resources/rs-utils.js
44
// META: script=../resources/recording-streams.js

test/fixtures/wpt/streams/piping/general-addition.any.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// META: global=window,worker
1+
// META: global=window,worker,shadowrealm
22
'use strict';
33

44
promise_test(async t => {

test/fixtures/wpt/streams/piping/general.any.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// META: global=window,worker
1+
// META: global=window,worker,shadowrealm
22
// META: script=../resources/test-utils.js
33
// META: script=../resources/recording-streams.js
44
'use strict';

test/fixtures/wpt/streams/piping/multiple-propagation.any.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// META: global=window,worker
1+
// META: global=window,worker,shadowrealm
22
// META: script=../resources/test-utils.js
33
// META: script=../resources/recording-streams.js
44
'use strict';

test/fixtures/wpt/streams/piping/pipe-through.any.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// META: global=window,worker
1+
// META: global=window,worker,shadowrealm
22
// META: script=../resources/rs-utils.js
33
// META: script=../resources/test-utils.js
44
// META: script=../resources/recording-streams.js

test/fixtures/wpt/streams/piping/then-interception.any.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// META: global=window,worker
1+
// META: global=window,worker,shadowrealm
22
// META: script=../resources/test-utils.js
33
// META: script=../resources/recording-streams.js
44
'use strict';

test/fixtures/wpt/streams/piping/throwing-options.any.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// META: global=window,worker
1+
// META: global=window,worker,shadowrealm
22
'use strict';
33

44
class ThrowingOptions {

test/fixtures/wpt/streams/piping/transform-streams.any.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// META: global=window,worker
1+
// META: global=window,worker,shadowrealm
22
'use strict';
33

44
promise_test(() => {

test/fixtures/wpt/streams/queuing-strategies.any.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// META: global=window,worker
1+
// META: global=window,worker,shadowrealm
22
'use strict';
33

44
const highWaterMarkConversions = new Map([

test/fixtures/wpt/streams/readable-byte-streams/bad-buffers-and-views.any.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// META: global=window,worker
1+
// META: global=window,worker,shadowrealm
22
'use strict';
33

44
promise_test(() => {

test/fixtures/wpt/streams/readable-byte-streams/construct-byob-request.any.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// META: global=window,worker
1+
// META: global=window,worker,shadowrealm
22
// META: script=../resources/rs-utils.js
33
'use strict';
44

test/fixtures/wpt/streams/readable-byte-streams/enqueue-with-detached-buffer.any.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// META: global=window,worker
1+
// META: global=window,worker,shadowrealm
22

33
promise_test(async t => {
44
const error = new Error('cannot proceed');

test/fixtures/wpt/streams/readable-byte-streams/general.any.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// META: global=window,worker
1+
// META: global=window,worker,shadowrealm
22
// META: script=../resources/rs-utils.js
33
// META: script=../resources/test-utils.js
44
'use strict';

test/fixtures/wpt/streams/readable-byte-streams/non-transferable-buffers.any.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// META: global=window,worker
1+
// META: global=window,worker,shadowrealm
22
'use strict';
33

44
promise_test(async t => {

test/fixtures/wpt/streams/readable-byte-streams/respond-after-enqueue.any.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// META: global=window,worker
1+
// META: global=window,worker,shadowrealm
22

33
'use strict';
44

0 commit comments

Comments
 (0)