Skip to content

Commit aaddf97

Browse files
mcollinatargos
authored andcommitted
stream: async iteration should work with destroyed stream
Fixes #23730. PR-URL: #23785 Reviewed-By: Gus Caplan <me@gus.host> Reviewed-By: Luigi Pinca <luigipinca@gmail.com> Reviewed-By: Matheus Marchini <mat@mmarchini.me>
1 parent 35c3c4b commit aaddf97

File tree

2 files changed

+82
-27
lines changed

2 files changed

+82
-27
lines changed

lib/internal/streams/async_iterator.js

+43-26
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
'use strict';
22

3+
const finished = require('internal/streams/end-of-stream');
4+
35
const kLastResolve = Symbol('lastResolve');
46
const kLastReject = Symbol('lastReject');
57
const kError = Symbol('error');
@@ -34,30 +36,6 @@ function onReadable(iter) {
3436
process.nextTick(readAndResolve, iter);
3537
}
3638

37-
function onEnd(iter) {
38-
const resolve = iter[kLastResolve];
39-
if (resolve !== null) {
40-
iter[kLastPromise] = null;
41-
iter[kLastResolve] = null;
42-
iter[kLastReject] = null;
43-
resolve(createIterResult(null, true));
44-
}
45-
iter[kEnded] = true;
46-
}
47-
48-
function onError(iter, err) {
49-
const reject = iter[kLastReject];
50-
// reject if we are waiting for data in the Promise
51-
// returned by next() and store the error
52-
if (reject !== null) {
53-
iter[kLastPromise] = null;
54-
iter[kLastResolve] = null;
55-
iter[kLastReject] = null;
56-
reject(err);
57-
}
58-
iter[kError] = err;
59-
}
60-
6139
function wrapForNext(lastPromise, iter) {
6240
return function(resolve, reject) {
6341
lastPromise.then(function() {
@@ -86,6 +64,22 @@ const ReadableStreamAsyncIteratorPrototype = Object.setPrototypeOf({
8664
return Promise.resolve(createIterResult(null, true));
8765
}
8866

67+
if (this[kStream].destroyed) {
68+
// We need to defer via nextTick because if .destroy(err) is
69+
// called, the error will be emitted via nextTick, and
70+
// we cannot guarantee that there is no error lingering around
71+
// waiting to be emitted.
72+
return new Promise((resolve, reject) => {
73+
process.nextTick(() => {
74+
if (this[kError]) {
75+
reject(this[kError]);
76+
} else {
77+
resolve(createIterResult(null, true));
78+
}
79+
});
80+
});
81+
}
82+
8983
// if we have multiple next() calls
9084
// we will wait for the previous Promise to finish
9185
// this logic is optimized to support for await loops,
@@ -155,9 +149,32 @@ const createReadableStreamAsyncIterator = (stream) => {
155149
},
156150
});
157151

152+
finished(stream, (err) => {
153+
if (err) {
154+
const reject = iterator[kLastReject];
155+
// reject if we are waiting for data in the Promise
156+
// returned by next() and store the error
157+
if (reject !== null) {
158+
iterator[kLastPromise] = null;
159+
iterator[kLastResolve] = null;
160+
iterator[kLastReject] = null;
161+
reject(err);
162+
}
163+
iterator[kError] = err;
164+
return;
165+
}
166+
167+
const resolve = iterator[kLastResolve];
168+
if (resolve !== null) {
169+
iterator[kLastPromise] = null;
170+
iterator[kLastResolve] = null;
171+
iterator[kLastReject] = null;
172+
resolve(createIterResult(null, true));
173+
}
174+
iterator[kEnded] = true;
175+
});
176+
158177
stream.on('readable', onReadable.bind(null, iterator));
159-
stream.on('end', onEnd.bind(null, iterator));
160-
stream.on('error', onError.bind(null, iterator));
161178

162179
return iterator;
163180
};

test/parallel/test-stream-readable-async-iterators.js

+39-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
'use strict';
22

33
const common = require('../common');
4-
const { Readable } = require('stream');
4+
const { Readable, PassThrough, pipeline } = require('stream');
55
const assert = require('assert');
66

77
async function tests() {
@@ -324,6 +324,44 @@ async function tests() {
324324

325325
assert.strictEqual(data, expected);
326326
})();
327+
328+
await (async function() {
329+
console.log('.next() on destroyed stream');
330+
const readable = new Readable({
331+
read() {
332+
// no-op
333+
}
334+
});
335+
336+
readable.destroy();
337+
338+
try {
339+
await readable[Symbol.asyncIterator]().next();
340+
} catch (e) {
341+
assert.strictEqual(e.code, 'ERR_STREAM_PREMATURE_CLOSE');
342+
}
343+
})();
344+
345+
await (async function() {
346+
console.log('.next() on pipelined stream');
347+
const readable = new Readable({
348+
read() {
349+
// no-op
350+
}
351+
});
352+
353+
const passthrough = new PassThrough();
354+
const err = new Error('kaboom');
355+
pipeline(readable, passthrough, common.mustCall((e) => {
356+
assert.strictEqual(e, err);
357+
}));
358+
readable.destroy(err);
359+
try {
360+
await readable[Symbol.asyncIterator]().next();
361+
} catch (e) {
362+
assert.strictEqual(e, err);
363+
}
364+
})();
327365
}
328366

329367
// to avoid missing some tests if a promise does not resolve

0 commit comments

Comments
 (0)