Skip to content

Commit 3d3df0c

Browse files
Nitzan Uzielytargos
Nitzan Uziely
authored andcommitted
stream: add AbortSignal support to finished
PR-URL: #37354 Reviewed-By: James M Snell <jasnell@gmail.com> Reviewed-By: Robert Nagy <ronagy@icloud.com> Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Antoine du Hamel <duhamelantoine1995@gmail.com>
1 parent 617819e commit 3d3df0c

File tree

3 files changed

+117
-2
lines changed

3 files changed

+117
-2
lines changed

doc/api/stream.md

+7
Original file line numberDiff line numberDiff line change
@@ -1578,6 +1578,9 @@ further errors except from `_destroy()` may be emitted as `'error'`.
15781578
<!-- YAML
15791579
added: v10.0.0
15801580
changes:
1581+
- version: REPLACEME
1582+
pr-url: https://github.com/nodejs/node/pull/37354
1583+
description: The `signal` option was added.
15811584
- version: v14.0.0
15821585
pr-url: https://github.com/nodejs/node/pull/32158
15831586
description: The `finished(stream, cb)` will wait for the `'close'` event
@@ -1604,6 +1607,10 @@ changes:
16041607
* `writable` {boolean} When set to `false`, the callback will be called when
16051608
the stream ends even though the stream might still be writable.
16061609
**Default:** `true`.
1610+
* `signal` {AbortSignal} allows aborting the wait for the stream finish. The
1611+
underlying stream will *not* be aborted if the signal is aborted. The
1612+
callback will get called with an `AbortError`. All registered
1613+
listeners added by this function will also be removed.
16071614
* `callback` {Function} A callback function that takes an optional error
16081615
argument.
16091616
* Returns: {Function} A cleanup function which removes all registered

lib/internal/streams/end-of-stream.js

+33-2
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,20 @@
33

44
'use strict';
55

6+
const {
7+
FunctionPrototypeCall,
8+
ReflectApply,
9+
} = primordials;
10+
const {
11+
AbortError,
12+
codes,
13+
} = require('internal/errors');
614
const {
715
ERR_STREAM_PREMATURE_CLOSE
8-
} = require('internal/errors').codes;
16+
} = codes;
917
const { once } = require('internal/util');
1018
const {
19+
validateAbortSignal,
1120
validateFunction,
1221
validateObject,
1322
} = require('internal/validators');
@@ -64,6 +73,7 @@ function eos(stream, options, callback) {
6473
validateObject(options, 'options');
6574
}
6675
validateFunction(callback, 'callback');
76+
validateAbortSignal(options.signal, 'options.signal');
6777

6878
callback = once(callback);
6979

@@ -185,7 +195,7 @@ function eos(stream, options, callback) {
185195
});
186196
}
187197

188-
return function() {
198+
const cleanup = () => {
189199
callback = nop;
190200
stream.removeListener('aborted', onclose);
191201
stream.removeListener('complete', onfinish);
@@ -199,6 +209,27 @@ function eos(stream, options, callback) {
199209
stream.removeListener('error', onerror);
200210
stream.removeListener('close', onclose);
201211
};
212+
213+
if (options.signal && !closed) {
214+
const abort = () => {
215+
// Keep it because cleanup removes it.
216+
const endCallback = callback;
217+
cleanup();
218+
FunctionPrototypeCall(endCallback, stream, new AbortError());
219+
};
220+
if (options.signal.aborted) {
221+
process.nextTick(abort);
222+
} else {
223+
const originalCallback = callback;
224+
callback = once((...args) => {
225+
options.signal.removeEventListener('abort', abort);
226+
ReflectApply(originalCallback, stream, args);
227+
});
228+
options.signal.addEventListener('abort', abort);
229+
}
230+
}
231+
232+
return cleanup;
202233
}
203234

204235
module.exports = eos;

test/parallel/test-stream-finished.js

+77
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,83 @@ const http = require('http');
9292
run();
9393
}
9494

95+
{
96+
// Check pre-cancelled
97+
const signal = new EventTarget();
98+
signal.aborted = true;
99+
100+
const rs = Readable.from((function* () {})());
101+
finished(rs, { signal }, common.mustCall((err) => {
102+
assert.strictEqual(err.name, 'AbortError');
103+
}));
104+
}
105+
106+
{
107+
// Check cancelled before the stream ends sync.
108+
const ac = new AbortController();
109+
const { signal } = ac;
110+
111+
const rs = Readable.from((function* () {})());
112+
finished(rs, { signal }, common.mustCall((err) => {
113+
assert.strictEqual(err.name, 'AbortError');
114+
}));
115+
116+
ac.abort();
117+
}
118+
119+
{
120+
// Check cancelled before the stream ends async.
121+
const ac = new AbortController();
122+
const { signal } = ac;
123+
124+
const rs = Readable.from((function* () {})());
125+
setTimeout(() => ac.abort(), 1);
126+
finished(rs, { signal }, common.mustCall((err) => {
127+
assert.strictEqual(err.name, 'AbortError');
128+
}));
129+
}
130+
131+
{
132+
// Check cancelled after doesn't throw.
133+
const ac = new AbortController();
134+
const { signal } = ac;
135+
136+
const rs = Readable.from((function* () {
137+
yield 5;
138+
setImmediate(() => ac.abort());
139+
})());
140+
rs.resume();
141+
finished(rs, { signal }, common.mustSucceed());
142+
}
143+
144+
{
145+
// Promisified abort works
146+
const finishedPromise = promisify(finished);
147+
async function run() {
148+
const ac = new AbortController();
149+
const { signal } = ac;
150+
const rs = Readable.from((function* () {})());
151+
setImmediate(() => ac.abort());
152+
await finishedPromise(rs, { signal });
153+
}
154+
155+
assert.rejects(run, { name: 'AbortError' }).then(common.mustCall());
156+
}
157+
158+
{
159+
// Promisified pre-aborted works
160+
const finishedPromise = promisify(finished);
161+
async function run() {
162+
const signal = new EventTarget();
163+
signal.aborted = true;
164+
const rs = Readable.from((function* () {})());
165+
await finishedPromise(rs, { signal });
166+
}
167+
168+
assert.rejects(run, { name: 'AbortError' }).then(common.mustCall());
169+
}
170+
171+
95172
{
96173
const rs = fs.createReadStream('file-does-not-exist');
97174

0 commit comments

Comments
 (0)