Skip to content

Commit 197ba21

Browse files
benjamingrdanielleadams
authored andcommitted
stream: support abort signal
PR-URL: #36061 Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Robert Nagy <ronagy@icloud.com>
1 parent b39d150 commit 197ba21

10 files changed

+183
-13
lines changed

doc/api/stream.md

+52-2
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,8 @@ There are four fundamental stream types within Node.js:
4545
is written and read (for example, [`zlib.createDeflate()`][]).
4646

4747
Additionally, this module includes the utility functions
48-
[`stream.pipeline()`][], [`stream.finished()`][] and
49-
[`stream.Readable.from()`][].
48+
[`stream.pipeline()`][], [`stream.finished()`][], [`stream.Readable.from()`][]
49+
and [`stream.addAbortSignal()`][].
5050

5151
### Streams Promises API
5252
<!-- YAML
@@ -1799,6 +1799,55 @@ Calling `Readable.from(string)` or `Readable.from(buffer)` will not have
17991799
the strings or buffers be iterated to match the other streams semantics
18001800
for performance reasons.
18011801

1802+
### `stream.addAbortSignal(signal, stream)`
1803+
<!-- YAML
1804+
added: REPLACEME
1805+
-->
1806+
* `signal` {AbortSignal} A signal representing possible cancellation
1807+
* `stream` {Stream} a stream to attach a signal to
1808+
1809+
Attaches an AbortSignal to a readable or writeable stream. This lets code
1810+
control stream destruction using an `AbortController`.
1811+
1812+
Calling `abort` on the `AbortController` corresponding to the passed
1813+
`AbortSignal` will behave the same way as calling `.destroy(new AbortError())`
1814+
on the stream.
1815+
1816+
```js
1817+
const fs = require('fs');
1818+
1819+
const controller = new AbortController();
1820+
const read = addAbortSignal(
1821+
controller.signal,
1822+
fs.createReadStream(('object.json'))
1823+
);
1824+
// Later, abort the operation closing the stream
1825+
controller.abort();
1826+
```
1827+
1828+
Or using an `AbortSignal` with a readable stream as an async iterable:
1829+
1830+
```js
1831+
const controller = new AbortController();
1832+
setTimeout(() => controller.abort(), 10_000); // set a timeout
1833+
const stream = addAbortSignal(
1834+
controller.signal,
1835+
fs.createReadStream(('object.json'))
1836+
);
1837+
(async () => {
1838+
try {
1839+
for await (const chunk of stream) {
1840+
await process(chunk);
1841+
}
1842+
} catch (e) {
1843+
if (e.name === 'AbortError') {
1844+
// The operation was cancelled
1845+
} else {
1846+
throw e;
1847+
}
1848+
}
1849+
})();
1850+
```
18021851
## API for stream implementers
18031852

18041853
<!--type=misc-->
@@ -3123,6 +3172,7 @@ contain multi-byte characters.
31233172
[`stream.finished()`]: #stream_stream_finished_stream_options_callback
31243173
[`stream.pipe()`]: #stream_readable_pipe_destination_options
31253174
[`stream.pipeline()`]: #stream_stream_pipeline_source_transforms_destination_callback
3175+
[`stream.addAbortSignal()`]: #stream_stream_addabortsignal_signal_stream
31263176
[`stream.uncork()`]: #stream_writable_uncork
31273177
[`stream.unpipe()`]: #stream_readable_unpipe_destination
31283178
[`stream.wrap()`]: #stream_readable_wrap_stream

lib/_http_client.js

+4-8
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ const { Buffer } = require('buffer');
5151
const { defaultTriggerAsyncIdScope } = require('internal/async_hooks');
5252
const { URL, urlToOptions, searchParamsSymbol } = require('internal/url');
5353
const { kOutHeaders, kNeedDrain } = require('internal/http');
54-
const { AbortError, connResetException, codes } = require('internal/errors');
54+
const { connResetException, codes } = require('internal/errors');
5555
const {
5656
ERR_HTTP_HEADERS_SENT,
5757
ERR_INVALID_ARG_TYPE,
@@ -61,14 +61,15 @@ const {
6161
} = codes;
6262
const {
6363
validateInteger,
64-
validateAbortSignal,
6564
} = require('internal/validators');
6665
const { getTimerDuration } = require('internal/timers');
6766
const {
6867
DTRACE_HTTP_CLIENT_REQUEST,
6968
DTRACE_HTTP_CLIENT_RESPONSE
7069
} = require('internal/dtrace');
7170

71+
const { addAbortSignal } = require('stream');
72+
7273
const INVALID_PATH_REGEX = /[^\u0021-\u00ff]/;
7374
const kError = Symbol('kError');
7475

@@ -174,12 +175,7 @@ function ClientRequest(input, options, cb) {
174175

175176
const signal = options.signal;
176177
if (signal) {
177-
validateAbortSignal(signal, 'options.signal');
178-
const listener = (e) => this.destroy(new AbortError());
179-
signal.addEventListener('abort', listener);
180-
this.once('close', () => {
181-
signal.removeEventListener('abort', listener);
182-
});
178+
addAbortSignal(signal, this);
183179
}
184180
let method = options.method;
185181
const methodIsString = (typeof method === 'string');
+41
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
'use strict';
2+
3+
const {
4+
AbortError,
5+
codes,
6+
} = require('internal/errors');
7+
8+
const eos = require('internal/streams/end-of-stream');
9+
const { ERR_INVALID_ARG_TYPE } = codes;
10+
11+
// This method is inlined here for readable-stream
12+
// https://github.com/nodejs/node/pull/36061#discussion_r533718029
13+
const validateAbortSignal = (signal, name) => {
14+
if (signal !== undefined &&
15+
(signal === null ||
16+
typeof signal !== 'object' ||
17+
!('aborted' in signal))) {
18+
throw new ERR_INVALID_ARG_TYPE(name, 'AbortSignal', signal);
19+
}
20+
};
21+
22+
function isStream(obj) {
23+
return !!(obj && typeof obj.pipe === 'function');
24+
}
25+
26+
module.exports = function addAbortSignal(signal, stream) {
27+
validateAbortSignal(signal, 'signal');
28+
if (!isStream(stream)) {
29+
throw new ERR_INVALID_ARG_TYPE('stream', 'stream.Stream', stream);
30+
}
31+
const onAbort = () => {
32+
stream.destroy(new AbortError());
33+
};
34+
if (signal.aborted) {
35+
onAbort();
36+
} else {
37+
signal.addEventListener('abort', onAbort);
38+
eos(stream, () => signal.removeEventListener('abort', onAbort));
39+
}
40+
return stream;
41+
};

lib/internal/streams/readable.js

+1
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ const {
5050
getHighWaterMark,
5151
getDefaultHighWaterMark
5252
} = require('internal/streams/state');
53+
5354
const {
5455
ERR_INVALID_ARG_TYPE,
5556
ERR_STREAM_PUSH_AFTER_EOF,

lib/stream.js

+1
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ Stream.Duplex = require('internal/streams/duplex');
4343
Stream.Transform = require('internal/streams/transform');
4444
Stream.PassThrough = require('internal/streams/passthrough');
4545
Stream.pipeline = pipeline;
46+
Stream.addAbortSignal = require('internal/streams/add-abort-signal');
4647
Stream.finished = eos;
4748

4849
function lazyLoadPromises() {

node.gyp

+1
Original file line numberDiff line numberDiff line change
@@ -245,6 +245,7 @@
245245
'lib/internal/worker/js_transferable.js',
246246
'lib/internal/watchdog.js',
247247
'lib/internal/streams/lazy_transform.js',
248+
'lib/internal/streams/add-abort-signal.js',
248249
'lib/internal/streams/buffer_list.js',
249250
'lib/internal/streams/duplexpair.js',
250251
'lib/internal/streams/from.js',

test/parallel/test-bootstrap-modules.js

+1
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ const expectedModules = new Set([
7878
'NativeModule internal/process/warning',
7979
'NativeModule internal/querystring',
8080
'NativeModule internal/source_map/source_map_cache',
81+
'NativeModule internal/streams/add-abort-signal',
8182
'NativeModule internal/streams/buffer_list',
8283
'NativeModule internal/streams/destroy',
8384
'NativeModule internal/streams/duplex',

test/parallel/test-stream-pipeline.js

+31-1
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,8 @@ const {
88
Transform,
99
pipeline,
1010
PassThrough,
11-
Duplex
11+
Duplex,
12+
addAbortSignal,
1213
} = require('stream');
1314
const assert = require('assert');
1415
const http = require('http');
@@ -1261,3 +1262,32 @@ const net = require('net');
12611262
() => common.mustNotCall(),
12621263
);
12631264
}
1265+
1266+
1267+
{
1268+
const ac = new AbortController();
1269+
const r = Readable.from(async function* () {
1270+
for (let i = 0; i < 10; i++) {
1271+
await Promise.resolve();
1272+
yield String(i);
1273+
if (i === 5) {
1274+
ac.abort();
1275+
}
1276+
}
1277+
}());
1278+
let res = '';
1279+
const w = new Writable({
1280+
write(chunk, encoding, callback) {
1281+
res += chunk;
1282+
callback();
1283+
}
1284+
});
1285+
const cb = common.mustCall((err) => {
1286+
assert.strictEqual(err.name, 'AbortError');
1287+
assert.strictEqual(res, '012345');
1288+
assert.strictEqual(w.destroyed, true);
1289+
assert.strictEqual(r.destroyed, true);
1290+
assert.strictEqual(pipelined.destroyed, true);
1291+
});
1292+
const pipelined = addAbortSignal(ac.signal, pipeline([r, w], cb));
1293+
}

test/parallel/test-stream-readable-destroy.js

+36-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
'use strict';
22

33
const common = require('../common');
4-
const { Readable } = require('stream');
4+
const { Readable, addAbortSignal } = require('stream');
55
const assert = require('assert');
66

77
{
@@ -268,3 +268,38 @@ const assert = require('assert');
268268
}));
269269
read.resume();
270270
}
271+
272+
{
273+
const controller = new AbortController();
274+
const read = addAbortSignal(controller.signal, new Readable({
275+
read() {
276+
this.push('asd');
277+
},
278+
}));
279+
280+
read.on('error', common.mustCall((e) => {
281+
assert.strictEqual(e.name, 'AbortError');
282+
}));
283+
controller.abort();
284+
read.on('data', common.mustNotCall());
285+
}
286+
287+
{
288+
const controller = new AbortController();
289+
const read = addAbortSignal(controller.signal, new Readable({
290+
objectMode: true,
291+
read() {
292+
return false;
293+
}
294+
}));
295+
read.push('asd');
296+
297+
read.on('error', common.mustCall((e) => {
298+
assert.strictEqual(e.name, 'AbortError');
299+
}));
300+
assert.rejects((async () => {
301+
/* eslint-disable-next-line no-unused-vars */
302+
for await (const chunk of read) {}
303+
})(), /AbortError/);
304+
setTimeout(() => controller.abort(), 0);
305+
}

test/parallel/test-stream-writable-destroy.js

+15-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
'use strict';
22

33
const common = require('../common');
4-
const { Writable } = require('stream');
4+
const { Writable, addAbortSignal } = require('stream');
55
const assert = require('assert');
66

77
{
@@ -417,3 +417,17 @@ const assert = require('assert');
417417
}));
418418
write.write('asd');
419419
}
420+
421+
{
422+
const ac = new AbortController();
423+
const write = addAbortSignal(ac.signal, new Writable({
424+
write(chunk, enc, cb) { cb(); }
425+
}));
426+
427+
write.on('error', common.mustCall((e) => {
428+
assert.strictEqual(e.name, 'AbortError');
429+
assert.strictEqual(write.destroyed, true);
430+
}));
431+
write.write('asd');
432+
ac.abort();
433+
}

0 commit comments

Comments
 (0)