Skip to content

Commit 429dffd

Browse files
Nitzan Uzielytargos
Nitzan Uziely
authored andcommitted
stream: add AbortSignal to promisified pipeline
add support for AbortSignal to promisified pipeline. Resolves: #37321 PR-URL: #37359 Reviewed-By: Robert Nagy <ronagy@icloud.com> Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com> Reviewed-By: Zijian Liu <lxxyxzj@gmail.com>
1 parent 6675342 commit 429dffd

File tree

3 files changed

+145
-2
lines changed

3 files changed

+145
-2
lines changed

doc/api/stream.md

+29-1
Original file line numberDiff line numberDiff line change
@@ -1719,7 +1719,11 @@ pipeline(
17191719
);
17201720
```
17211721

1722-
The `pipeline` API provides promise version:
1722+
The `pipeline` API provides a promise version, which can also
1723+
receive an options argument as the last parameter with a
1724+
`signal` {AbortSignal} property. When the signal is aborted,
1725+
`destroy` will be called on the underlying pipeline, with an
1726+
`AbortError`.
17231727

17241728
```js
17251729
const { pipeline } = require('stream/promises');
@@ -1736,6 +1740,30 @@ async function run() {
17361740
run().catch(console.error);
17371741
```
17381742

1743+
To use an `AbortSignal`, pass it inside an options object,
1744+
as the last argument:
1745+
1746+
```js
1747+
const { pipeline } = require('stream/promises');
1748+
1749+
async function run() {
1750+
const ac = new AbortController();
1751+
const options = {
1752+
signal: ac.signal,
1753+
};
1754+
1755+
setTimeout(() => ac.abort(), 1);
1756+
await pipeline(
1757+
fs.createReadStream('archive.tar'),
1758+
zlib.createGzip(),
1759+
fs.createWriteStream('archive.tar.gz'),
1760+
options,
1761+
);
1762+
}
1763+
1764+
run().catch(console.error); // AbortError
1765+
```
1766+
17391767
The `pipeline` API also supports async generators:
17401768

17411769
```js

lib/stream/promises.js

+44-1
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,65 @@
11
'use strict';
22

33
const {
4+
ArrayPrototypePop,
45
Promise,
6+
SymbolAsyncIterator,
7+
SymbolIterator,
58
} = primordials;
69

10+
const {
11+
addAbortSignalNoValidate,
12+
} = require('internal/streams/add-abort-signal');
13+
14+
const {
15+
validateAbortSignal,
16+
} = require('internal/validators');
17+
718
let pl;
819
let eos;
920

21+
function isReadable(obj) {
22+
return !!(obj && typeof obj.pipe === 'function');
23+
}
24+
25+
function isWritable(obj) {
26+
return !!(obj && typeof obj.write === 'function');
27+
}
28+
29+
function isStream(obj) {
30+
return isReadable(obj) || isWritable(obj);
31+
}
32+
33+
function isIterable(obj, isAsync) {
34+
if (!obj) return false;
35+
if (isAsync === true) return typeof obj[SymbolAsyncIterator] === 'function';
36+
if (isAsync === false) return typeof obj[SymbolIterator] === 'function';
37+
return typeof obj[SymbolAsyncIterator] === 'function' ||
38+
typeof obj[SymbolIterator] === 'function';
39+
}
40+
1041
function pipeline(...streams) {
1142
if (!pl) pl = require('internal/streams/pipeline');
1243
return new Promise((resolve, reject) => {
13-
pl(...streams, (err, value) => {
44+
let signal;
45+
const lastArg = streams[streams.length - 1];
46+
if (lastArg && typeof lastArg === 'object' &&
47+
!isStream(lastArg) && !isIterable(lastArg)) {
48+
const options = ArrayPrototypePop(streams);
49+
signal = options.signal;
50+
validateAbortSignal(signal, 'options.signal');
51+
}
52+
53+
const pipe = pl(...streams, (err, value) => {
1454
if (err) {
1555
reject(err);
1656
} else {
1757
resolve(value);
1858
}
1959
});
60+
if (signal) {
61+
addAbortSignalNoValidate(signal, pipe);
62+
}
2063
});
2164
}
2265

test/parallel/test-stream-pipeline.js

+72
Original file line numberDiff line numberDiff line change
@@ -469,6 +469,78 @@ const net = require('net');
469469
run();
470470
}
471471

472+
{
473+
// Check aborted signal without values
474+
const pipelinePromise = promisify(pipeline);
475+
async function run() {
476+
const ac = new AbortController();
477+
const { signal } = ac;
478+
async function* producer() {
479+
ac.abort();
480+
await Promise.resolve();
481+
yield '8';
482+
}
483+
484+
const w = new Writable({
485+
write(chunk, encoding, callback) {
486+
callback();
487+
}
488+
});
489+
await pipelinePromise(producer, w, { signal });
490+
}
491+
492+
assert.rejects(run, { name: 'AbortError' }).then(common.mustCall());
493+
}
494+
495+
{
496+
// Check aborted signal after init.
497+
const pipelinePromise = promisify(pipeline);
498+
async function run() {
499+
const ac = new AbortController();
500+
const { signal } = ac;
501+
async function* producer() {
502+
yield '5';
503+
await Promise.resolve();
504+
ac.abort();
505+
await Promise.resolve();
506+
yield '8';
507+
}
508+
509+
const w = new Writable({
510+
write(chunk, encoding, callback) {
511+
callback();
512+
}
513+
});
514+
await pipelinePromise(producer, w, { signal });
515+
}
516+
517+
assert.rejects(run, { name: 'AbortError' }).then(common.mustCall());
518+
}
519+
520+
{
521+
// Check pre-aborted signal
522+
const pipelinePromise = promisify(pipeline);
523+
async function run() {
524+
const ac = new AbortController();
525+
const { signal } = ac;
526+
ac.abort();
527+
async function* producer() {
528+
yield '5';
529+
await Promise.resolve();
530+
yield '8';
531+
}
532+
533+
const w = new Writable({
534+
write(chunk, encoding, callback) {
535+
callback();
536+
}
537+
});
538+
await pipelinePromise(producer, w, { signal });
539+
}
540+
541+
assert.rejects(run, { name: 'AbortError' }).then(common.mustCall());
542+
}
543+
472544
{
473545
const read = new Readable({
474546
read() {}

0 commit comments

Comments
 (0)