Skip to content

Commit 9ce9716

Browse files
committed
streams: implement TransformStream cleanup using
Fixes: nodejs#49971
1 parent 54bb691 commit 9ce9716

File tree

1 file changed

+105
-10
lines changed

1 file changed

+105
-10
lines changed

lib/internal/webstreams/transformstream.js

+105-10
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ const {
4747
nonOpFlush,
4848
kType,
4949
kState,
50+
nonOpCancel,
5051
} = require('internal/webstreams/util');
5152

5253
const {
@@ -376,8 +377,7 @@ function initializeTransformStream(
376377
return transformStreamDefaultSourcePullAlgorithm(stream);
377378
},
378379
cancel(reason) {
379-
transformStreamErrorWritableAndUnblockWrite(stream, reason);
380-
return PromiseResolve();
380+
return transformStreamDefaultSourceCancelAlgorithm(stream, reason);
381381
},
382382
}, {
383383
highWaterMark: readableHighWaterMark,
@@ -419,6 +419,10 @@ function transformStreamErrorWritableAndUnblockWrite(stream, error) {
419419
writableStreamDefaultControllerErrorIfNeeded(
420420
writable[kState].controller,
421421
error);
422+
transformStreamUnblockWrite(stream);
423+
}
424+
425+
function transformStreamUnblockWrite(stream) {
422426
if (stream[kState].backpressure)
423427
transformStreamSetBackpressure(stream, false);
424428
}
@@ -435,13 +439,15 @@ function setupTransformStreamDefaultController(
435439
stream,
436440
controller,
437441
transformAlgorithm,
438-
flushAlgorithm) {
442+
flushAlgorithm,
443+
cancelAlgorithm) {
439444
assert(isTransformStream(stream));
440445
assert(stream[kState].controller === undefined);
441446
controller[kState] = {
442447
stream,
443448
transformAlgorithm,
444449
flushAlgorithm,
450+
cancelAlgorithm,
445451
};
446452
stream[kState].controller = controller;
447453
}
@@ -452,21 +458,26 @@ function setupTransformStreamDefaultControllerFromTransformer(
452458
const controller = new TransformStreamDefaultController(kSkipThrow);
453459
const transform = transformer?.transform || defaultTransformAlgorithm;
454460
const flush = transformer?.flush || nonOpFlush;
461+
const cancel = transformer?.cancel || nonOpCancel;
455462
const transformAlgorithm =
456463
FunctionPrototypeBind(transform, transformer);
457464
const flushAlgorithm =
458465
FunctionPrototypeBind(flush, transformer);
466+
const cancelAlgorithm =
467+
FunctionPrototypeBind(cancel, transformer);
459468

460469
setupTransformStreamDefaultController(
461470
stream,
462471
controller,
463472
transformAlgorithm,
464-
flushAlgorithm);
473+
flushAlgorithm,
474+
cancelAlgorithm);
465475
}
466476

467477
function transformStreamDefaultControllerClearAlgorithms(controller) {
468478
controller[kState].transformAlgorithm = undefined;
469479
controller[kState].flushAlgorithm = undefined;
480+
controller[kState].cancelAlgorithm = undefined;
470481
}
471482

472483
function transformStreamDefaultControllerEnqueue(controller, chunk) {
@@ -555,7 +566,40 @@ function transformStreamDefaultSinkWriteAlgorithm(stream, chunk) {
555566
}
556567

557568
async function transformStreamDefaultSinkAbortAlgorithm(stream, reason) {
558-
transformStreamError(stream, reason);
569+
const {
570+
controller,
571+
readable,
572+
} = stream[kState];
573+
574+
if (controller[kState].finishPromise !== undefined) {
575+
return controller[kState].finishPromise
576+
}
577+
578+
const { promise, resolve, reject } = createDeferredPromise();
579+
controller[kState].finishPromise = promise;
580+
const cancelPromise = ensureIsPromise(
581+
controller[kState].cancelAlgorithm,
582+
controller,
583+
reason);
584+
transformStreamDefaultControllerClearAlgorithms(controller);
585+
586+
PromisePrototypeThen(
587+
cancelPromise,
588+
() => {
589+
if (readable[kState].state === 'errored')
590+
reject(readable[kState].storedError);
591+
else {
592+
readableStreamDefaultControllerError(readable[kState].controller, reason);
593+
resolve();
594+
}
595+
},
596+
(error) => {
597+
readableStreamDefaultControllerError(readable[kState].controller, error);
598+
reject(error);
599+
}
600+
);
601+
602+
return controller[kState].finishPromise;
559603
}
560604

561605
function transformStreamDefaultSinkCloseAlgorithm(stream) {
@@ -564,23 +608,32 @@ function transformStreamDefaultSinkCloseAlgorithm(stream) {
564608
controller,
565609
} = stream[kState];
566610

611+
if (controller[kState].finishPromise !== undefined) {
612+
return controller[kState].finishPromise
613+
}
614+
const { promise, resolve, reject } = createDeferredPromise();
615+
controller[kState].finishPromise = promise;
567616
const flushPromise =
568617
ensureIsPromise(
569618
controller[kState].flushAlgorithm,
570619
controller,
571620
controller);
572621
transformStreamDefaultControllerClearAlgorithms(controller);
573-
return PromisePrototypeThen(
622+
PromisePrototypeThen(
574623
flushPromise,
575624
() => {
576625
if (readable[kState].state === 'errored')
577-
throw readable[kState].storedError;
578-
readableStreamDefaultControllerClose(readable[kState].controller);
626+
reject(readable[kState].storedError);
627+
else {
628+
readableStreamDefaultControllerClose(readable[kState].controller);
629+
resolve();
630+
}
579631
},
580632
(error) => {
581-
transformStreamError(stream, error);
582-
throw readable[kState].storedError;
633+
readableStreamDefaultControllerError(readable[kState].controller, error);
634+
reject(error);
583635
});
636+
return controller[kState].finishPromise;
584637
}
585638

586639
function transformStreamDefaultSourcePullAlgorithm(stream) {
@@ -590,6 +643,48 @@ function transformStreamDefaultSourcePullAlgorithm(stream) {
590643
return stream[kState].backpressureChange.promise;
591644
}
592645

646+
function transformStreamDefaultSourceCancelAlgorithm(stream, reason) {
647+
const {
648+
controller,
649+
writable,
650+
} = stream[kState];
651+
652+
if (controller[kState].finishPromise !== undefined) {
653+
return controller[kState].finishPromise;
654+
}
655+
656+
const { promise, resolve, reject } = createDeferredPromise();
657+
controller[kState].finishPromise = promise;
658+
const cancelPromise = ensureIsPromise(
659+
controller[kState].cancelAlgorithm,
660+
controller,
661+
reason);
662+
transformStreamDefaultControllerClearAlgorithms(controller);
663+
664+
PromisePrototypeThen(cancelPromise,
665+
() => {
666+
if (writable[kState].state === 'errored')
667+
reject(writable[kState].storedError);
668+
else {
669+
writableStreamDefaultControllerErrorIfNeeded(
670+
writable[kState].controller,
671+
reason);
672+
transformStreamUnblockWrite(stream);
673+
resolve();
674+
}
675+
},
676+
(error) => {
677+
writableStreamDefaultControllerErrorIfNeeded(
678+
writable[kState].controller,
679+
error);
680+
transformStreamUnblockWrite(stream);
681+
reject(error);
682+
}
683+
);
684+
685+
return controller[kState].finishPromise
686+
}
687+
593688
module.exports = {
594689
TransformStream,
595690
TransformStreamDefaultController,

0 commit comments

Comments
 (0)