Skip to content

Commit 36a4f54

Browse files
vadzimBethGriggs
authored andcommitted
stream: close iterator in Readable.from
Call iterator.return() if not all of its values are consumed. Fixes: #32842 PR-URL: #32844 Reviewed-By: Robert Nagy <ronagy@icloud.com> Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Zeyu Yang <himself65@outlook.com>
1 parent 7f49812 commit 36a4f54

File tree

2 files changed

+229
-1
lines changed

2 files changed

+229
-1
lines changed

lib/internal/streams/from.js

+31-1
Original file line numberDiff line numberDiff line change
@@ -34,21 +34,51 @@ function from(Readable, iterable, opts) {
3434
objectMode: true,
3535
...opts
3636
});
37+
3738
// Reading boolean to protect against _read
3839
// being called before last iteration completion.
3940
let reading = false;
41+
42+
// needToClose boolean if iterator needs to be explicitly closed
43+
let needToClose = false;
44+
4045
readable._read = function() {
4146
if (!reading) {
4247
reading = true;
4348
next();
4449
}
4550
};
51+
52+
readable._destroy = function(error, cb) {
53+
if (needToClose) {
54+
needToClose = false;
55+
close().then(
56+
() => process.nextTick(cb, error),
57+
(e) => process.nextTick(cb, error || e),
58+
);
59+
} else {
60+
cb(error);
61+
}
62+
};
63+
64+
async function close() {
65+
if (typeof iterator.return === 'function') {
66+
const { value } = await iterator.return();
67+
await value;
68+
}
69+
}
70+
4671
async function next() {
4772
try {
73+
needToClose = false;
4874
const { value, done } = await iterator.next();
75+
needToClose = !done;
76+
const resolved = await value;
4977
if (done) {
5078
readable.push(null);
51-
} else if (readable.push(await value)) {
79+
} else if (readable.destroyed) {
80+
await close();
81+
} else if (readable.push(resolved)) {
5282
next();
5383
} else {
5484
reading = false;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,198 @@
1+
'use strict';
2+
3+
const { mustCall, mustNotCall } = require('../common');
4+
const { Readable } = require('stream');
5+
const { strictEqual } = require('assert');
6+
7+
async function asyncSupport() {
8+
const finallyMustCall = mustCall();
9+
const bodyMustCall = mustCall();
10+
11+
async function* infiniteGenerate() {
12+
try {
13+
while (true) yield 'a';
14+
} finally {
15+
finallyMustCall();
16+
}
17+
}
18+
19+
const stream = Readable.from(infiniteGenerate());
20+
21+
for await (const chunk of stream) {
22+
bodyMustCall();
23+
strictEqual(chunk, 'a');
24+
break;
25+
}
26+
}
27+
28+
async function syncSupport() {
29+
const finallyMustCall = mustCall();
30+
const bodyMustCall = mustCall();
31+
32+
function* infiniteGenerate() {
33+
try {
34+
while (true) yield 'a';
35+
} finally {
36+
finallyMustCall();
37+
}
38+
}
39+
40+
const stream = Readable.from(infiniteGenerate());
41+
42+
for await (const chunk of stream) {
43+
bodyMustCall();
44+
strictEqual(chunk, 'a');
45+
break;
46+
}
47+
}
48+
49+
async function syncPromiseSupport() {
50+
const returnMustBeAwaited = mustCall();
51+
const bodyMustCall = mustCall();
52+
53+
function* infiniteGenerate() {
54+
try {
55+
while (true) yield Promise.resolve('a');
56+
} finally {
57+
// eslint-disable-next-line no-unsafe-finally
58+
return { then(cb) {
59+
returnMustBeAwaited();
60+
cb();
61+
} };
62+
}
63+
}
64+
65+
const stream = Readable.from(infiniteGenerate());
66+
67+
for await (const chunk of stream) {
68+
bodyMustCall();
69+
strictEqual(chunk, 'a');
70+
break;
71+
}
72+
}
73+
74+
async function syncRejectedSupport() {
75+
const returnMustBeAwaited = mustCall();
76+
const bodyMustNotCall = mustNotCall();
77+
const catchMustCall = mustCall();
78+
const secondNextMustNotCall = mustNotCall();
79+
80+
function* generate() {
81+
try {
82+
yield Promise.reject('a');
83+
secondNextMustNotCall();
84+
} finally {
85+
// eslint-disable-next-line no-unsafe-finally
86+
return { then(cb) {
87+
returnMustBeAwaited();
88+
cb();
89+
} };
90+
}
91+
}
92+
93+
const stream = Readable.from(generate());
94+
95+
try {
96+
for await (const chunk of stream) {
97+
bodyMustNotCall(chunk);
98+
}
99+
} catch {
100+
catchMustCall();
101+
}
102+
}
103+
104+
async function noReturnAfterThrow() {
105+
const returnMustNotCall = mustNotCall();
106+
const bodyMustNotCall = mustNotCall();
107+
const catchMustCall = mustCall();
108+
const nextMustCall = mustCall();
109+
110+
const stream = Readable.from({
111+
[Symbol.asyncIterator]() { return this; },
112+
async next() {
113+
nextMustCall();
114+
throw new Error('a');
115+
},
116+
async return() {
117+
returnMustNotCall();
118+
return { done: true };
119+
},
120+
});
121+
122+
try {
123+
for await (const chunk of stream) {
124+
bodyMustNotCall(chunk);
125+
}
126+
} catch {
127+
catchMustCall();
128+
}
129+
}
130+
131+
async function closeStreamWhileNextIsPending() {
132+
const finallyMustCall = mustCall();
133+
const dataMustCall = mustCall();
134+
135+
let resolveDestroy;
136+
const destroyed =
137+
new Promise((resolve) => { resolveDestroy = mustCall(resolve); });
138+
let resolveYielded;
139+
const yielded =
140+
new Promise((resolve) => { resolveYielded = mustCall(resolve); });
141+
142+
async function* infiniteGenerate() {
143+
try {
144+
while (true) {
145+
yield 'a';
146+
resolveYielded();
147+
await destroyed;
148+
}
149+
} finally {
150+
finallyMustCall();
151+
}
152+
}
153+
154+
const stream = Readable.from(infiniteGenerate());
155+
156+
stream.on('data', (data) => {
157+
dataMustCall();
158+
strictEqual(data, 'a');
159+
});
160+
161+
yielded.then(() => {
162+
stream.destroy();
163+
resolveDestroy();
164+
});
165+
}
166+
167+
async function closeAfterNullYielded() {
168+
const finallyMustCall = mustCall();
169+
const dataMustCall = mustCall(3);
170+
171+
function* infiniteGenerate() {
172+
try {
173+
yield 'a';
174+
yield 'a';
175+
yield 'a';
176+
while (true) yield null;
177+
} finally {
178+
finallyMustCall();
179+
}
180+
}
181+
182+
const stream = Readable.from(infiniteGenerate());
183+
184+
stream.on('data', (chunk) => {
185+
dataMustCall();
186+
strictEqual(chunk, 'a');
187+
});
188+
}
189+
190+
Promise.all([
191+
asyncSupport(),
192+
syncSupport(),
193+
syncPromiseSupport(),
194+
syncRejectedSupport(),
195+
noReturnAfterThrow(),
196+
closeStreamWhileNextIsPending(),
197+
closeAfterNullYielded(),
198+
]).then(mustCall());

0 commit comments

Comments
 (0)