5
5
FunctionPrototypeCall,
6
6
ObjectDefineProperties,
7
7
PromisePrototypeThen,
8
- PromiseResolve,
9
8
ReflectConstruct,
10
9
SymbolToStringTag,
11
10
Symbol,
@@ -47,6 +46,7 @@ const {
47
46
nonOpFlush,
48
47
kType,
49
48
kState,
49
+ nonOpCancel,
50
50
} = require ( 'internal/webstreams/util' ) ;
51
51
52
52
const {
@@ -377,8 +377,7 @@ function initializeTransformStream(
377
377
return transformStreamDefaultSourcePullAlgorithm ( stream ) ;
378
378
} ,
379
379
cancel ( reason ) {
380
- transformStreamErrorWritableAndUnblockWrite ( stream , reason ) ;
381
- return PromiseResolve ( ) ;
380
+ return transformStreamDefaultSourceCancelAlgorithm ( stream , reason ) ;
382
381
} ,
383
382
} , {
384
383
highWaterMark : readableHighWaterMark ,
@@ -420,6 +419,10 @@ function transformStreamErrorWritableAndUnblockWrite(stream, error) {
420
419
writableStreamDefaultControllerErrorIfNeeded (
421
420
writable [ kState ] . controller ,
422
421
error ) ;
422
+ transformStreamUnblockWrite ( stream ) ;
423
+ }
424
+
425
+ function transformStreamUnblockWrite ( stream ) {
423
426
if ( stream [ kState ] . backpressure )
424
427
transformStreamSetBackpressure ( stream , false ) ;
425
428
}
@@ -436,13 +439,15 @@ function setupTransformStreamDefaultController(
436
439
stream ,
437
440
controller ,
438
441
transformAlgorithm ,
439
- flushAlgorithm ) {
442
+ flushAlgorithm ,
443
+ cancelAlgorithm ) {
440
444
assert ( isTransformStream ( stream ) ) ;
441
445
assert ( stream [ kState ] . controller === undefined ) ;
442
446
controller [ kState ] = {
443
447
stream,
444
448
transformAlgorithm,
445
449
flushAlgorithm,
450
+ cancelAlgorithm,
446
451
} ;
447
452
stream [ kState ] . controller = controller ;
448
453
}
@@ -453,21 +458,26 @@ function setupTransformStreamDefaultControllerFromTransformer(
453
458
const controller = new TransformStreamDefaultController ( kSkipThrow ) ;
454
459
const transform = transformer ?. transform || defaultTransformAlgorithm ;
455
460
const flush = transformer ?. flush || nonOpFlush ;
461
+ const cancel = transformer ?. cancel || nonOpCancel ;
456
462
const transformAlgorithm =
457
463
FunctionPrototypeBind ( transform , transformer ) ;
458
464
const flushAlgorithm =
459
465
FunctionPrototypeBind ( flush , transformer ) ;
466
+ const cancelAlgorithm =
467
+ FunctionPrototypeBind ( cancel , transformer ) ;
460
468
461
469
setupTransformStreamDefaultController (
462
470
stream ,
463
471
controller ,
464
472
transformAlgorithm ,
465
- flushAlgorithm ) ;
473
+ flushAlgorithm ,
474
+ cancelAlgorithm ) ;
466
475
}
467
476
468
477
function transformStreamDefaultControllerClearAlgorithms ( controller ) {
469
478
controller [ kState ] . transformAlgorithm = undefined ;
470
479
controller [ kState ] . flushAlgorithm = undefined ;
480
+ controller [ kState ] . cancelAlgorithm = undefined ;
471
481
}
472
482
473
483
function transformStreamDefaultControllerEnqueue ( controller , chunk ) {
@@ -556,7 +566,40 @@ function transformStreamDefaultSinkWriteAlgorithm(stream, chunk) {
556
566
}
557
567
558
568
async function transformStreamDefaultSinkAbortAlgorithm ( stream , reason ) {
559
- transformStreamError ( stream , reason ) ;
569
+ const {
570
+ controller,
571
+ readable,
572
+ } = stream [ kState ] ;
573
+
574
+ if ( controller [ kState ] . finishPromise !== undefined ) {
575
+ return controller [ kState ] . finishPromise ;
576
+ }
577
+
578
+ const { promise, resolve, reject } = createDeferredPromise ( ) ;
579
+ controller [ kState ] . finishPromise = promise ;
580
+ const cancelPromise = ensureIsPromise (
581
+ controller [ kState ] . cancelAlgorithm ,
582
+ controller ,
583
+ reason ) ;
584
+ transformStreamDefaultControllerClearAlgorithms ( controller ) ;
585
+
586
+ PromisePrototypeThen (
587
+ cancelPromise ,
588
+ ( ) => {
589
+ if ( readable [ kState ] . state === 'errored' )
590
+ reject ( readable [ kState ] . storedError ) ;
591
+ else {
592
+ readableStreamDefaultControllerError ( readable [ kState ] . controller , reason ) ;
593
+ resolve ( ) ;
594
+ }
595
+ } ,
596
+ ( error ) => {
597
+ readableStreamDefaultControllerError ( readable [ kState ] . controller , error ) ;
598
+ reject ( error ) ;
599
+ } ,
600
+ ) ;
601
+
602
+ return controller [ kState ] . finishPromise ;
560
603
}
561
604
562
605
function transformStreamDefaultSinkCloseAlgorithm ( stream ) {
@@ -565,23 +608,32 @@ function transformStreamDefaultSinkCloseAlgorithm(stream) {
565
608
controller,
566
609
} = stream [ kState ] ;
567
610
611
+ if ( controller [ kState ] . finishPromise !== undefined ) {
612
+ return controller [ kState ] . finishPromise ;
613
+ }
614
+ const { promise, resolve, reject } = createDeferredPromise ( ) ;
615
+ controller [ kState ] . finishPromise = promise ;
568
616
const flushPromise =
569
617
ensureIsPromise (
570
618
controller [ kState ] . flushAlgorithm ,
571
619
controller ,
572
620
controller ) ;
573
621
transformStreamDefaultControllerClearAlgorithms ( controller ) ;
574
- return PromisePrototypeThen (
622
+ PromisePrototypeThen (
575
623
flushPromise ,
576
624
( ) => {
577
625
if ( readable [ kState ] . state === 'errored' )
578
- throw readable [ kState ] . storedError ;
579
- readableStreamDefaultControllerClose ( readable [ kState ] . controller ) ;
626
+ reject ( readable [ kState ] . storedError ) ;
627
+ else {
628
+ readableStreamDefaultControllerClose ( readable [ kState ] . controller ) ;
629
+ resolve ( ) ;
630
+ }
580
631
} ,
581
632
( error ) => {
582
- transformStreamError ( stream , error ) ;
583
- throw readable [ kState ] . storedError ;
633
+ readableStreamDefaultControllerError ( readable [ kState ] . controller , error ) ;
634
+ reject ( error ) ;
584
635
} ) ;
636
+ return controller [ kState ] . finishPromise ;
585
637
}
586
638
587
639
function transformStreamDefaultSourcePullAlgorithm ( stream ) {
@@ -591,6 +643,48 @@ function transformStreamDefaultSourcePullAlgorithm(stream) {
591
643
return stream [ kState ] . backpressureChange . promise ;
592
644
}
593
645
646
+ function transformStreamDefaultSourceCancelAlgorithm ( stream , reason ) {
647
+ const {
648
+ controller,
649
+ writable,
650
+ } = stream [ kState ] ;
651
+
652
+ if ( controller [ kState ] . finishPromise !== undefined ) {
653
+ return controller [ kState ] . finishPromise ;
654
+ }
655
+
656
+ const { promise, resolve, reject } = createDeferredPromise ( ) ;
657
+ controller [ kState ] . finishPromise = promise ;
658
+ const cancelPromise = ensureIsPromise (
659
+ controller [ kState ] . cancelAlgorithm ,
660
+ controller ,
661
+ reason ) ;
662
+ transformStreamDefaultControllerClearAlgorithms ( controller ) ;
663
+
664
+ PromisePrototypeThen ( cancelPromise ,
665
+ ( ) => {
666
+ if ( writable [ kState ] . state === 'errored' )
667
+ reject ( writable [ kState ] . storedError ) ;
668
+ else {
669
+ writableStreamDefaultControllerErrorIfNeeded (
670
+ writable [ kState ] . controller ,
671
+ reason ) ;
672
+ transformStreamUnblockWrite ( stream ) ;
673
+ resolve ( ) ;
674
+ }
675
+ } ,
676
+ ( error ) => {
677
+ writableStreamDefaultControllerErrorIfNeeded (
678
+ writable [ kState ] . controller ,
679
+ error ) ;
680
+ transformStreamUnblockWrite ( stream ) ;
681
+ reject ( error ) ;
682
+ } ,
683
+ ) ;
684
+
685
+ return controller [ kState ] . finishPromise ;
686
+ }
687
+
594
688
module . exports = {
595
689
TransformStream,
596
690
TransformStreamDefaultController,
0 commit comments