Skip to content

Commit 6ecbc1d

Browse files
benjamingrtargos
authored andcommitted
stream: support abortsignal in constructor
PR-URL: #36431 Reviewed-By: Robert Nagy <ronagy@icloud.com> Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
1 parent e46a46a commit 6ecbc1d

8 files changed

+119
-6
lines changed

doc/api/stream.md

+46
Original file line numberDiff line numberDiff line change
@@ -1938,6 +1938,9 @@ method.
19381938
#### `new stream.Writable([options])`
19391939
<!-- YAML
19401940
changes:
1941+
- version: REPLACEME
1942+
pr-url: https://github.com/nodejs/node/pull/36431
1943+
description: support passing in an AbortSignal.
19411944
- version: v14.0.0
19421945
pr-url: https://github.com/nodejs/node/pull/30623
19431946
description: Change `autoDestroy` option default to `true`.
@@ -1985,6 +1988,7 @@ changes:
19851988
[`stream._construct()`][writable-_construct] method.
19861989
* `autoDestroy` {boolean} Whether this stream should automatically call
19871990
`.destroy()` on itself after ending. **Default:** `true`.
1991+
* `signal` {AbortSignal} A signal representing possible cancellation.
19881992

19891993
<!-- eslint-disable no-useless-constructor -->
19901994
```js
@@ -2028,6 +2032,27 @@ const myWritable = new Writable({
20282032
});
20292033
```
20302034

2035+
Calling `abort` on the `AbortController` corresponding to the passed
2036+
`AbortSignal` will behave the same way as calling `.destroy(new AbortError())`
2037+
on the writeable stream.
2038+
2039+
```js
2040+
const { Writable } = require('stream');
2041+
2042+
const controller = new AbortController();
2043+
const myWritable = new Writable({
2044+
write(chunk, encoding, callback) {
2045+
// ...
2046+
},
2047+
writev(chunks, callback) {
2048+
// ...
2049+
},
2050+
signal: controller.signal
2051+
});
2052+
// Later, abort the operation closing the stream
2053+
controller.abort();
2054+
2055+
```
20312056
#### `writable._construct(callback)`
20322057
<!-- YAML
20332058
added: v15.0.0
@@ -2276,6 +2301,9 @@ constructor and implement the [`readable._read()`][] method.
22762301
#### `new stream.Readable([options])`
22772302
<!-- YAML
22782303
changes:
2304+
- version: REPLACEME
2305+
pr-url: https://github.com/nodejs/node/pull/36431
2306+
description: support passing in an AbortSignal.
22792307
- version: v14.0.0
22802308
pr-url: https://github.com/nodejs/node/pull/30623
22812309
description: Change `autoDestroy` option default to `true`.
@@ -2306,6 +2334,7 @@ changes:
23062334
[`stream._construct()`][readable-_construct] method.
23072335
* `autoDestroy` {boolean} Whether this stream should automatically call
23082336
`.destroy()` on itself after ending. **Default:** `true`.
2337+
* `signal` {AbortSignal} A signal representing possible cancellation.
23092338

23102339
<!-- eslint-disable no-useless-constructor -->
23112340
```js
@@ -2346,6 +2375,23 @@ const myReadable = new Readable({
23462375
});
23472376
```
23482377

2378+
Calling `abort` on the `AbortController` corresponding to the passed
2379+
`AbortSignal` will behave the same way as calling `.destroy(new AbortError())`
2380+
on the readable created.
2381+
2382+
```js
2383+
const fs = require('fs');
2384+
const controller = new AbortController();
2385+
const read = new Readable({
2386+
read(size) {
2387+
// ...
2388+
},
2389+
signal: controller.signal
2390+
});
2391+
// Later, abort the operation closing the stream
2392+
controller.abort();
2393+
```
2394+
23492395
#### `readable._construct(callback)`
23502396
<!-- YAML
23512397
added: v15.0.0

lib/internal/streams/add-abort-signal.js

+10-5
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,11 @@ const eos = require('internal/streams/end-of-stream');
99
const { ERR_INVALID_ARG_TYPE } = codes;
1010

1111
// This method is inlined here for readable-stream
12+
// It also does not allow for signal to not exist on the steam
1213
// https://github.com/nodejs/node/pull/36061#discussion_r533718029
1314
const validateAbortSignal = (signal, name) => {
14-
if (signal !== undefined &&
15-
(signal === null ||
16-
typeof signal !== 'object' ||
17-
!('aborted' in signal))) {
15+
if (typeof signal !== 'object' ||
16+
!('aborted' in signal)) {
1817
throw new ERR_INVALID_ARG_TYPE(name, 'AbortSignal', signal);
1918
}
2019
};
@@ -23,11 +22,17 @@ function isStream(obj) {
2322
return !!(obj && typeof obj.pipe === 'function');
2423
}
2524

26-
module.exports = function addAbortSignal(signal, stream) {
25+
module.exports.addAbortSignal = function addAbortSignal(signal, stream) {
2726
validateAbortSignal(signal, 'signal');
2827
if (!isStream(stream)) {
2928
throw new ERR_INVALID_ARG_TYPE('stream', 'stream.Stream', stream);
3029
}
30+
return module.exports.addAbortSignalNoValidate(signal, stream);
31+
};
32+
module.exports.addAbortSignalNoValidate = function(signal, stream) {
33+
if (typeof signal !== 'object' || !('aborted' in signal)) {
34+
return stream;
35+
}
3136
const onAbort = () => {
3237
stream.destroy(new AbortError());
3338
};

lib/internal/streams/readable.js

+6
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,10 @@ const EE = require('events');
4141
const { Stream, prependListener } = require('internal/streams/legacy');
4242
const { Buffer } = require('buffer');
4343

44+
const {
45+
addAbortSignalNoValidate,
46+
} = require('internal/streams/add-abort-signal');
47+
4448
let debug = require('internal/util/debuglog').debuglog('stream', (fn) => {
4549
debug = fn;
4650
});
@@ -192,6 +196,8 @@ function Readable(options) {
192196

193197
if (typeof options.construct === 'function')
194198
this._construct = options.construct;
199+
if (options.signal && !isDuplex)
200+
addAbortSignalNoValidate(options.signal, this);
195201
}
196202

197203
Stream.call(this, options);

lib/internal/streams/writable.js

+7
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,11 @@ const EE = require('events');
4141
const Stream = require('internal/streams/legacy').Stream;
4242
const { Buffer } = require('buffer');
4343
const destroyImpl = require('internal/streams/destroy');
44+
45+
const {
46+
addAbortSignalNoValidate,
47+
} = require('internal/streams/add-abort-signal');
48+
4449
const {
4550
getHighWaterMark,
4651
getDefaultHighWaterMark
@@ -263,6 +268,8 @@ function Writable(options) {
263268

264269
if (typeof options.construct === 'function')
265270
this._construct = options.construct;
271+
if (options.signal)
272+
addAbortSignalNoValidate(options.signal, this);
266273
}
267274

268275
Stream.call(this, options);

lib/stream.js

+2-1
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,8 @@ Stream.Duplex = require('internal/streams/duplex');
4343
Stream.Transform = require('internal/streams/transform');
4444
Stream.PassThrough = require('internal/streams/passthrough');
4545
Stream.pipeline = pipeline;
46-
Stream.addAbortSignal = require('internal/streams/add-abort-signal');
46+
const { addAbortSignal } = require('internal/streams/add-abort-signal');
47+
Stream.addAbortSignal = addAbortSignal;
4748
Stream.finished = eos;
4849

4950
function lazyLoadPromises() {

test/parallel/test-stream-duplex-destroy.js

+17
Original file line numberDiff line numberDiff line change
@@ -238,3 +238,20 @@ const assert = require('assert');
238238
});
239239
duplex.on('close', common.mustCall());
240240
}
241+
{
242+
// Check abort signal
243+
const controller = new AbortController();
244+
const { signal } = controller;
245+
const duplex = new Duplex({
246+
write(chunk, enc, cb) { cb(); },
247+
read() {},
248+
signal,
249+
});
250+
let count = 0;
251+
duplex.on('error', common.mustCall((e) => {
252+
assert.strictEqual(count++, 0); // Ensure not called twice
253+
assert.strictEqual(e.name, 'AbortError');
254+
}));
255+
duplex.on('close', common.mustCall());
256+
controller.abort();
257+
}

test/parallel/test-stream-readable-destroy.js

+16
Original file line numberDiff line numberDiff line change
@@ -284,6 +284,22 @@ const assert = require('assert');
284284
read.on('data', common.mustNotCall());
285285
}
286286

287+
{
288+
const controller = new AbortController();
289+
const read = new Readable({
290+
signal: controller.signal,
291+
read() {
292+
this.push('asd');
293+
},
294+
});
295+
296+
read.on('error', common.mustCall((e) => {
297+
assert.strictEqual(e.name, 'AbortError');
298+
}));
299+
controller.abort();
300+
read.on('data', common.mustNotCall());
301+
}
302+
287303
{
288304
const controller = new AbortController();
289305
const read = addAbortSignal(controller.signal, new Readable({

test/parallel/test-stream-writable-destroy.js

+15
Original file line numberDiff line numberDiff line change
@@ -431,3 +431,18 @@ const assert = require('assert');
431431
write.write('asd');
432432
ac.abort();
433433
}
434+
435+
{
436+
const ac = new AbortController();
437+
const write = new Writable({
438+
signal: ac.signal,
439+
write(chunk, enc, cb) { cb(); }
440+
});
441+
442+
write.on('error', common.mustCall((e) => {
443+
assert.strictEqual(e.name, 'AbortError');
444+
assert.strictEqual(write.destroyed, true);
445+
}));
446+
write.write('asd');
447+
ac.abort();
448+
}

0 commit comments

Comments
 (0)