Skip to content

Commit b809fa8

Browse files
mcollinatargos
authored andcommitted
stream: make async iterator .next() always resolve
See: nodejs/readable-stream#387 PR-URL: #24668 Reviewed-By: Gus Caplan <me@gus.host> Reviewed-By: Franziska Hinkelmann <franziska.hinkelmann@gmail.com> Reviewed-By: Joyee Cheung <joyeec9h3@gmail.com> Reviewed-By: Ruben Bridgewater <ruben@bridgewater.de>
1 parent 61e0103 commit b809fa8

File tree

2 files changed

+75
-5
lines changed

2 files changed

+75
-5
lines changed

lib/internal/streams/async_iterator.js

+10-5
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,11 @@ function onReadable(iter) {
3939
function wrapForNext(lastPromise, iter) {
4040
return (resolve, reject) => {
4141
lastPromise.then(() => {
42+
if (iter[kEnded]) {
43+
resolve(createIterResult(undefined, true));
44+
return;
45+
}
46+
4247
iter[kHandlePromise](resolve, reject);
4348
}, reject);
4449
};
@@ -61,7 +66,7 @@ const ReadableStreamAsyncIteratorPrototype = Object.setPrototypeOf({
6166
}
6267

6368
if (this[kEnded]) {
64-
return Promise.resolve(createIterResult(null, true));
69+
return Promise.resolve(createIterResult(undefined, true));
6570
}
6671

6772
if (this[kStream].destroyed) {
@@ -74,7 +79,7 @@ const ReadableStreamAsyncIteratorPrototype = Object.setPrototypeOf({
7479
if (this[kError]) {
7580
reject(this[kError]);
7681
} else {
77-
resolve(createIterResult(null, true));
82+
resolve(createIterResult(undefined, true));
7883
}
7984
});
8085
});
@@ -115,7 +120,7 @@ const ReadableStreamAsyncIteratorPrototype = Object.setPrototypeOf({
115120
reject(err);
116121
return;
117122
}
118-
resolve(createIterResult(null, true));
123+
resolve(createIterResult(undefined, true));
119124
});
120125
});
121126
},
@@ -131,7 +136,6 @@ const createReadableStreamAsyncIterator = (stream) => {
131136
value: stream._readableState.endEmitted,
132137
writable: true
133138
},
134-
[kLastPromise]: { value: null, writable: true },
135139
// the function passed to new Promise
136140
// is cached so we avoid allocating a new
137141
// closure at every run
@@ -151,6 +155,7 @@ const createReadableStreamAsyncIterator = (stream) => {
151155
writable: true,
152156
},
153157
});
158+
iterator[kLastPromise] = null;
154159

155160
finished(stream, (err) => {
156161
if (err && err.code !== 'ERR_STREAM_PREMATURE_CLOSE') {
@@ -172,7 +177,7 @@ const createReadableStreamAsyncIterator = (stream) => {
172177
iterator[kLastPromise] = null;
173178
iterator[kLastResolve] = null;
174179
iterator[kLastReject] = null;
175-
resolve(createIterResult(null, true));
180+
resolve(createIterResult(undefined, true));
176181
}
177182
iterator[kEnded] = true;
178183
});

test/parallel/test-stream-readable-async-iterators.js

+65
Original file line numberDiff line numberDiff line change
@@ -393,6 +393,71 @@ async function tests() {
393393
r.destroy(null);
394394
}
395395
})();
396+
397+
await (async () => {
398+
console.log('all next promises must be resolved on end');
399+
const r = new Readable({
400+
objectMode: true,
401+
read() {
402+
}
403+
});
404+
405+
const b = r[Symbol.asyncIterator]();
406+
const c = b.next();
407+
const d = b.next();
408+
r.push(null);
409+
assert.deepStrictEqual(await c, { done: true, value: undefined });
410+
assert.deepStrictEqual(await d, { done: true, value: undefined });
411+
})();
412+
413+
await (async () => {
414+
console.log('all next promises must be resolved on destroy');
415+
const r = new Readable({
416+
objectMode: true,
417+
read() {
418+
}
419+
});
420+
421+
const b = r[Symbol.asyncIterator]();
422+
const c = b.next();
423+
const d = b.next();
424+
r.destroy();
425+
assert.deepStrictEqual(await c, { done: true, value: undefined });
426+
assert.deepStrictEqual(await d, { done: true, value: undefined });
427+
})();
428+
429+
await (async () => {
430+
console.log('all next promises must be resolved on destroy with error');
431+
const r = new Readable({
432+
objectMode: true,
433+
read() {
434+
}
435+
});
436+
437+
const b = r[Symbol.asyncIterator]();
438+
const c = b.next();
439+
const d = b.next();
440+
const err = new Error('kaboom');
441+
r.destroy(err);
442+
443+
await Promise.all([(async () => {
444+
let e;
445+
try {
446+
await c;
447+
} catch (_e) {
448+
e = _e;
449+
}
450+
assert.strictEqual(e, err);
451+
})(), (async () => {
452+
let e;
453+
try {
454+
await d;
455+
} catch (_e) {
456+
e = _e;
457+
}
458+
assert.strictEqual(e, err);
459+
})()]);
460+
})();
396461
}
397462

398463
// to avoid missing some tests if a promise does not resolve

0 commit comments

Comments
 (0)