Skip to content

Commit 6899094

Browse files
committed
stream: avoid pause with unpipe in buffered write
If a pipe is cleaned up (due to unpipe) during a write that returned false, the source stream can get stuck in a paused state. Fixes: #2323 PR-URL: #2325 Reviewed-By: Sakthipriyan Vairamani <thechargingvolcano@gmail.com>
1 parent 8e21309 commit 6899094

File tree

2 files changed

+51
-3
lines changed

2 files changed

+51
-3
lines changed

lib/_stream_readable.js

+13-3
Original file line numberDiff line numberDiff line change
@@ -497,6 +497,7 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
497497
var ondrain = pipeOnDrain(src);
498498
dest.on('drain', ondrain);
499499

500+
var cleanedUp = false;
500501
function cleanup() {
501502
debug('cleanup');
502503
// cleanup event handlers once the pipe is broken
@@ -509,6 +510,8 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
509510
src.removeListener('end', cleanup);
510511
src.removeListener('data', ondata);
511512

513+
cleanedUp = true;
514+
512515
// if the reader is waiting for a drain event from this
513516
// specific writer, then it would cause it to never start
514517
// flowing again.
@@ -524,9 +527,16 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
524527
debug('ondata');
525528
var ret = dest.write(chunk);
526529
if (false === ret) {
527-
debug('false write response, pause',
528-
src._readableState.awaitDrain);
529-
src._readableState.awaitDrain++;
530+
// If the user unpiped during `dest.write()`, it is possible
531+
// to get stuck in a permanently paused state if that write
532+
// also returned false.
533+
if (state.pipesCount === 1 &&
534+
state.pipes[0] === dest &&
535+
src.listenerCount('data') === 1 &&
536+
!cleanedUp) {
537+
debug('false write response, pause', src._readableState.awaitDrain);
538+
src._readableState.awaitDrain++;
539+
}
530540
src.pause();
531541
}
532542
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
'use strict';
2+
const common = require('../common');
3+
const assert = require('assert');
4+
const stream = require('stream');
5+
6+
const reader = new stream.Readable();
7+
const writer1 = new stream.Writable();
8+
const writer2 = new stream.Writable();
9+
10+
// 560000 is chosen here because it is larger than the (default) highWaterMark
11+
// and will cause `.write()` to return false
12+
// See: https://github.com/nodejs/node/issues/2323
13+
const buffer = new Buffer(560000);
14+
15+
reader._read = function(n) {};
16+
17+
writer1._write = common.mustCall(function(chunk, encoding, cb) {
18+
this.emit('chunk-received');
19+
cb();
20+
}, 1);
21+
writer1.once('chunk-received', function() {
22+
reader.unpipe(writer1);
23+
reader.pipe(writer2);
24+
reader.push(buffer);
25+
setImmediate(function() {
26+
reader.push(buffer);
27+
setImmediate(function() {
28+
reader.push(buffer);
29+
});
30+
});
31+
});
32+
33+
writer2._write = common.mustCall(function(chunk, encoding, cb) {
34+
cb();
35+
}, 3);
36+
37+
reader.pipe(writer1);
38+
reader.push(buffer);

0 commit comments

Comments
 (0)