Skip to content

Commit f4efb7f

Browse files
jakecastellitargos
authored andcommitted
stream: make sure _destroy is called
PR-URL: #53213 Fixes: #51987 Reviewed-By: Robert Nagy <ronagy@icloud.com> Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com>
1 parent 4a3525b commit f4efb7f

File tree

2 files changed

+49
-3
lines changed

2 files changed

+49
-3
lines changed

lib/internal/streams/compose.js

+4-3
Original file line numberDiff line numberDiff line change
@@ -238,13 +238,14 @@ module.exports = function compose(...streams) {
238238
ondrain = null;
239239
onfinish = null;
240240

241+
if (isNodeStream(tail)) {
242+
destroyer(tail, err);
243+
}
244+
241245
if (onclose === null) {
242246
callback(err);
243247
} else {
244248
onclose = callback;
245-
if (isNodeStream(tail)) {
246-
destroyer(tail, err);
247-
}
248249
}
249250
};
250251

test/parallel/test-stream-compose.js

+45
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
const common = require('../common');
66
const {
7+
Duplex,
78
Readable,
89
Transform,
910
Writable,
@@ -494,3 +495,47 @@ const assert = require('assert');
494495
assert.deepStrictEqual(await newStream.toArray(), [Buffer.from('Steve RogersOn your left')]);
495496
})().then(common.mustCall());
496497
}
498+
499+
{
500+
class DuplexProcess extends Duplex {
501+
constructor(options) {
502+
super({ ...options, objectMode: true });
503+
this.stuff = [];
504+
}
505+
506+
_write(message, _, callback) {
507+
this.stuff.push(message);
508+
callback();
509+
}
510+
511+
_destroy(err, cb) {
512+
cb(err);
513+
}
514+
515+
_read() {
516+
if (this.stuff.length) {
517+
this.push(this.stuff.shift());
518+
} else if (this.writableEnded) {
519+
this.push(null);
520+
} else {
521+
this._read();
522+
}
523+
}
524+
}
525+
526+
const pass = new PassThrough({ objectMode: true });
527+
const duplex = new DuplexProcess();
528+
529+
const composed = compose(
530+
pass,
531+
duplex
532+
).on('error', () => {});
533+
534+
composed.write('hello');
535+
composed.write('world');
536+
composed.end();
537+
538+
composed.destroy(new Error('an unexpected error'));
539+
assert.strictEqual(duplex.destroyed, true);
540+
541+
}

0 commit comments

Comments
 (0)