Skip to content

Commit e1edd39

Browse files
committed
stream: pipeline don't destroy Duplex src before 'finish'
pipeline was too agressive with destroying Duplex streams which were the first argument into pipeline. Just because it's !writable does not mean that it is safe to be destroyed, unless it has also emitted 'finish'. Fixes: #32955
1 parent 8a3fa32 commit e1edd39

File tree

2 files changed

+74
-6
lines changed

2 files changed

+74
-6
lines changed

lib/internal/streams/pipeline.js

+25-6
Original file line numberDiff line numberDiff line change
@@ -50,13 +50,30 @@ function destroyer(stream, reading, writing, final, callback) {
5050
return callback();
5151
}
5252

53-
if (!err && reading && !writing && stream.writable) {
54-
return callback();
55-
}
53+
const wState = stream._writableState;
54+
55+
const writableEnded = stream.writableEnded ||
56+
(wState && wState.ended);
57+
const writableFinished = stream.writableFinished ||
58+
(wState && wState.finished);
59+
60+
const willFinish = stream.writable ||
61+
(writableEnded && !writableFinished);
62+
const willEnd = stream.readable;
5663

57-
if (err || !final || !stream.readable) {
58-
destroyImpl.destroyer(stream, err);
64+
if (!err) {
65+
// First
66+
if (reading && !writing && willFinish) {
67+
return callback();
68+
}
69+
70+
// Last
71+
if (!reading && writing && willEnd) {
72+
return callback();
73+
}
5974
}
75+
76+
destroyImpl.destroyer(stream, err);
6077
callback(err);
6178
});
6279

@@ -81,7 +98,9 @@ function destroyer(stream, reading, writing, final, callback) {
8198
.once('end', _destroy)
8299
.once('error', _destroy);
83100
} else {
84-
_destroy(err);
101+
// Do an extra tick so that 'finish' has a chance to be emitted if
102+
// first stream is Duplex.
103+
process.nextTick(_destroy, err);
85104
}
86105
});
87106

test/parallel/test-stream-pipeline.js

+49
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ const {
1313
const assert = require('assert');
1414
const http = require('http');
1515
const { promisify } = require('util');
16+
const net = require('net');
1617

1718
{
1819
let finished = false;
@@ -1118,3 +1119,51 @@ const { promisify } = require('util');
11181119
assert.strictEqual(closed, true);
11191120
}));
11201121
}
1122+
1123+
{
1124+
const server = net.createServer(common.mustCall((socket) => {
1125+
// echo server
1126+
pipeline(socket, socket, common.mustCall());
1127+
// 13 force destroys the socket before it has a chance to emit finish
1128+
socket.on('finish', common.mustCall(() => {
1129+
server.close();
1130+
}));
1131+
})).listen(0, common.mustCall(() => {
1132+
const socket = net.connect(server.address().port);
1133+
socket.end();
1134+
}));
1135+
}
1136+
1137+
{
1138+
const d = new Duplex({
1139+
autoDestroy: false,
1140+
write: common.mustCall((data, enc, cb) => {
1141+
d.push(data);
1142+
cb();
1143+
}),
1144+
read: common.mustCall(() => {
1145+
d.push(null);
1146+
}),
1147+
final: common.mustCall((cb) => {
1148+
setTimeout(() => {
1149+
assert.strictEqual(d.destroyed, false);
1150+
cb();
1151+
}, 1000);
1152+
}),
1153+
// `destroy()` won't be invoked by pipeline since
1154+
// the writable side has not completed when
1155+
// the pipeline has completed.
1156+
destroy: common.mustNotCall()
1157+
});
1158+
1159+
const sink = new Writable({
1160+
write: common.mustCall((data, enc, cb) => {
1161+
cb();
1162+
})
1163+
});
1164+
1165+
pipeline(d, sink, common.mustCall());
1166+
1167+
d.write('test');
1168+
d.end();
1169+
}

0 commit comments

Comments
 (0)