Skip to content

Commit d4ceb59

Browse files
zaide-chrisevanlucas
authored andcommitted
stream: Fixes missing 'unpipe' event
Currently when the destination emits an 'error', 'finish' or 'close' event the pipe calls unpipe to emit 'unpipe' and trigger the clean up of all it's listeners. When the source emits an 'end' event without {end: false} it calls end() on the destination leading it to emit a 'close', this will again lead to the pipe calling unpipe. However the source emitting an 'end' event along side {end: false} is the only time the cleanup gets ran directly without unpipe being called. This fixes that so the 'unpipe' event does get emitted and cleanup in turn gets ran by that event. Fixes: #11837 PR-URL: #11876 Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Colin Ihrig <cjihrig@gmail.com>
1 parent 395380a commit d4ceb59

File tree

2 files changed

+89
-2
lines changed

2 files changed

+89
-2
lines changed

lib/_stream_readable.js

+2-2
Original file line numberDiff line numberDiff line change
@@ -494,7 +494,7 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
494494
dest !== process.stdout &&
495495
dest !== process.stderr;
496496

497-
var endFn = doEnd ? onend : cleanup;
497+
var endFn = doEnd ? onend : unpipe;
498498
if (state.endEmitted)
499499
process.nextTick(endFn);
500500
else
@@ -530,7 +530,7 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
530530
dest.removeListener('error', onerror);
531531
dest.removeListener('unpipe', onunpipe);
532532
src.removeListener('end', onend);
533-
src.removeListener('end', cleanup);
533+
src.removeListener('end', unpipe);
534534
src.removeListener('data', ondata);
535535

536536
cleanedUp = true;
+87
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
'use strict';
2+
const common = require('../common');
3+
const assert = require('assert');
4+
const {Writable, Readable} = require('stream');
5+
class NullWriteable extends Writable {
6+
_write(chunk, encoding, callback) {
7+
return callback();
8+
}
9+
}
10+
class QuickEndReadable extends Readable {
11+
_read() {
12+
this.push(null);
13+
}
14+
}
15+
class NeverEndReadable extends Readable {
16+
_read() {}
17+
}
18+
19+
function noop() {}
20+
21+
{
22+
const dest = new NullWriteable();
23+
const src = new QuickEndReadable();
24+
dest.on('pipe', common.mustCall(noop));
25+
dest.on('unpipe', common.mustCall(noop));
26+
src.pipe(dest);
27+
setImmediate(() => {
28+
assert.strictEqual(src._readableState.pipesCount, 0);
29+
});
30+
}
31+
32+
{
33+
const dest = new NullWriteable();
34+
const src = new NeverEndReadable();
35+
dest.on('pipe', common.mustCall(noop));
36+
dest.on('unpipe', common.mustNotCall('unpipe should not have been emitted'));
37+
src.pipe(dest);
38+
setImmediate(() => {
39+
assert.strictEqual(src._readableState.pipesCount, 1);
40+
});
41+
}
42+
43+
{
44+
const dest = new NullWriteable();
45+
const src = new NeverEndReadable();
46+
dest.on('pipe', common.mustCall(noop));
47+
dest.on('unpipe', common.mustCall(noop));
48+
src.pipe(dest);
49+
src.unpipe(dest);
50+
setImmediate(() => {
51+
assert.strictEqual(src._readableState.pipesCount, 0);
52+
});
53+
}
54+
55+
{
56+
const dest = new NullWriteable();
57+
const src = new QuickEndReadable();
58+
dest.on('pipe', common.mustCall(noop));
59+
dest.on('unpipe', common.mustCall(noop));
60+
src.pipe(dest, {end: false});
61+
setImmediate(() => {
62+
assert.strictEqual(src._readableState.pipesCount, 0);
63+
});
64+
}
65+
66+
{
67+
const dest = new NullWriteable();
68+
const src = new NeverEndReadable();
69+
dest.on('pipe', common.mustCall(noop));
70+
dest.on('unpipe', common.mustNotCall('unpipe should not have been emitted'));
71+
src.pipe(dest, {end: false});
72+
setImmediate(() => {
73+
assert.strictEqual(src._readableState.pipesCount, 1);
74+
});
75+
}
76+
77+
{
78+
const dest = new NullWriteable();
79+
const src = new NeverEndReadable();
80+
dest.on('pipe', common.mustCall(noop));
81+
dest.on('unpipe', common.mustCall(noop));
82+
src.pipe(dest, {end: false});
83+
src.unpipe(dest);
84+
setImmediate(() => {
85+
assert.strictEqual(src._readableState.pipesCount, 0);
86+
});
87+
}

0 commit comments

Comments
 (0)