Skip to content

Commit 424a9b8

Browse files
jasnelljoesepi
authored andcommitted
events: allow use of AbortController with on
Signed-off-by: James M Snell <jasnell@gmail.com> PR-URL: nodejs#34912 Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com> Reviewed-By: Denys Otrishko <shishugi@gmail.com>
1 parent 79a3735 commit 424a9b8

File tree

3 files changed

+177
-4
lines changed

3 files changed

+177
-4
lines changed

doc/api/events.md

+31-1
Original file line numberDiff line numberDiff line change
@@ -1000,7 +1000,7 @@ Value: `Symbol.for('nodejs.rejection')`
10001000

10011001
See how to write a custom [rejection handler][rejection].
10021002

1003-
## `events.on(emitter, eventName)`
1003+
## `events.on(emitter, eventName[, options])`
10041004
<!-- YAML
10051005
added:
10061006
- v13.6.0
@@ -1009,6 +1009,9 @@ added:
10091009

10101010
* `emitter` {EventEmitter}
10111011
* `eventName` {string|symbol} The name of the event being listened for
1012+
* `options` {Object}
1013+
* `signal` {AbortSignal} An {AbortSignal} that can be used to cancel awaiting
1014+
events.
10121015
* Returns: {AsyncIterator} that iterates `eventName` events emitted by the `emitter`
10131016

10141017
```js
@@ -1038,6 +1041,33 @@ if the `EventEmitter` emits `'error'`. It removes all listeners when
10381041
exiting the loop. The `value` returned by each iteration is an array
10391042
composed of the emitted event arguments.
10401043

1044+
An {AbortSignal} may be used to cancel waiting on events:
1045+
1046+
```js
1047+
const { on, EventEmitter } = require('events');
1048+
const ac = new AbortController();
1049+
1050+
(async () => {
1051+
const ee = new EventEmitter();
1052+
1053+
// Emit later on
1054+
process.nextTick(() => {
1055+
ee.emit('foo', 'bar');
1056+
ee.emit('foo', 42);
1057+
});
1058+
1059+
for await (const event of on(ee, 'foo', { signal: ac.signal })) {
1060+
// The execution of this inner block is synchronous and it
1061+
// processes one event at a time (even with await). Do not use
1062+
// if concurrent execution is required.
1063+
console.log(event); // prints ['bar'] [42]
1064+
}
1065+
// Unreachable here
1066+
})();
1067+
1068+
process.nextTick(() => ac.abort());
1069+
```
1070+
10411071
## `EventTarget` and `Event` API
10421072
<!-- YAML
10431073
added: v14.5.0

lib/events.js

+27-1
Original file line numberDiff line numberDiff line change
@@ -730,7 +730,13 @@ function eventTargetAgnosticAddListener(emitter, name, listener, flags) {
730730
}
731731
}
732732

733-
function on(emitter, event) {
733+
function on(emitter, event, options) {
734+
const { signal } = { ...options };
735+
validateAbortSignal(signal, 'options.signal');
736+
if (signal && signal.aborted) {
737+
throw lazyDOMException('The operation was aborted', 'AbortError');
738+
}
739+
734740
const unconsumedEvents = [];
735741
const unconsumedPromises = [];
736742
let error = null;
@@ -768,6 +774,15 @@ function on(emitter, event) {
768774
return() {
769775
eventTargetAgnosticRemoveListener(emitter, event, eventHandler);
770776
eventTargetAgnosticRemoveListener(emitter, 'error', errorHandler);
777+
778+
if (signal) {
779+
eventTargetAgnosticRemoveListener(
780+
signal,
781+
'abort',
782+
abortListener,
783+
{ once: true });
784+
}
785+
771786
finished = true;
772787

773788
for (const promise of unconsumedPromises) {
@@ -797,9 +812,20 @@ function on(emitter, event) {
797812
addErrorHandlerIfEventEmitter(emitter, errorHandler);
798813
}
799814

815+
if (signal) {
816+
eventTargetAgnosticAddListener(
817+
signal,
818+
'abort',
819+
abortListener,
820+
{ once: true });
821+
}
800822

801823
return iterator;
802824

825+
function abortListener() {
826+
errorHandler(lazyDOMException('The operation was aborted', 'AbortError'));
827+
}
828+
803829
function eventHandler(...args) {
804830
const promise = unconsumedPromises.shift();
805831
if (promise) {

test/parallel/test-event-on-async-iterator.js

+119-2
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Flags: --expose-internals
1+
// Flags: --expose-internals --no-warnings
22
'use strict';
33

44
const common = require('../common');
@@ -248,6 +248,117 @@ async function nodeEventTarget() {
248248
clearInterval(interval);
249249
}
250250

251+
async function abortableOnBefore() {
252+
const ee = new EventEmitter();
253+
const ac = new AbortController();
254+
ac.abort();
255+
[1, {}, null, false, 'hi'].forEach((signal) => {
256+
assert.throws(() => on(ee, 'foo', { signal }), {
257+
code: 'ERR_INVALID_ARG_TYPE'
258+
});
259+
});
260+
assert.throws(() => on(ee, 'foo', { signal: ac.signal }), {
261+
name: 'AbortError'
262+
});
263+
}
264+
265+
async function eventTargetAbortableOnBefore() {
266+
const et = new EventTarget();
267+
const ac = new AbortController();
268+
ac.abort();
269+
[1, {}, null, false, 'hi'].forEach((signal) => {
270+
assert.throws(() => on(et, 'foo', { signal }), {
271+
code: 'ERR_INVALID_ARG_TYPE'
272+
});
273+
});
274+
assert.throws(() => on(et, 'foo', { signal: ac.signal }), {
275+
name: 'AbortError'
276+
});
277+
}
278+
279+
async function abortableOnAfter() {
280+
const ee = new EventEmitter();
281+
const ac = new AbortController();
282+
283+
const i = setInterval(() => ee.emit('foo', 'foo'), 10);
284+
285+
async function foo() {
286+
for await (const f of on(ee, 'foo', { signal: ac.signal })) {
287+
assert.strictEqual(f, 'foo');
288+
}
289+
}
290+
291+
foo().catch(common.mustCall((error) => {
292+
assert.strictEqual(error.name, 'AbortError');
293+
})).finally(() => {
294+
clearInterval(i);
295+
});
296+
297+
process.nextTick(() => ac.abort());
298+
}
299+
300+
async function eventTargetAbortableOnAfter() {
301+
const et = new EventTarget();
302+
const ac = new AbortController();
303+
304+
const i = setInterval(() => et.dispatchEvent(new Event('foo')), 10);
305+
306+
async function foo() {
307+
for await (const f of on(et, 'foo', { signal: ac.signal })) {
308+
assert(f);
309+
}
310+
}
311+
312+
foo().catch(common.mustCall((error) => {
313+
assert.strictEqual(error.name, 'AbortError');
314+
})).finally(() => {
315+
clearInterval(i);
316+
});
317+
318+
process.nextTick(() => ac.abort());
319+
}
320+
321+
async function eventTargetAbortableOnAfter2() {
322+
const et = new EventTarget();
323+
const ac = new AbortController();
324+
325+
const i = setInterval(() => et.dispatchEvent(new Event('foo')), 10);
326+
327+
async function foo() {
328+
for await (const f of on(et, 'foo', { signal: ac.signal })) {
329+
assert(f);
330+
// Cancel after a single event has been triggered.
331+
ac.abort();
332+
}
333+
}
334+
335+
foo().catch(common.mustCall((error) => {
336+
assert.strictEqual(error.name, 'AbortError');
337+
})).finally(() => {
338+
clearInterval(i);
339+
});
340+
}
341+
342+
async function abortableOnAfterDone() {
343+
const ee = new EventEmitter();
344+
const ac = new AbortController();
345+
346+
const i = setInterval(() => ee.emit('foo', 'foo'), 1);
347+
let count = 0;
348+
349+
async function foo() {
350+
for await (const f of on(ee, 'foo', { signal: ac.signal })) {
351+
assert.strictEqual(f[0], 'foo');
352+
if (++count === 5)
353+
break;
354+
}
355+
ac.abort(); // No error will occur
356+
}
357+
358+
foo().finally(() => {
359+
clearInterval(i);
360+
});
361+
}
251362

252363
async function run() {
253364
const funcs = [
@@ -260,7 +371,13 @@ async function run() {
260371
iterableThrow,
261372
eventTarget,
262373
errorListenerCount,
263-
nodeEventTarget
374+
nodeEventTarget,
375+
abortableOnBefore,
376+
abortableOnAfter,
377+
eventTargetAbortableOnBefore,
378+
eventTargetAbortableOnAfter,
379+
eventTargetAbortableOnAfter2,
380+
abortableOnAfterDone
264381
];
265382

266383
for (const fn of funcs) {

0 commit comments

Comments
 (0)