Skip to content

Commit 0100a60

Browse files
author
Nitzan Uziely
committed
stream: add AbortSignal to promisified pipeline
add support for AbortSignal to promisified pipeline. Resolves: #37321
1 parent 88d9268 commit 0100a60

File tree

2 files changed

+93
-1
lines changed

2 files changed

+93
-1
lines changed

lib/stream/promises.js

+21-1
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,42 @@
11
'use strict';
22

33
const {
4+
ArrayPrototypePop,
45
Promise,
56
} = primordials;
67

8+
const {
9+
addAbortSignalNoValidate,
10+
} = require('internal/streams/add-abort-signal');
11+
12+
const {
13+
validateAbortSignal,
14+
} = require('internal/validators');
15+
716
let pl;
817
let eos;
918

1019
function pipeline(...streams) {
1120
if (!pl) pl = require('internal/streams/pipeline');
21+
let signal;
22+
const lastArg = streams[streams.length - 1];
23+
if (typeof lastArg === 'object' && 'aborted' in lastArg) {
24+
signal = ArrayPrototypePop(streams);
25+
}
1226
return new Promise((resolve, reject) => {
13-
pl(...streams, (err, value) => {
27+
if (signal) {
28+
validateAbortSignal(signal);
29+
}
30+
const pipe = pl(...streams, (err, value) => {
1431
if (err) {
1532
reject(err);
1633
} else {
1734
resolve(value);
1835
}
1936
});
37+
if (signal) {
38+
addAbortSignalNoValidate(signal, pipe);
39+
}
2040
});
2141
}
2242

test/parallel/test-stream-pipeline.js

+72
Original file line numberDiff line numberDiff line change
@@ -469,6 +469,78 @@ const net = require('net');
469469
run();
470470
}
471471

472+
{
473+
// Check aborted signal without values
474+
const pipelinePromise = promisify(pipeline);
475+
async function run() {
476+
const ac = new AbortController();
477+
const { signal } = ac;
478+
async function* producer() {
479+
ac.abort();
480+
await Promise.resolve();
481+
yield '8';
482+
}
483+
484+
const w = new Writable({
485+
write(chunk, encoding, callback) {
486+
callback();
487+
}
488+
});
489+
await pipelinePromise(producer, w, signal);
490+
}
491+
492+
assert.rejects(run, { name: 'AbortError' }).then(common.mustCall());
493+
}
494+
495+
{
496+
// Check aborted signal after init.
497+
const pipelinePromise = promisify(pipeline);
498+
async function run() {
499+
const ac = new AbortController();
500+
const { signal } = ac;
501+
async function* producer() {
502+
yield '5';
503+
await Promise.resolve();
504+
ac.abort();
505+
await Promise.resolve();
506+
yield '8';
507+
}
508+
509+
const w = new Writable({
510+
write(chunk, encoding, callback) {
511+
callback();
512+
}
513+
});
514+
await pipelinePromise(producer, w, signal);
515+
}
516+
517+
assert.rejects(run, { name: 'AbortError' }).then(common.mustCall());
518+
}
519+
520+
{
521+
// Check pre-aborted signal
522+
const pipelinePromise = promisify(pipeline);
523+
async function run() {
524+
const ac = new AbortController();
525+
const { signal } = ac;
526+
ac.abort();
527+
async function* producer() {
528+
yield '5';
529+
await Promise.resolve();
530+
yield '8';
531+
}
532+
533+
const w = new Writable({
534+
write(chunk, encoding, callback) {
535+
callback();
536+
}
537+
});
538+
await pipelinePromise(producer, w, signal);
539+
}
540+
541+
assert.rejects(run, { name: 'AbortError' }).then(common.mustCall());
542+
}
543+
472544
{
473545
const read = new Readable({
474546
read() {}

0 commit comments

Comments
 (0)