Skip to content

Commit d7fe554

Browse files
committed
stream: do not swallow errors with async iterators and pipeline
Before this patch, pipeline() could swallow errors by pre-emptively producing an ERR_STREAM_PREMATURE_CLOSE error that was not really helpful to the user. Co-Authored-By: Robert Nagy <ronagy@icloud.com> PR-URL: #32051 Reviewed-By: Robert Nagy <ronagy@icloud.com> Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com>
1 parent e00e77e commit d7fe554

File tree

2 files changed

+51
-15
lines changed

2 files changed

+51
-15
lines changed

lib/internal/streams/pipeline.js

+24-15
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,7 @@ async function pump(iterable, writable, finish) {
123123
if (!EE) {
124124
EE = require('events');
125125
}
126+
let error;
126127
try {
127128
for await (const chunk of iterable) {
128129
if (!writable.write(chunk)) {
@@ -132,7 +133,9 @@ async function pump(iterable, writable, finish) {
132133
}
133134
writable.end();
134135
} catch (err) {
135-
finish(err);
136+
error = err;
137+
} finally {
138+
finish(error);
136139
}
137140
}
138141

@@ -149,36 +152,37 @@ function pipeline(...streams) {
149152
let value;
150153
const destroys = [];
151154

152-
function finish(err, final) {
153-
if (!error && err) {
155+
let finishCount = 0;
156+
157+
function finish(err) {
158+
const final = --finishCount === 0;
159+
160+
if (err && (!error || error.code === 'ERR_STREAM_PREMATURE_CLOSE')) {
154161
error = err;
155162
}
156163

157-
if (error || final) {
158-
for (const destroy of destroys) {
159-
destroy(error);
160-
}
164+
if (!error && !final) {
165+
return;
166+
}
167+
168+
while (destroys.length) {
169+
destroys.shift()(error);
161170
}
162171

163172
if (final) {
164173
callback(error, value);
165174
}
166175
}
167176

168-
function wrap(stream, reading, writing, final) {
169-
destroys.push(destroyer(stream, reading, writing, final, (err) => {
170-
finish(err, final);
171-
}));
172-
}
173-
174177
let ret;
175178
for (let i = 0; i < streams.length; i++) {
176179
const stream = streams[i];
177180
const reading = i < streams.length - 1;
178181
const writing = i > 0;
179182

180183
if (isStream(stream)) {
181-
wrap(stream, reading, writing, !reading);
184+
finishCount++;
185+
destroys.push(destroyer(stream, reading, writing, !reading, finish));
182186
}
183187

184188
if (i === 0) {
@@ -224,20 +228,25 @@ function pipeline(...streams) {
224228
pt.destroy(err);
225229
});
226230
} else if (isIterable(ret, true)) {
231+
finishCount++;
227232
pump(ret, pt, finish);
228233
} else {
229234
throw new ERR_INVALID_RETURN_VALUE(
230235
'AsyncIterable or Promise', 'destination', ret);
231236
}
232237

233238
ret = pt;
234-
wrap(ret, false, true, true);
239+
240+
finishCount++;
241+
destroys.push(destroyer(ret, false, true, true, finish));
235242
}
236243
} else if (isStream(stream)) {
237244
if (isReadable(ret)) {
238245
ret.pipe(stream);
239246
} else {
240247
ret = makeAsyncIterable(ret);
248+
249+
finishCount++;
241250
pump(ret, stream, finish);
242251
}
243252
ret = stream;

test/parallel/test-stream-pipeline.js

+27
Original file line numberDiff line numberDiff line change
@@ -984,3 +984,30 @@ const { promisify } = require('util');
984984
}));
985985
src.end();
986986
}
987+
988+
{
989+
let res = '';
990+
const rs = new Readable({
991+
read() {
992+
setImmediate(() => {
993+
rs.push('hello');
994+
});
995+
}
996+
});
997+
const ws = new Writable({
998+
write: common.mustNotCall()
999+
});
1000+
pipeline(rs, async function*(stream) {
1001+
/* eslint no-unused-vars: off */
1002+
for await (const chunk of stream) {
1003+
throw new Error('kaboom');
1004+
}
1005+
}, async function *(source) {
1006+
for await (const chunk of source) {
1007+
res += chunk;
1008+
}
1009+
}, ws, common.mustCall((err) => {
1010+
assert.strictEqual(err.message, 'kaboom');
1011+
assert.strictEqual(res, '');
1012+
}));
1013+
}

0 commit comments

Comments
 (0)