From 33a6cd9965ffd645f084a15c61071a91fa4e6451 Mon Sep 17 00:00:00 2001 From: atlowChemi Date: Thu, 6 Jul 2023 01:21:34 +0300 Subject: [PATCH 1/8] http2: use addAbortListener --- lib/internal/http2/core.js | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/lib/internal/http2/core.js b/lib/internal/http2/core.js index fe104f52a38ed8..f5527a0dd11949 100644 --- a/lib/internal/http2/core.js +++ b/lib/internal/http2/core.js @@ -27,6 +27,7 @@ const { StringPrototypeSlice, Symbol, SymbolAsyncDispose, + SymbolDispose, TypedArrayPrototypeGetLength, Uint32Array, Uint8Array, @@ -1811,10 +1812,8 @@ class ClientHttp2Session extends Http2Session { if (signal.aborted) { aborter(); } else { - signal.addEventListener('abort', aborter); - stream.once('close', () => { - signal.removeEventListener('abort', aborter); - }); + const disposable = EventEmitter.addAbortListener(signal, aborter); + stream.once('close', disposable[SymbolDispose]); } } From 2c2284309602ee32bf4ca6c8a49f343b0d9df26f Mon Sep 17 00:00:00 2001 From: atlowChemi Date: Thu, 6 Jul 2023 01:30:47 +0300 Subject: [PATCH 2/8] readline: use addAbortListener --- lib/internal/readline/interface.js | 5 +++-- lib/readline.js | 17 +++++++++-------- lib/readline/promises.js | 8 ++++++-- 3 files changed, 18 insertions(+), 12 deletions(-) diff --git a/lib/internal/readline/interface.js b/lib/internal/readline/interface.js index df08875cc79ae6..f7f06674ef7c41 100644 --- a/lib/internal/readline/interface.js +++ b/lib/internal/readline/interface.js @@ -28,6 +28,7 @@ const { StringPrototypeStartsWith, StringPrototypeTrim, Symbol, + SymbolDispose, SymbolAsyncIterator, SafeStringIterator, } = primordials; @@ -325,8 +326,8 @@ function InterfaceConstructor(input, output, completer, terminal) { if (signal.aborted) { process.nextTick(onAborted); } else { - signal.addEventListener('abort', onAborted, { once: true }); - self.once('close', () => signal.removeEventListener('abort', onAborted)); + const disposable = EventEmitter.addAbortListener(signal, onAborted); + self.once('close', disposable[SymbolDispose]); } } diff --git a/lib/readline.js b/lib/readline.js index b9c6f17c52b4b0..5276d9401b4c12 100644 --- a/lib/readline.js +++ b/lib/readline.js @@ -30,6 +30,7 @@ const { Promise, PromiseReject, StringPrototypeSlice, + SymbolDispose, } = primordials; const { @@ -95,6 +96,7 @@ const { kWordRight, kWriteToOutput, } = require('internal/readline/interface'); +let addAbortListener; function Interface(input, output, completer, terminal) { if (!(this instanceof Interface)) { @@ -143,15 +145,13 @@ Interface.prototype.question = function question(query, options, cb) { const onAbort = () => { this[kQuestionCancel](); }; - options.signal.addEventListener('abort', onAbort, { once: true }); - const cleanup = () => { - options.signal.removeEventListener('abort', onAbort); - }; + addAbortListener ??= require('events').addAbortListener; + const disposable = addAbortListener(options.signal, onAbort); const originalCb = cb; cb = typeof cb === 'function' ? (answer) => { - cleanup(); + disposable[SymbolDispose](); return originalCb(answer); - } : cleanup; + } : disposable[SymbolDispose]; } if (typeof cb === 'function') { @@ -175,9 +175,10 @@ Interface.prototype.question[promisify.custom] = function question(query, option const onAbort = () => { reject(new AbortError(undefined, { cause: options.signal.reason })); }; - options.signal.addEventListener('abort', onAbort, { once: true }); + addAbortListener ??= require('events').addAbortListener; + const disposable = addAbortListener(options.signal, onAbort); cb = (answer) => { - options.signal.removeEventListener('abort', onAbort); + disposable[SymbolDispose](); resolve(answer); }; } diff --git a/lib/readline/promises.js b/lib/readline/promises.js index 9bfa2aaecd6b44..4c2ce90479ef8f 100644 --- a/lib/readline/promises.js +++ b/lib/readline/promises.js @@ -2,6 +2,7 @@ const { Promise, + SymbolDispose, } = primordials; const { @@ -22,6 +23,7 @@ const { validateAbortSignal } = require('internal/validators'); const { kEmptyObject, } = require('internal/util'); +let addAbortListener; class Interface extends _Interface { // eslint-disable-next-line no-useless-constructor @@ -43,9 +45,11 @@ class Interface extends _Interface { this[kQuestionCancel](); reject(new AbortError(undefined, { cause: options.signal.reason })); }; - options.signal.addEventListener('abort', onAbort, { once: true }); + addAbortListener ??= require('events').addAbortListener; + const disposable = addAbortListener(options.signal, onAbort); + cb = (answer) => { - options.signal.removeEventListener('abort', onAbort); + disposable[SymbolDispose](); resolve(answer); }; } From 2eebe7f2303714a575f6d62de77600943c33cac8 Mon Sep 17 00:00:00 2001 From: atlowChemi Date: Thu, 6 Jul 2023 01:58:28 +0300 Subject: [PATCH 3/8] streams: use addAbortListener --- lib/internal/streams/add-abort-signal.js | 10 ++++++++-- lib/internal/streams/end-of-stream.js | 17 ++++++++++++----- lib/internal/streams/operators.js | 21 ++++++--------------- lib/internal/streams/pipeline.js | 7 +++++-- lib/internal/webstreams/readablestream.js | 8 ++++++-- 5 files changed, 37 insertions(+), 26 deletions(-) diff --git a/lib/internal/streams/add-abort-signal.js b/lib/internal/streams/add-abort-signal.js index d6c8ca4c9c7842..819be3ff63e915 100644 --- a/lib/internal/streams/add-abort-signal.js +++ b/lib/internal/streams/add-abort-signal.js @@ -1,5 +1,9 @@ 'use strict'; +const { + SymbolDispose, +} = primordials; + const { AbortError, codes, @@ -13,6 +17,7 @@ const { const eos = require('internal/streams/end-of-stream'); const { ERR_INVALID_ARG_TYPE } = codes; +let addAbortListener; // This method is inlined here for readable-stream // It also does not allow for signal to not exist on the stream @@ -46,8 +51,9 @@ module.exports.addAbortSignalNoValidate = function(signal, stream) { if (signal.aborted) { onAbort(); } else { - signal.addEventListener('abort', onAbort); - eos(stream, () => signal.removeEventListener('abort', onAbort)); + addAbortListener ??= require('events').addAbortListener; + const disposable = addAbortListener(signal, onAbort); + eos(stream, disposable[SymbolDispose]); } return stream; }; diff --git a/lib/internal/streams/end-of-stream.js b/lib/internal/streams/end-of-stream.js index df8fdeb50110d7..663222e3149bad 100644 --- a/lib/internal/streams/end-of-stream.js +++ b/lib/internal/streams/end-of-stream.js @@ -22,7 +22,11 @@ const { validateBoolean, } = require('internal/validators'); -const { Promise, PromisePrototypeThen } = primordials; +const { + Promise, + PromisePrototypeThen, + SymbolDispose, +} = primordials; const { isClosed, @@ -40,6 +44,7 @@ const { willEmitClose: _willEmitClose, kIsClosedPromise, } = require('internal/streams/utils'); +let addAbortListener; function isRequest(stream) { return stream.setHeader && typeof stream.abort === 'function'; @@ -249,12 +254,13 @@ function eos(stream, options, callback) { if (options.signal.aborted) { process.nextTick(abort); } else { + addAbortListener ??= require('events').addAbortListener; + const disposable = addAbortListener(options.signal, abort); const originalCallback = callback; callback = once((...args) => { - options.signal.removeEventListener('abort', abort); + disposable[SymbolDispose](); originalCallback.apply(stream, args); }); - options.signal.addEventListener('abort', abort); } } @@ -272,12 +278,13 @@ function eosWeb(stream, options, callback) { if (options.signal.aborted) { process.nextTick(abort); } else { + addAbortListener ??= require('events').addAbortListener; + const disposable = addAbortListener(options.signal, abort); const originalCallback = callback; callback = once((...args) => { - options.signal.removeEventListener('abort', abort); + disposable[SymbolDispose](); originalCallback.apply(stream, args); }); - options.signal.addEventListener('abort', abort); } } const resolverFn = (...args) => { diff --git a/lib/internal/streams/operators.js b/lib/internal/streams/operators.js index 13cbce1005ece1..b4914da0201478 100644 --- a/lib/internal/streams/operators.js +++ b/lib/internal/streams/operators.js @@ -1,6 +1,6 @@ 'use strict'; -const { AbortController } = require('internal/abort_controller'); +const { AbortController, AbortSignal } = require('internal/abort_controller'); const { codes: { @@ -16,7 +16,7 @@ const { validateInteger, validateObject, } = require('internal/validators'); -const { kWeakHandler } = require('internal/event_target'); +const { kWeakHandler, kResistStopPropagation } = require('internal/event_target'); const { finished } = require('internal/streams/end-of-stream'); const staticCompose = require('internal/streams/compose'); const { @@ -26,6 +26,7 @@ const { isWritable, isNodeStream } = require('internal/streams/utils'); const { ArrayPrototypePush, + Boolean, MathFloor, Number, NumberIsNaN, @@ -37,6 +38,7 @@ const { const kEmpty = Symbol('kEmpty'); const kEof = Symbol('kEof'); +let addAbortListener; function compose(stream, options) { if (options != null) { @@ -83,19 +85,11 @@ function map(fn, options) { validateInteger(concurrency, 'concurrency', 1); return async function* map() { - const ac = new AbortController(); + const signal = AbortSignal.any([options?.signal].filter(Boolean)); const stream = this; const queue = []; - const signal = ac.signal; const signalOpt = { signal }; - const abort = () => ac.abort(); - if (options?.signal?.aborted) { - abort(); - } - - options?.signal?.addEventListener('abort', abort); - let next; let resume; let done = false; @@ -152,7 +146,6 @@ function map(fn, options) { next(); next = null; } - options?.signal?.removeEventListener('abort', abort); } } @@ -187,8 +180,6 @@ function map(fn, options) { }); } } finally { - ac.abort(); - done = true; if (resume) { resume(); @@ -281,7 +272,7 @@ async function reduce(reducer, initialValue, options) { const ac = new AbortController(); const signal = ac.signal; if (options?.signal) { - const opts = { once: true, [kWeakHandler]: this }; + const opts = { once: true, [kWeakHandler]: this, [kResistStopPropagation]: true }; options.signal.addEventListener('abort', () => ac.abort(), opts); } let gotAnyItemFromStream = false; diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index fb2cd90a2678ea..51be4c49d57fa0 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -7,6 +7,7 @@ const { ArrayIsArray, Promise, SymbolAsyncIterator, + SymbolDispose, } = primordials; const eos = require('internal/streams/end-of-stream'); @@ -44,6 +45,7 @@ const { AbortController } = require('internal/abort_controller'); let PassThrough; let Readable; +let addAbortListener; function destroyer(stream, reading, writing) { let finished = false; @@ -206,7 +208,8 @@ function pipelineImpl(streams, callback, opts) { finishImpl(new AbortError()); } - outerSignal?.addEventListener('abort', abort); + addAbortListener ??= require('events').addAbortListener; + const disposable = addAbortListener(outerSignal, abort); let error; let value; @@ -231,7 +234,7 @@ function pipelineImpl(streams, callback, opts) { destroys.shift()(error); } - outerSignal?.removeEventListener('abort', abort); + disposable[SymbolDispose](); ac.abort(); if (final) { diff --git a/lib/internal/webstreams/readablestream.js b/lib/internal/webstreams/readablestream.js index 1f96a709959301..9af63227e0496f 100644 --- a/lib/internal/webstreams/readablestream.js +++ b/lib/internal/webstreams/readablestream.js @@ -22,6 +22,7 @@ const { SafePromiseAll, Symbol, SymbolAsyncIterator, + SymbolDispose, SymbolToStringTag, Uint8Array, } = primordials; @@ -140,6 +141,7 @@ const kRelease = Symbol('kRelease'); let releasedError; let releasingError; +let addAbortListener; const userModuleRegExp = /^ {4}at (?:[^/\\(]+ \()(?!node:(.+):\d+:\d+\)$).*/gm; @@ -1259,6 +1261,7 @@ function readableStreamPipeTo( let reader; let writer; + let disposable; // Both of these can throw synchronously. We want to capture // the error and return a rejected promise instead. try { @@ -1291,7 +1294,7 @@ function readableStreamPipeTo( writableStreamDefaultWriterRelease(writer); readableStreamReaderGenericRelease(reader); if (signal !== undefined) - signal.removeEventListener('abort', abortAlgorithm); + disposable?.[SymbolDispose](); if (rejected) promise.reject(error); else @@ -1418,7 +1421,8 @@ function readableStreamPipeTo( abortAlgorithm(); return promise.promise; } - signal.addEventListener('abort', abortAlgorithm, { once: true }); + addAbortListener ??= require('events').addAbortListener; + disposable = addAbortListener(signal, abortAlgorithm); } setPromiseHandled(run()); From 51bebe6327b414a9f10a29b60ef4137da0fca679 Mon Sep 17 00:00:00 2001 From: atlowChemi Date: Thu, 6 Jul 2023 02:00:57 +0300 Subject: [PATCH 4/8] dgram: use addAbortListener --- lib/dgram.js | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/lib/dgram.js b/lib/dgram.js index b28d727c8a83ce..57975de9183f00 100644 --- a/lib/dgram.js +++ b/lib/dgram.js @@ -30,6 +30,7 @@ const { ObjectDefineProperty, ObjectSetPrototypeOf, ReflectApply, + SymbolDispose, } = primordials; const errors = require('internal/errors'); @@ -143,8 +144,8 @@ function Socket(type, listener) { if (signal.aborted) { onAborted(); } else { - signal.addEventListener('abort', onAborted); - this.once('close', () => signal.removeEventListener('abort', onAborted)); + const disposable = EventEmitter.addAbortListener(signal, onAborted); + this.once('close', disposable[SymbolDispose]); } } if (udpSocketChannel.hasSubscribers) { From 63738e5b41d9b91b51d563cb83a661d1e64a531a Mon Sep 17 00:00:00 2001 From: atlowChemi Date: Thu, 6 Jul 2023 02:05:13 +0300 Subject: [PATCH 5/8] net: use addAbortListener --- lib/net.js | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/lib/net.js b/lib/net.js index c1fd13ca19d9b5..194da673db5a22 100644 --- a/lib/net.js +++ b/lib/net.js @@ -35,6 +35,7 @@ const { ObjectDefineProperty, ObjectSetPrototypeOf, Symbol, + SymbolDispose, } = primordials; const EventEmitter = require('events'); @@ -1605,9 +1606,10 @@ function afterConnect(status, handle, req, readable, writable) { function addClientAbortSignalOption(self, options) { validateAbortSignal(options.signal, 'options.signal'); const { signal } = options; + let disposable; function onAbort() { - signal.removeEventListener('abort', onAbort); + disposable?.[SymbolDispose](); self._aborted = true; } @@ -1615,7 +1617,7 @@ function addClientAbortSignalOption(self, options) { process.nextTick(onAbort); } else { process.nextTick(() => { - signal.addEventListener('abort', onAbort); + disposable = EventEmitter.addAbortListener(signal, onAbort); }); } } @@ -1695,8 +1697,8 @@ function addServerAbortSignalOption(self, options) { if (signal.aborted) { process.nextTick(onAborted); } else { - signal.addEventListener('abort', onAborted); - self.once('close', () => signal.removeEventListener('abort', onAborted)); + const disposable = EventEmitter.addAbortListener(signal, onAborted); + self.once('close', disposable[SymbolDispose]); } } From 9069b42f5f4e0161b102fa12abc1b12f7c21e21a Mon Sep 17 00:00:00 2001 From: atlowChemi Date: Thu, 6 Jul 2023 02:09:18 +0300 Subject: [PATCH 6/8] child_process: use addAbortListener --- lib/child_process.js | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/lib/child_process.js b/lib/child_process.js index 59c37b97672d39..c32756437833b6 100644 --- a/lib/child_process.js +++ b/lib/child_process.js @@ -42,6 +42,7 @@ const { StringPrototypeIncludes, StringPrototypeSlice, StringPrototypeToUpperCase, + SymbolDispose, } = primordials; const { @@ -95,6 +96,7 @@ const { const MAX_BUFFER = 1024 * 1024; const isZOS = process.platform === 'os390'; +let addAbortListener; /** * Spawns a new Node.js process + fork. @@ -781,9 +783,9 @@ function spawn(file, args, options) { if (signal.aborted) { process.nextTick(onAbortListener); } else { - signal.addEventListener('abort', onAbortListener, { once: true }); - child.once('exit', - () => signal.removeEventListener('abort', onAbortListener)); + addAbortListener ??= require('events').addAbortListener; + const disposable = addAbortListener(signal, onAbortListener); + child.once('exit', disposable[SymbolDispose]); } function onAbortListener() { From 46c779aafff684318a8f50007925068a447bc306 Mon Sep 17 00:00:00 2001 From: atlowChemi Date: Thu, 6 Jul 2023 02:12:37 +0300 Subject: [PATCH 7/8] lib: use addAbortListener --- lib/internal/abort_controller.js | 4 +++- lib/internal/watch_mode/files_watcher.js | 5 +++-- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/lib/internal/abort_controller.js b/lib/internal/abort_controller.js index e31738b98288ca..1d9d09aaff62c9 100644 --- a/lib/internal/abort_controller.js +++ b/lib/internal/abort_controller.js @@ -23,6 +23,7 @@ const { kTrustEvent, kNewListener, kRemoveListener, + kResistStopPropagation, kWeakHandler, } = require('internal/event_target'); const { @@ -435,7 +436,8 @@ async function aborted(signal, resource) { if (signal.aborted) return PromiseResolve(); const abortPromise = createDeferredPromise(); - signal.addEventListener('abort', abortPromise.resolve, { [kWeakHandler]: resource, once: true }); + const opts = { __proto__: null, [kWeakHandler]: resource, once: true, [kResistStopPropagation]: true }; + signal.addEventListener('abort', abortPromise.resolve, opts); return abortPromise.promise; } diff --git a/lib/internal/watch_mode/files_watcher.js b/lib/internal/watch_mode/files_watcher.js index 1fa4fc14cd4d4d..848c17f4115616 100644 --- a/lib/internal/watch_mode/files_watcher.js +++ b/lib/internal/watch_mode/files_watcher.js @@ -18,7 +18,6 @@ const { fileURLToPath } = require('url'); const { resolve, dirname } = require('path'); const { setTimeout } = require('timers'); - const supportsRecursiveWatching = process.platform === 'win32' || process.platform === 'darwin'; @@ -41,7 +40,9 @@ class FilesWatcher extends EventEmitter { this.#mode = mode; this.#signal = signal; - signal?.addEventListener('abort', () => this.clear(), { __proto__: null, once: true }); + if (signal) { + EventEmitter.addAbortListener(signal, () => this.clear()); + } } #isPathWatched(path) { From def52a3091343875d1276bee9280eb0df6496d5f Mon Sep 17 00:00:00 2001 From: atlowChemi Date: Thu, 6 Jul 2023 09:28:47 +0300 Subject: [PATCH 8/8] fixup! streams: use addAbortListener --- lib/internal/streams/operators.js | 1 - lib/internal/streams/pipeline.js | 7 +++++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/lib/internal/streams/operators.js b/lib/internal/streams/operators.js index b4914da0201478..8f4797da5dd519 100644 --- a/lib/internal/streams/operators.js +++ b/lib/internal/streams/operators.js @@ -38,7 +38,6 @@ const { const kEmpty = Symbol('kEmpty'); const kEof = Symbol('kEof'); -let addAbortListener; function compose(stream, options) { if (options != null) { diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index 51be4c49d57fa0..aac7f65f0404d8 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -209,7 +209,10 @@ function pipelineImpl(streams, callback, opts) { } addAbortListener ??= require('events').addAbortListener; - const disposable = addAbortListener(outerSignal, abort); + let disposable; + if (outerSignal) { + disposable = addAbortListener(outerSignal, abort); + } let error; let value; @@ -234,7 +237,7 @@ function pipelineImpl(streams, callback, opts) { destroys.shift()(error); } - disposable[SymbolDispose](); + disposable?.[SymbolDispose](); ac.abort(); if (final) {