1
1
'use strict' ;
2
2
3
- const { AbortController } = require ( 'internal/abort_controller' ) ;
3
+ const { AbortController, AbortSignal } = require ( 'internal/abort_controller' ) ;
4
4
5
5
const {
6
6
codes : {
@@ -16,7 +16,7 @@ const {
16
16
validateInteger,
17
17
validateObject,
18
18
} = require ( 'internal/validators' ) ;
19
- const { kWeakHandler } = require ( 'internal/event_target' ) ;
19
+ const { kWeakHandler, kResistStopPropagation } = require ( 'internal/event_target' ) ;
20
20
const { finished } = require ( 'internal/streams/end-of-stream' ) ;
21
21
const staticCompose = require ( 'internal/streams/compose' ) ;
22
22
const {
@@ -27,6 +27,7 @@ const { deprecate } = require('internal/util');
27
27
28
28
const {
29
29
ArrayPrototypePush,
30
+ Boolean,
30
31
MathFloor,
31
32
Number,
32
33
NumberIsNaN,
@@ -84,19 +85,11 @@ function map(fn, options) {
84
85
validateInteger ( concurrency , 'concurrency' , 1 ) ;
85
86
86
87
return async function * map ( ) {
87
- const ac = new AbortController ( ) ;
88
+ const signal = AbortSignal . any ( [ options ?. signal ] . filter ( Boolean ) ) ;
88
89
const stream = this ;
89
90
const queue = [ ] ;
90
- const signal = ac . signal ;
91
91
const signalOpt = { signal } ;
92
92
93
- const abort = ( ) => ac . abort ( ) ;
94
- if ( options ?. signal ?. aborted ) {
95
- abort ( ) ;
96
- }
97
-
98
- options ?. signal ?. addEventListener ( 'abort' , abort ) ;
99
-
100
93
let next ;
101
94
let resume ;
102
95
let done = false ;
@@ -153,7 +146,6 @@ function map(fn, options) {
153
146
next ( ) ;
154
147
next = null ;
155
148
}
156
- options ?. signal ?. removeEventListener ( 'abort' , abort ) ;
157
149
}
158
150
}
159
151
@@ -188,8 +180,6 @@ function map(fn, options) {
188
180
} ) ;
189
181
}
190
182
} finally {
191
- ac . abort ( ) ;
192
-
193
183
done = true ;
194
184
if ( resume ) {
195
185
resume ( ) ;
@@ -301,7 +291,7 @@ async function reduce(reducer, initialValue, options) {
301
291
const ac = new AbortController ( ) ;
302
292
const signal = ac . signal ;
303
293
if ( options ?. signal ) {
304
- const opts = { once : true , [ kWeakHandler ] : this } ;
294
+ const opts = { once : true , [ kWeakHandler ] : this , [ kResistStopPropagation ] : true } ;
305
295
options . signal . addEventListener ( 'abort' , ( ) => ac . abort ( ) , opts ) ;
306
296
}
307
297
let gotAnyItemFromStream = false ;
0 commit comments