diff --git a/doc/api/events.md b/doc/api/events.md index 7c756e6df9eab8..387b9b15df1c39 100644 --- a/doc/api/events.md +++ b/doc/api/events.md @@ -1000,7 +1000,7 @@ Value: `Symbol.for('nodejs.rejection')` See how to write a custom [rejection handler][rejection]. -## `events.on(emitter, eventName)` +## `events.on(emitter, eventName[, options])` <!-- YAML added: - v13.6.0 @@ -1009,6 +1009,9 @@ added: * `emitter` {EventEmitter} * `eventName` {string|symbol} The name of the event being listened for +* `options` {Object} + * `signal` {AbortSignal} An {AbortSignal} that can be used to cancel awaiting + events. * Returns: {AsyncIterator} that iterates `eventName` events emitted by the `emitter` ```js @@ -1038,6 +1041,33 @@ if the `EventEmitter` emits `'error'`. It removes all listeners when exiting the loop. The `value` returned by each iteration is an array composed of the emitted event arguments. +An {AbortSignal} may be used to cancel waiting on events: + +```js +const { on, EventEmitter } = require('events'); +const ac = new AbortController(); + +(async () => { + const ee = new EventEmitter(); + + // Emit later on + process.nextTick(() => { + ee.emit('foo', 'bar'); + ee.emit('foo', 42); + }); + + for await (const event of on(ee, 'foo', { signal: ac.signal })) { + // The execution of this inner block is synchronous and it + // processes one event at a time (even with await). Do not use + // if concurrent execution is required. + console.log(event); // prints ['bar'] [42] + } + // Unreachable here +})(); + +process.nextTick(() => ac.abort()); +``` + ## `EventTarget` and `Event` API <!-- YAML added: v14.5.0 diff --git a/lib/events.js b/lib/events.js index 270588fcbc9e7a..902b27544ba23a 100644 --- a/lib/events.js +++ b/lib/events.js @@ -730,7 +730,13 @@ function eventTargetAgnosticAddListener(emitter, name, listener, flags) { } } -function on(emitter, event) { +function on(emitter, event, options) { + const { signal } = { ...options }; + validateAbortSignal(signal, 'options.signal'); + if (signal && signal.aborted) { + throw lazyDOMException('The operation was aborted', 'AbortError'); + } + const unconsumedEvents = []; const unconsumedPromises = []; let error = null; @@ -768,6 +774,15 @@ function on(emitter, event) { return() { eventTargetAgnosticRemoveListener(emitter, event, eventHandler); eventTargetAgnosticRemoveListener(emitter, 'error', errorHandler); + + if (signal) { + eventTargetAgnosticRemoveListener( + signal, + 'abort', + abortListener, + { once: true }); + } + finished = true; for (const promise of unconsumedPromises) { @@ -797,9 +812,20 @@ function on(emitter, event) { addErrorHandlerIfEventEmitter(emitter, errorHandler); } + if (signal) { + eventTargetAgnosticAddListener( + signal, + 'abort', + abortListener, + { once: true }); + } return iterator; + function abortListener() { + errorHandler(lazyDOMException('The operation was aborted', 'AbortError')); + } + function eventHandler(...args) { const promise = unconsumedPromises.shift(); if (promise) { diff --git a/test/parallel/test-event-on-async-iterator.js b/test/parallel/test-event-on-async-iterator.js index 5c02360250538c..d9817ff15e0701 100644 --- a/test/parallel/test-event-on-async-iterator.js +++ b/test/parallel/test-event-on-async-iterator.js @@ -1,4 +1,4 @@ -// Flags: --expose-internals +// Flags: --expose-internals --no-warnings 'use strict'; const common = require('../common'); @@ -248,6 +248,117 @@ async function nodeEventTarget() { clearInterval(interval); } +async function abortableOnBefore() { + const ee = new EventEmitter(); + const ac = new AbortController(); + ac.abort(); + [1, {}, null, false, 'hi'].forEach((signal) => { + assert.throws(() => on(ee, 'foo', { signal }), { + code: 'ERR_INVALID_ARG_TYPE' + }); + }); + assert.throws(() => on(ee, 'foo', { signal: ac.signal }), { + name: 'AbortError' + }); +} + +async function eventTargetAbortableOnBefore() { + const et = new EventTarget(); + const ac = new AbortController(); + ac.abort(); + [1, {}, null, false, 'hi'].forEach((signal) => { + assert.throws(() => on(et, 'foo', { signal }), { + code: 'ERR_INVALID_ARG_TYPE' + }); + }); + assert.throws(() => on(et, 'foo', { signal: ac.signal }), { + name: 'AbortError' + }); +} + +async function abortableOnAfter() { + const ee = new EventEmitter(); + const ac = new AbortController(); + + const i = setInterval(() => ee.emit('foo', 'foo'), 10); + + async function foo() { + for await (const f of on(ee, 'foo', { signal: ac.signal })) { + assert.strictEqual(f, 'foo'); + } + } + + foo().catch(common.mustCall((error) => { + assert.strictEqual(error.name, 'AbortError'); + })).finally(() => { + clearInterval(i); + }); + + process.nextTick(() => ac.abort()); +} + +async function eventTargetAbortableOnAfter() { + const et = new EventTarget(); + const ac = new AbortController(); + + const i = setInterval(() => et.dispatchEvent(new Event('foo')), 10); + + async function foo() { + for await (const f of on(et, 'foo', { signal: ac.signal })) { + assert(f); + } + } + + foo().catch(common.mustCall((error) => { + assert.strictEqual(error.name, 'AbortError'); + })).finally(() => { + clearInterval(i); + }); + + process.nextTick(() => ac.abort()); +} + +async function eventTargetAbortableOnAfter2() { + const et = new EventTarget(); + const ac = new AbortController(); + + const i = setInterval(() => et.dispatchEvent(new Event('foo')), 10); + + async function foo() { + for await (const f of on(et, 'foo', { signal: ac.signal })) { + assert(f); + // Cancel after a single event has been triggered. + ac.abort(); + } + } + + foo().catch(common.mustCall((error) => { + assert.strictEqual(error.name, 'AbortError'); + })).finally(() => { + clearInterval(i); + }); +} + +async function abortableOnAfterDone() { + const ee = new EventEmitter(); + const ac = new AbortController(); + + const i = setInterval(() => ee.emit('foo', 'foo'), 1); + let count = 0; + + async function foo() { + for await (const f of on(ee, 'foo', { signal: ac.signal })) { + assert.strictEqual(f[0], 'foo'); + if (++count === 5) + break; + } + ac.abort(); // No error will occur + } + + foo().finally(() => { + clearInterval(i); + }); +} async function run() { const funcs = [ @@ -260,7 +371,13 @@ async function run() { iterableThrow, eventTarget, errorListenerCount, - nodeEventTarget + nodeEventTarget, + abortableOnBefore, + abortableOnAfter, + eventTargetAbortableOnBefore, + eventTargetAbortableOnAfter, + eventTargetAbortableOnAfter2, + abortableOnAfterDone ]; for (const fn of funcs) {