Skip to content

Commit c44413d

Browse files
atlowChemiruyadorno
authored andcommitted
stream: use addAbortListener
PR-URL: #48550 Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com> Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
1 parent b5c7634 commit c44413d

File tree

5 files changed

+39
-26
lines changed

5 files changed

+39
-26
lines changed

lib/internal/streams/add-abort-signal.js

+8-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
'use strict';
22

3+
const {
4+
SymbolDispose,
5+
} = primordials;
6+
37
const {
48
AbortError,
59
codes,
@@ -13,6 +17,7 @@ const {
1317

1418
const eos = require('internal/streams/end-of-stream');
1519
const { ERR_INVALID_ARG_TYPE } = codes;
20+
let addAbortListener;
1621

1722
// This method is inlined here for readable-stream
1823
// It also does not allow for signal to not exist on the stream
@@ -46,8 +51,9 @@ module.exports.addAbortSignalNoValidate = function(signal, stream) {
4651
if (signal.aborted) {
4752
onAbort();
4853
} else {
49-
signal.addEventListener('abort', onAbort);
50-
eos(stream, () => signal.removeEventListener('abort', onAbort));
54+
addAbortListener ??= require('events').addAbortListener;
55+
const disposable = addAbortListener(signal, onAbort);
56+
eos(stream, disposable[SymbolDispose]);
5157
}
5258
return stream;
5359
};

lib/internal/streams/end-of-stream.js

+12-5
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,11 @@ const {
2222
validateBoolean,
2323
} = require('internal/validators');
2424

25-
const { Promise, PromisePrototypeThen } = primordials;
25+
const {
26+
Promise,
27+
PromisePrototypeThen,
28+
SymbolDispose,
29+
} = primordials;
2630

2731
const {
2832
isClosed,
@@ -40,6 +44,7 @@ const {
4044
willEmitClose: _willEmitClose,
4145
kIsClosedPromise,
4246
} = require('internal/streams/utils');
47+
let addAbortListener;
4348

4449
function isRequest(stream) {
4550
return stream.setHeader && typeof stream.abort === 'function';
@@ -249,12 +254,13 @@ function eos(stream, options, callback) {
249254
if (options.signal.aborted) {
250255
process.nextTick(abort);
251256
} else {
257+
addAbortListener ??= require('events').addAbortListener;
258+
const disposable = addAbortListener(options.signal, abort);
252259
const originalCallback = callback;
253260
callback = once((...args) => {
254-
options.signal.removeEventListener('abort', abort);
261+
disposable[SymbolDispose]();
255262
originalCallback.apply(stream, args);
256263
});
257-
options.signal.addEventListener('abort', abort);
258264
}
259265
}
260266

@@ -272,12 +278,13 @@ function eosWeb(stream, options, callback) {
272278
if (options.signal.aborted) {
273279
process.nextTick(abort);
274280
} else {
281+
addAbortListener ??= require('events').addAbortListener;
282+
const disposable = addAbortListener(options.signal, abort);
275283
const originalCallback = callback;
276284
callback = once((...args) => {
277-
options.signal.removeEventListener('abort', abort);
285+
disposable[SymbolDispose]();
278286
originalCallback.apply(stream, args);
279287
});
280-
options.signal.addEventListener('abort', abort);
281288
}
282289
}
283290
const resolverFn = (...args) => {

lib/internal/streams/operators.js

+5-15
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
'use strict';
22

3-
const { AbortController } = require('internal/abort_controller');
3+
const { AbortController, AbortSignal } = require('internal/abort_controller');
44

55
const {
66
codes: {
@@ -16,7 +16,7 @@ const {
1616
validateInteger,
1717
validateObject,
1818
} = require('internal/validators');
19-
const { kWeakHandler } = require('internal/event_target');
19+
const { kWeakHandler, kResistStopPropagation } = require('internal/event_target');
2020
const { finished } = require('internal/streams/end-of-stream');
2121
const staticCompose = require('internal/streams/compose');
2222
const {
@@ -27,6 +27,7 @@ const { deprecate } = require('internal/util');
2727

2828
const {
2929
ArrayPrototypePush,
30+
Boolean,
3031
MathFloor,
3132
Number,
3233
NumberIsNaN,
@@ -84,19 +85,11 @@ function map(fn, options) {
8485
validateInteger(concurrency, 'concurrency', 1);
8586

8687
return async function* map() {
87-
const ac = new AbortController();
88+
const signal = AbortSignal.any([options?.signal].filter(Boolean));
8889
const stream = this;
8990
const queue = [];
90-
const signal = ac.signal;
9191
const signalOpt = { signal };
9292

93-
const abort = () => ac.abort();
94-
if (options?.signal?.aborted) {
95-
abort();
96-
}
97-
98-
options?.signal?.addEventListener('abort', abort);
99-
10093
let next;
10194
let resume;
10295
let done = false;
@@ -153,7 +146,6 @@ function map(fn, options) {
153146
next();
154147
next = null;
155148
}
156-
options?.signal?.removeEventListener('abort', abort);
157149
}
158150
}
159151

@@ -188,8 +180,6 @@ function map(fn, options) {
188180
});
189181
}
190182
} finally {
191-
ac.abort();
192-
193183
done = true;
194184
if (resume) {
195185
resume();
@@ -301,7 +291,7 @@ async function reduce(reducer, initialValue, options) {
301291
const ac = new AbortController();
302292
const signal = ac.signal;
303293
if (options?.signal) {
304-
const opts = { once: true, [kWeakHandler]: this };
294+
const opts = { once: true, [kWeakHandler]: this, [kResistStopPropagation]: true };
305295
options.signal.addEventListener('abort', () => ac.abort(), opts);
306296
}
307297
let gotAnyItemFromStream = false;

lib/internal/streams/pipeline.js

+8-2
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ const {
77
ArrayIsArray,
88
Promise,
99
SymbolAsyncIterator,
10+
SymbolDispose,
1011
} = primordials;
1112

1213
const eos = require('internal/streams/end-of-stream');
@@ -44,6 +45,7 @@ const { AbortController } = require('internal/abort_controller');
4445

4546
let PassThrough;
4647
let Readable;
48+
let addAbortListener;
4749

4850
function destroyer(stream, reading, writing) {
4951
let finished = false;
@@ -206,7 +208,11 @@ function pipelineImpl(streams, callback, opts) {
206208
finishImpl(new AbortError());
207209
}
208210

209-
outerSignal?.addEventListener('abort', abort);
211+
addAbortListener ??= require('events').addAbortListener;
212+
let disposable;
213+
if (outerSignal) {
214+
disposable = addAbortListener(outerSignal, abort);
215+
}
210216

211217
let error;
212218
let value;
@@ -231,7 +237,7 @@ function pipelineImpl(streams, callback, opts) {
231237
destroys.shift()(error);
232238
}
233239

234-
outerSignal?.removeEventListener('abort', abort);
240+
disposable?.[SymbolDispose]();
235241
ac.abort();
236242

237243
if (final) {

lib/internal/webstreams/readablestream.js

+6-2
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ const {
2222
SafePromiseAll,
2323
Symbol,
2424
SymbolAsyncIterator,
25+
SymbolDispose,
2526
SymbolToStringTag,
2627
Uint8Array,
2728
} = primordials;
@@ -140,6 +141,7 @@ const kRelease = Symbol('kRelease');
140141

141142
let releasedError;
142143
let releasingError;
144+
let addAbortListener;
143145

144146
const userModuleRegExp = /^ {4}at (?:[^/\\(]+ \()(?!node:(.+):\d+:\d+\)$).*/gm;
145147

@@ -1259,6 +1261,7 @@ function readableStreamPipeTo(
12591261

12601262
let reader;
12611263
let writer;
1264+
let disposable;
12621265
// Both of these can throw synchronously. We want to capture
12631266
// the error and return a rejected promise instead.
12641267
try {
@@ -1291,7 +1294,7 @@ function readableStreamPipeTo(
12911294
writableStreamDefaultWriterRelease(writer);
12921295
readableStreamReaderGenericRelease(reader);
12931296
if (signal !== undefined)
1294-
signal.removeEventListener('abort', abortAlgorithm);
1297+
disposable?.[SymbolDispose]();
12951298
if (rejected)
12961299
promise.reject(error);
12971300
else
@@ -1418,7 +1421,8 @@ function readableStreamPipeTo(
14181421
abortAlgorithm();
14191422
return promise.promise;
14201423
}
1421-
signal.addEventListener('abort', abortAlgorithm, { once: true });
1424+
addAbortListener ??= require('events').addAbortListener;
1425+
disposable = addAbortListener(signal, abortAlgorithm);
14221426
}
14231427

14241428
setPromiseHandled(run());

0 commit comments

Comments
 (0)