Skip to content

Commit 5a55b1d

Browse files
committed
stream: ensure pipeline always destroys streams
There was an edge case where an incorrect assumption was made in regardos whether eos/finished means that the stream is actually destroyed or not.
1 parent db28739 commit 5a55b1d

File tree

2 files changed

+19
-13
lines changed

2 files changed

+19
-13
lines changed

lib/internal/streams/pipeline.js

+5-12
Original file line numberDiff line numberDiff line change
@@ -27,27 +27,20 @@ let createReadableStreamAsyncIterator;
2727

2828
function destroyer(stream, reading, writing, callback) {
2929
callback = once(callback);
30-
31-
let closed = false;
32-
stream.on('close', () => {
33-
closed = true;
34-
});
30+
let destroyed = false;
3531

3632
if (eos === undefined) eos = require('internal/streams/end-of-stream');
3733
eos(stream, { readable: reading, writable: writing }, (err) => {
38-
if (err) return callback(err);
39-
closed = true;
40-
callback();
34+
if (destroyed) return;
35+
destroyed = true;
36+
destroyImpl.destroyer(stream, err);
37+
callback(err);
4138
});
4239

43-
let destroyed = false;
4440
return (err) => {
45-
if (closed) return;
4641
if (destroyed) return;
4742
destroyed = true;
48-
4943
destroyImpl.destroyer(stream, err);
50-
5144
callback(err || new ERR_STREAM_DESTROYED('pipe'));
5245
};
5346
}

test/parallel/test-stream-pipeline.js

+14-1
Original file line numberDiff line numberDiff line change
@@ -763,7 +763,10 @@ const { promisify } = require('util');
763763
s.emit('data', 'asd');
764764
s.emit('end');
765765
});
766-
s.close = common.mustCall();
766+
// 'destroyer' can be called multiple times,
767+
// once from stream wrapper and
768+
// once from iterator wrapper.
769+
s.close = common.mustCallAtLeast(1);
767770
let ret = '';
768771
pipeline(s, async function(source) {
769772
for await (const chunk of source) {
@@ -909,3 +912,13 @@ const { promisify } = require('util');
909912
assert.strictEqual(err.message, 'kaboom');
910913
}));
911914
}
915+
916+
{
917+
const src = new PassThrough({ autoDestroy: false });
918+
const dst = new PassThrough({ autoDestroy: false });
919+
pipeline(src, dst, common.mustCall(() => {
920+
assert.strictEqual(src.destroyed, true);
921+
assert.strictEqual(dst.destroyed, true);
922+
}));
923+
src.end();
924+
}

0 commit comments

Comments
 (0)