
Commit 180b935

ronag authored and BethGriggs committed
stream: pipeline should only destroy un-finished streams
This PR logically reverts #31940, which has caused a lot of unnecessary breakage in the ecosystem. It also aligns better with the actual documented behavior: `stream.pipeline()` will call `stream.destroy(err)` on all streams except:

* `Readable` streams which have emitted `'end'` or `'close'`.
* `Writable` streams which have emitted `'finish'` or `'close'`.

The behavior introduced in #31940 was much more aggressive in terms of destroying streams. This was good for avoiding potential resource leaks, but it breaks some common assumptions in legacy streams. Furthermore, this change makes the code simpler and removes some hacks.

Fixes: #32954
Fixes: #32955
PR-URL: #32968
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Mathias Buus <mathiasbuus@gmail.com>
1 parent bbed1e5 commit 180b935
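
To illustrate the documented behavior this commit restores, here is a minimal standalone sketch (not part of the commit; the stream setup mirrors the updated `PassThrough` test below): both streams finish normally, so `pipeline()` leaves them intact rather than destroying them.

'use strict';
const { pipeline, PassThrough } = require('stream');

// Neither stream auto-destroys, so any destruction would have to come from pipeline().
const src = new PassThrough({ autoDestroy: false });
const dst = new PassThrough({ autoDestroy: false });

pipeline(src, dst, (err) => {
  if (err) throw err;
  // Both streams completed ('end'/'finish'), so pipeline() does not destroy them.
  console.log(src.destroyed, dst.destroyed); // false false
});

src.end('hello');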

File tree

2 files changed: +112 -39 lines changed


lib/internal/streams/pipeline.js (+18 -38)

@@ -25,43 +25,18 @@ let EE;
 let PassThrough;
 let createReadableStreamAsyncIterator;

-function isIncoming(stream) {
-  return (
-    stream.socket &&
-    typeof stream.complete === 'boolean' &&
-    ArrayIsArray(stream.rawTrailers) &&
-    ArrayIsArray(stream.rawHeaders)
-  );
-}
-
-function isOutgoing(stream) {
-  return (
-    stream.socket &&
-    typeof stream.setHeader === 'function'
-  );
-}
-
-function destroyer(stream, reading, writing, final, callback) {
-  const _destroy = once((err) => {
-    if (!err && (isIncoming(stream) || isOutgoing(stream))) {
-      // http/1 request objects have a coupling to their response and should
-      // not be prematurely destroyed. Assume they will handle their own
-      // lifecycle.
-      return callback();
-    }
+function destroyer(stream, reading, writing, callback) {
+  callback = once(callback);

-    if (!err && reading && !writing && stream.writable) {
-      return callback();
-    }
-
-    if (err || !final || !stream.readable) {
-      destroyImpl.destroyer(stream, err);
-    }
-    callback(err);
+  let finished = false;
+  stream.on('close', () => {
+    finished = true;
   });

   if (eos === undefined) eos = require('internal/streams/end-of-stream');
   eos(stream, { readable: reading, writable: writing }, (err) => {
+    finished = !err;
+
     const rState = stream._readableState;
     if (
       err &&
@@ -78,14 +53,19 @@ function destroyer(stream, reading, writing, final, callback) {
       // eos will only fail with premature close on the reading side for
       // duplex streams.
       stream
-        .once('end', _destroy)
-        .once('error', _destroy);
+        .once('end', callback)
+        .once('error', callback);
     } else {
-      _destroy(err);
+      callback(err);
     }
   });

-  return (err) => _destroy(err || new ERR_STREAM_DESTROYED('pipe'));
+  return (err) => {
+    if (finished) return;
+    finished = true;
+    destroyImpl.destroyer(stream, err);
+    callback(err || new ERR_STREAM_DESTROYED('pipe'));
+  };
 }

 function popCallback(streams) {
@@ -204,7 +184,7 @@ function pipeline(...streams) {

     if (isStream(stream)) {
       finishCount++;
-      destroys.push(destroyer(stream, reading, writing, !reading, finish));
+      destroys.push(destroyer(stream, reading, writing, finish));
     }

     if (i === 0) {
@@ -262,7 +242,7 @@ function pipeline(...streams) {
         ret = pt;

         finishCount++;
-        destroys.push(destroyer(ret, false, true, true, finish));
+        destroys.push(destroyer(ret, false, true, finish));
       }
     } else if (isStream(stream)) {
       if (isReadable(ret)) {

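The disposer returned by the new `destroyer()` above only force-destroys a stream that has not already finished. A hedged sketch of the visible effect when a pipeline errors (the failing `Readable` here is hypothetical, not taken from the commit): the destination never reaches 'finish', so it is destroyed with the error.

'use strict';
const { pipeline, Readable, PassThrough } = require('stream');

// A source that errors before producing any data.
const src = new Readable({
  read() {
    this.destroy(new Error('boom'));
  }
});
const dst = new PassThrough({ autoDestroy: false });

pipeline(src, dst, (err) => {
  // dst never emitted 'finish' or 'close', so pipeline() destroys it.
  console.log(err && err.message); // 'boom' (expected)
  console.log(dst.destroyed);      // true
});
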
test/parallel/test-stream-pipeline.js (+94 -1)

@@ -13,6 +13,7 @@ const {
 const assert = require('assert');
 const http = require('http');
 const { promisify } = require('util');
+const net = require('net');

 {
   let finished = false;
@@ -916,7 +917,7 @@ const { promisify } = require('util');
   const src = new PassThrough({ autoDestroy: false });
   const dst = new PassThrough({ autoDestroy: false });
   pipeline(src, dst, common.mustCall(() => {
-    assert.strictEqual(src.destroyed, true);
+    assert.strictEqual(src.destroyed, false);
     assert.strictEqual(dst.destroyed, false);
   }));
   src.end();
@@ -1118,3 +1119,95 @@ const { promisify } = require('util');
     assert.strictEqual(closed, true);
   }));
 }
+
+{
+  const server = net.createServer(common.mustCall((socket) => {
+    // echo server
+    pipeline(socket, socket, common.mustCall());
+    // 13 force destroys the socket before it has a chance to emit finish
+    socket.on('finish', common.mustCall(() => {
+      server.close();
+    }));
+  })).listen(0, common.mustCall(() => {
+    const socket = net.connect(server.address().port);
+    socket.end();
+  }));
+}
+
+{
+  const d = new Duplex({
+    autoDestroy: false,
+    write: common.mustCall((data, enc, cb) => {
+      d.push(data);
+      cb();
+    }),
+    read: common.mustCall(() => {
+      d.push(null);
+    }),
+    final: common.mustCall((cb) => {
+      setTimeout(() => {
+        assert.strictEqual(d.destroyed, false);
+        cb();
+      }, 1000);
+    }),
+    destroy: common.mustNotCall()
+  });
+
+  const sink = new Writable({
+    write: common.mustCall((data, enc, cb) => {
+      cb();
+    })
+  });
+
+  pipeline(d, sink, common.mustCall());
+
+  d.write('test');
+  d.end();
+}
+
+{
+  const server = net.createServer(common.mustCall((socket) => {
+    // echo server
+    pipeline(socket, socket, common.mustCall());
+    socket.on('finish', common.mustCall(() => {
+      server.close();
+    }));
+  })).listen(0, common.mustCall(() => {
+    const socket = net.connect(server.address().port);
+    socket.end();
+  }));
+}
+
+{
+  const d = new Duplex({
+    autoDestroy: false,
+    write: common.mustCall((data, enc, cb) => {
+      d.push(data);
+      cb();
+    }),
+    read: common.mustCall(() => {
+      d.push(null);
+    }),
+    final: common.mustCall((cb) => {
+      setTimeout(() => {
+        assert.strictEqual(d.destroyed, false);
+        cb();
+      }, 1000);
+    }),
+    // `destroy()` won't be invoked by pipeline since
+    // the writable side has not completed when
+    // the pipeline has completed.
+    destroy: common.mustNotCall()
+  });
+
+  const sink = new Writable({
+    write: common.mustCall((data, enc, cb) => {
+      cb();
+    })
+  });
+
+  pipeline(d, sink, common.mustCall());
+
+  d.write('test');
+  d.end();
+}
