Skip to content

Commit 4793f16

Browse files
ronagtargos
authored andcommitted
stream: fix pipe deadlock when starting with needDrain
Fixes: #36544 PR-URL: #36563 Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Luigi Pinca <luigipinca@gmail.com>
1 parent 617f2dc commit 4793f16

File tree

2 files changed

+35
-28
lines changed

2 files changed

+35
-28
lines changed

lib/internal/streams/readable.js

+28-24
Original file line numberDiff line numberDiff line change
@@ -713,35 +713,39 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
713713
ondrain();
714714
}
715715

716+
function pause() {
717+
// If the user unpiped during `dest.write()`, it is possible
718+
// to get stuck in a permanently paused state if that write
719+
// also returned false.
720+
// => Check whether `dest` is still a piping destination.
721+
if (!cleanedUp) {
722+
if (state.pipes.length === 1 && state.pipes[0] === dest) {
723+
debug('false write response, pause', 0);
724+
state.awaitDrainWriters = dest;
725+
state.multiAwaitDrain = false;
726+
} else if (state.pipes.length > 1 && state.pipes.includes(dest)) {
727+
debug('false write response, pause', state.awaitDrainWriters.size);
728+
state.awaitDrainWriters.add(dest);
729+
}
730+
src.pause();
731+
}
732+
if (!ondrain) {
733+
// When the dest drains, it reduces the awaitDrain counter
734+
// on the source. This would be more elegant with a .once()
735+
// handler in flow(), but adding and removing repeatedly is
736+
// too slow.
737+
ondrain = pipeOnDrain(src, dest);
738+
dest.on('drain', ondrain);
739+
}
740+
}
741+
716742
src.on('data', ondata);
717743
function ondata(chunk) {
718744
debug('ondata');
719745
const ret = dest.write(chunk);
720746
debug('dest.write', ret);
721747
if (ret === false) {
722-
// If the user unpiped during `dest.write()`, it is possible
723-
// to get stuck in a permanently paused state if that write
724-
// also returned false.
725-
// => Check whether `dest` is still a piping destination.
726-
if (!cleanedUp) {
727-
if (state.pipes.length === 1 && state.pipes[0] === dest) {
728-
debug('false write response, pause', 0);
729-
state.awaitDrainWriters = dest;
730-
state.multiAwaitDrain = false;
731-
} else if (state.pipes.length > 1 && state.pipes.includes(dest)) {
732-
debug('false write response, pause', state.awaitDrainWriters.size);
733-
state.awaitDrainWriters.add(dest);
734-
}
735-
src.pause();
736-
}
737-
if (!ondrain) {
738-
// When the dest drains, it reduces the awaitDrain counter
739-
// on the source. This would be more elegant with a .once()
740-
// handler in flow(), but adding and removing repeatedly is
741-
// too slow.
742-
ondrain = pipeOnDrain(src, dest);
743-
dest.on('drain', ondrain);
744-
}
748+
pause();
745749
}
746750
}
747751

@@ -790,7 +794,7 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
790794

791795
if (dest.writableNeedDrain === true) {
792796
if (state.flowing) {
793-
src.pause();
797+
pause();
794798
}
795799
} else if (!state.flowing) {
796800
debug('pipe resume');

test/parallel/test-stream-pipe-needDrain.js

+7-4
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,13 @@ const assert = require('assert');
55
const Readable = require('_stream_readable');
66
const Writable = require('_stream_writable');
77

8-
// Pipe should not continue writing if writable needs drain.
8+
// Pipe should pause temporarily if writable needs drain.
99
{
1010
const w = new Writable({
1111
write(buf, encoding, callback) {
12-
13-
}
12+
process.nextTick(callback);
13+
},
14+
highWaterMark: 1
1415
});
1516

1617
while (w.write('asd'));
@@ -20,10 +21,12 @@ const Writable = require('_stream_writable');
2021
const r = new Readable({
2122
read() {
2223
this.push('asd');
24+
this.push(null);
2325
}
2426
});
2527

26-
w.write = common.mustNotCall();
28+
r.on('pause', common.mustCall(2));
29+
r.on('end', common.mustCall());
2730

2831
r.pipe(w);
2932
}

0 commit comments

Comments
 (0)