Skip to content

Commit 45032a3

Browse files
ronagtargos
authored andcommitted
stream: fix stream.finished on Duplex
finished would incorrectly believe that a Duplex is already closed if either the readable or writable side has completed. Fixes: #33130 PR-URL: #33133 Reviewed-By: Anna Henningsen <anna@addaleax.net> Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
1 parent d39254a commit 45032a3

File tree

2 files changed

+97
-5
lines changed

2 files changed

+97
-5
lines changed

lib/internal/streams/end-of-stream.js

+12-4
Original file line numberDiff line numberDiff line change
@@ -147,17 +147,25 @@ function eos(stream, opts, callback) {
147147
if (opts.error !== false) stream.on('error', onerror);
148148
stream.on('close', onclose);
149149

150-
const closed = (wState && wState.closed) || (rState && rState.closed) ||
151-
(wState && wState.errorEmitted) || (rState && rState.errorEmitted) ||
152-
(wState && wState.finished) || (rState && rState.endEmitted) ||
153-
(rState && stream.req && stream.aborted);
150+
const closed = (
151+
(wState && wState.closed) ||
152+
(rState && rState.closed) ||
153+
(wState && wState.errorEmitted) ||
154+
(rState && rState.errorEmitted) ||
155+
(rState && stream.req && stream.aborted) ||
156+
(
157+
(!writable || (wState && wState.finished)) &&
158+
(!readable || (rState && rState.endEmitted))
159+
)
160+
);
154161

155162
if (closed) {
156163
// TODO(ronag): Re-throw error if errorEmitted?
157164
// TODO(ronag): Throw premature close as if finished was called?
158165
// before being closed? i.e. if closed but not errored, ended or finished.
159166
// TODO(ronag): Throw some kind of error? Does it make sense
160167
// to call finished() on a "finished" stream?
168+
// TODO(ronag): willEmitClose?
161169
process.nextTick(() => {
162170
callback();
163171
});

test/parallel/test-stream-finished.js

+85-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,14 @@
11
'use strict';
22

33
const common = require('../common');
4-
const { Writable, Readable, Transform, finished, Duplex } = require('stream');
4+
const {
5+
Writable,
6+
Readable,
7+
Transform,
8+
finished,
9+
Duplex,
10+
PassThrough
11+
} = require('stream');
512
const assert = require('assert');
613
const EE = require('events');
714
const fs = require('fs');
@@ -396,3 +403,80 @@ testClosed((opts) => new Writable({ write() {}, ...opts }));
396403
r.destroyed = true;
397404
r.push(null);
398405
}
406+
407+
{
408+
// Regression https://github.com/nodejs/node/issues/33130
409+
const response = new PassThrough();
410+
411+
class HelloWorld extends Duplex {
412+
constructor(response) {
413+
super({
414+
autoDestroy: false
415+
});
416+
417+
this.response = response;
418+
this.readMore = false;
419+
420+
response.once('end', () => {
421+
this.push(null);
422+
});
423+
424+
response.on('readable', () => {
425+
if (this.readMore) {
426+
this._read();
427+
}
428+
});
429+
}
430+
431+
_read() {
432+
const { response } = this;
433+
434+
this.readMore = true;
435+
436+
if (response.readableLength) {
437+
this.readMore = false;
438+
}
439+
440+
let data;
441+
while ((data = response.read()) !== null) {
442+
this.push(data);
443+
}
444+
}
445+
}
446+
447+
const instance = new HelloWorld(response);
448+
instance.setEncoding('utf8');
449+
instance.end();
450+
451+
(async () => {
452+
await EE.once(instance, 'finish');
453+
454+
setImmediate(() => {
455+
response.write('chunk 1');
456+
response.write('chunk 2');
457+
response.write('chunk 3');
458+
response.end();
459+
});
460+
461+
let res = '';
462+
for await (const data of instance) {
463+
res += data;
464+
}
465+
466+
assert.strictEqual(res, 'chunk 1chunk 2chunk 3');
467+
})().then(common.mustCall());
468+
}
469+
470+
{
471+
const p = new PassThrough();
472+
p.end();
473+
finished(p, common.mustNotCall());
474+
}
475+
476+
{
477+
const p = new PassThrough();
478+
p.end();
479+
p.on('finish', common.mustCall(() => {
480+
finished(p, common.mustNotCall());
481+
}));
482+
}

0 commit comments

Comments
 (0)