Skip to content

Commit 1fea051

Browse files
benjamingrdanielleadams
authored andcommitted
stream: improve Readable.from error handling
PR-URL: #37158 Reviewed-By: Robert Nagy <ronagy@icloud.com> Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
1 parent e63b380 commit 1fea051

File tree

2 files changed

+38
-18
lines changed

2 files changed

+38
-18
lines changed

lib/internal/streams/from.js

+16-17
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
'use strict';
22

33
const {
4+
PromisePrototypeThen,
45
SymbolAsyncIterator,
56
SymbolIterator
67
} = primordials;
@@ -42,9 +43,6 @@ function from(Readable, iterable, opts) {
4243
// being called before last iteration completion.
4344
let reading = false;
4445

45-
// Flag for when iterator needs to be explicitly closed.
46-
let needToClose = false;
47-
4846
readable._read = function() {
4947
if (!reading) {
5048
reading = true;
@@ -53,18 +51,23 @@ function from(Readable, iterable, opts) {
5351
};
5452

5553
readable._destroy = function(error, cb) {
56-
if (needToClose) {
57-
needToClose = false;
58-
close().then(
59-
() => process.nextTick(cb, error),
60-
(e) => process.nextTick(cb, error || e),
61-
);
62-
} else {
63-
cb(error);
64-
}
54+
PromisePrototypeThen(
55+
close(error),
56+
() => process.nextTick(cb, error), // nextTick is here in case cb throws
57+
(e) => process.nextTick(cb, e || error),
58+
);
6559
};
6660

67-
async function close() {
61+
async function close(error) {
62+
const hadError = (error !== undefined) && (error !== null);
63+
const hasThrow = typeof iterator.throw === 'function';
64+
if (hadError && hasThrow) {
65+
const { value, done } = await iterator.throw(error);
66+
await value;
67+
if (done) {
68+
return;
69+
}
70+
}
6871
if (typeof iterator.return === 'function') {
6972
const { value } = await iterator.return();
7073
await value;
@@ -73,13 +76,9 @@ function from(Readable, iterable, opts) {
7376

7477
async function next() {
7578
try {
76-
needToClose = false;
7779
const { value, done } = await iterator.next();
78-
needToClose = !done;
7980
if (done) {
8081
readable.push(null);
81-
} else if (readable.destroyed) {
82-
await close();
8382
} else {
8483
const res = await value;
8584
if (res === null) {

test/parallel/test-readable-from.js

+22-1
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ const { mustCall } = require('../common');
44
const { once } = require('events');
55
const { Readable } = require('stream');
66
const { strictEqual, throws } = require('assert');
7+
const common = require('../common');
78

89
{
910
throws(() => {
@@ -187,6 +188,25 @@ async function endWithError() {
187188
}
188189
}
189190

191+
async function destroyingStreamWithErrorThrowsInGenerator() {
192+
const validateError = common.mustCall((e) => {
193+
strictEqual(e, 'Boum');
194+
});
195+
async function* generate() {
196+
try {
197+
yield 1;
198+
yield 2;
199+
yield 3;
200+
throw new Error();
201+
} catch (e) {
202+
validateError(e);
203+
}
204+
}
205+
const stream = Readable.from(generate());
206+
stream.read();
207+
stream.once('error', common.mustCall());
208+
stream.destroy('Boum');
209+
}
190210

191211
Promise.all([
192212
toReadableBasicSupport(),
@@ -198,5 +218,6 @@ Promise.all([
198218
toReadableOnDataNonObject(),
199219
destroysTheStreamWhenThrowing(),
200220
asTransformStream(),
201-
endWithError()
221+
endWithError(),
222+
destroyingStreamWithErrorThrowsInGenerator(),
202223
]).then(mustCall());

0 commit comments

Comments
 (0)