Skip to content

Commit d6bcf8b

Browse files
mafintoshtargos
authored andcommitted
stream: add auto-destroy mode
PR-URL: #22795 Reviewed-By: James M Snell <jasnell@gmail.com> Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Anna Henningsen <anna@addaleax.net> Reviewed-By: Jeremiah Senkpiel <fishrock123@rocketmail.com>
1 parent 7a2134c commit d6bcf8b

File tree

5 files changed

+158
-12
lines changed

5 files changed

+158
-12
lines changed

doc/api/stream.md

+17
Original file line numberDiff line numberDiff line change
@@ -1493,6 +1493,11 @@ changes:
14931493
pr-url: https://github.com/nodejs/node/pull/18438
14941494
description: >
14951495
Add `emitClose` option to specify if `'close'` is emitted on destroy
1496+
- version: REPLACEME
1497+
pr-url: https://github.com/nodejs/node/pull/22795
1498+
description: >
1499+
Add `autoDestroy` option to automatically `destroy()` the stream
1500+
when it emits `'finish'` or errors
14961501
-->
14971502

14981503
* `options` {Object}
@@ -1521,6 +1526,8 @@ changes:
15211526
[`stream._destroy()`][writable-_destroy] method.
15221527
* `final` {Function} Implementation for the
15231528
[`stream._final()`][stream-_final] method.
1529+
* `autoDestroy` {boolean} Whether this stream should automatically call
1530+
`.destroy()` on itself after ending. **Default:** `false`.
15241531

15251532
```js
15261533
const { Writable } = require('stream');
@@ -1756,6 +1763,14 @@ Custom `Readable` streams *must* call the `new stream.Readable([options])`
17561763
constructor and implement the `readable._read()` method.
17571764

17581765
#### new stream.Readable([options])
1766+
<!-- YAML
1767+
changes:
1768+
- version: REPLACEME
1769+
pr-url: https://github.com/nodejs/node/pull/22795
1770+
description: >
1771+
Add `autoDestroy` option to automatically `destroy()` the stream
1772+
when it emits `'end'` or errors
1773+
-->
17591774

17601775
* `options` {Object}
17611776
* `highWaterMark` {number} The maximum [number of bytes][hwm-gotcha] to store
@@ -1770,6 +1785,8 @@ constructor and implement the `readable._read()` method.
17701785
method.
17711786
* `destroy` {Function} Implementation for the
17721787
[`stream._destroy()`][readable-_destroy] method.
1788+
* `autoDestroy` {boolean} Whether this stream should automatically call
1789+
`.destroy()` on itself after ending. **Default:** `false`.
17731790

17741791
```js
17751792
const { Readable } = require('stream');

lib/_stream_readable.js

+18-5
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ let createReadableStreamAsyncIterator;
4646

4747
util.inherits(Readable, Stream);
4848

49+
const { errorOrDestroy } = destroyImpl;
4950
const kProxyEvents = ['error', 'close', 'destroy', 'pause', 'resume'];
5051

5152
function prependListener(emitter, event, fn) {
@@ -117,6 +118,9 @@ function ReadableState(options, stream, isDuplex) {
117118
// Should close be emitted on destroy. Defaults to true.
118119
this.emitClose = options.emitClose !== false;
119120

121+
// Should .destroy() be called after 'end' (and potentially 'finish')
122+
this.autoDestroy = !!options.autoDestroy;
123+
120124
// has it been destroyed
121125
this.destroyed = false;
122126

@@ -235,7 +239,7 @@ function readableAddChunk(stream, chunk, encoding, addToFront, skipChunkCheck) {
235239
if (!skipChunkCheck)
236240
er = chunkInvalid(state, chunk);
237241
if (er) {
238-
stream.emit('error', er);
242+
errorOrDestroy(stream, er);
239243
} else if (state.objectMode || chunk && chunk.length > 0) {
240244
if (typeof chunk !== 'string' &&
241245
!state.objectMode &&
@@ -245,11 +249,11 @@ function readableAddChunk(stream, chunk, encoding, addToFront, skipChunkCheck) {
245249

246250
if (addToFront) {
247251
if (state.endEmitted)
248-
stream.emit('error', new ERR_STREAM_UNSHIFT_AFTER_END_EVENT());
252+
errorOrDestroy(stream, new ERR_STREAM_UNSHIFT_AFTER_END_EVENT());
249253
else
250254
addChunk(stream, state, chunk, true);
251255
} else if (state.ended) {
252-
stream.emit('error', new ERR_STREAM_PUSH_AFTER_EOF());
256+
errorOrDestroy(stream, new ERR_STREAM_PUSH_AFTER_EOF());
253257
} else if (state.destroyed) {
254258
return false;
255259
} else {
@@ -581,7 +585,7 @@ function maybeReadMore_(stream, state) {
581585
// for virtual (non-string, non-buffer) streams, "length" is somewhat
582586
// arbitrary, and perhaps not very meaningful.
583587
Readable.prototype._read = function(n) {
584-
this.emit('error', new ERR_METHOD_NOT_IMPLEMENTED('_read()'));
588+
errorOrDestroy(this, new ERR_METHOD_NOT_IMPLEMENTED('_read()'));
585589
};
586590

587591
Readable.prototype.pipe = function(dest, pipeOpts) {
@@ -687,7 +691,7 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
687691
unpipe();
688692
dest.removeListener('error', onerror);
689693
if (EE.listenerCount(dest, 'error') === 0)
690-
dest.emit('error', er);
694+
errorOrDestroy(dest, er);
691695
}
692696

693697
// Make sure our error handler is attached before userland ones.
@@ -1092,5 +1096,14 @@ function endReadableNT(state, stream) {
10921096
state.endEmitted = true;
10931097
stream.readable = false;
10941098
stream.emit('end');
1099+
1100+
if (state.autoDestroy) {
1101+
// In case of duplex streams we need a way to detect
1102+
// if the writable side is ready for autoDestroy as well
1103+
const wState = stream._writableState;
1104+
if (!wState || (wState.autoDestroy && wState.finished)) {
1105+
stream.destroy();
1106+
}
1107+
}
10951108
}
10961109
}

lib/_stream_writable.js

+20-6
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@ const {
4545
ERR_UNKNOWN_ENCODING
4646
} = require('internal/errors').codes;
4747

48+
const { errorOrDestroy } = destroyImpl;
49+
4850
util.inherits(Writable, Stream);
4951

5052
function nop() {}
@@ -147,6 +149,9 @@ function WritableState(options, stream, isDuplex) {
147149
// Should close be emitted on destroy. Defaults to true.
148150
this.emitClose = options.emitClose !== false;
149151

152+
// Should .destroy() be called after 'finish' (and potentially 'end')
153+
this.autoDestroy = !!options.autoDestroy;
154+
150155
// count buffered requests
151156
this.bufferedRequestCount = 0;
152157

@@ -235,14 +240,14 @@ function Writable(options) {
235240

236241
// Otherwise people can pipe Writable streams, which is just wrong.
237242
Writable.prototype.pipe = function() {
238-
this.emit('error', new ERR_STREAM_CANNOT_PIPE());
243+
errorOrDestroy(this, new ERR_STREAM_CANNOT_PIPE());
239244
};
240245

241246

242247
function writeAfterEnd(stream, cb) {
243248
var er = new ERR_STREAM_WRITE_AFTER_END();
244249
// TODO: defer error events consistently everywhere, not just the cb
245-
stream.emit('error', er);
250+
errorOrDestroy(stream, er);
246251
process.nextTick(cb, er);
247252
}
248253

@@ -258,7 +263,7 @@ function validChunk(stream, state, chunk, cb) {
258263
er = new ERR_INVALID_ARG_TYPE('chunk', ['string', 'Buffer'], chunk);
259264
}
260265
if (er) {
261-
stream.emit('error', er);
266+
errorOrDestroy(stream, er);
262267
process.nextTick(cb, er);
263268
return false;
264269
}
@@ -422,13 +427,13 @@ function onwriteError(stream, state, sync, er, cb) {
422427
// after error
423428
process.nextTick(finishMaybe, stream, state);
424429
stream._writableState.errorEmitted = true;
425-
stream.emit('error', er);
430+
errorOrDestroy(stream, er);
426431
} else {
427432
// the caller expect this to happen before if
428433
// it is async
429434
cb(er);
430435
stream._writableState.errorEmitted = true;
431-
stream.emit('error', er);
436+
errorOrDestroy(stream, er);
432437
// this can emit finish, but finish must
433438
// always follow error
434439
finishMaybe(stream, state);
@@ -612,7 +617,7 @@ function callFinal(stream, state) {
612617
stream._final((err) => {
613618
state.pendingcb--;
614619
if (err) {
615-
stream.emit('error', err);
620+
errorOrDestroy(stream, err);
616621
}
617622
state.prefinished = true;
618623
stream.emit('prefinish');
@@ -639,6 +644,15 @@ function finishMaybe(stream, state) {
639644
if (state.pendingcb === 0) {
640645
state.finished = true;
641646
stream.emit('finish');
647+
648+
if (state.autoDestroy) {
649+
// In case of duplex streams we need a way to detect
650+
// if the readable side is ready for autoDestroy as well
651+
const rState = stream._readableState;
652+
if (!rState || (rState.autoDestroy && rState.endEmitted)) {
653+
stream.destroy();
654+
}
655+
}
642656
}
643657
}
644658
return need;

lib/internal/streams/destroy.js

+19-1
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,25 @@ function emitErrorNT(self, err) {
8282
self.emit('error', err);
8383
}
8484

85+
function errorOrDestroy(stream, err) {
86+
// We have tests that rely on errors being emitted
87+
// in the same tick, so changing this is semver major.
88+
// For now when you opt-in to autoDestroy we allow
89+
// the error to be emitted nextTick. In a future
90+
// semver major update we should change the default to this.
91+
92+
const rState = stream._readableState;
93+
const wState = stream._writableState;
94+
95+
if ((rState && rState.autoDestroy) || (wState && wState.autoDestroy))
96+
stream.destroy(err);
97+
else
98+
stream.emit('error', err);
99+
}
100+
101+
85102
module.exports = {
86103
destroy,
87-
undestroy
104+
undestroy,
105+
errorOrDestroy
88106
};
+84
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
'use strict';
2+
const common = require('../common');
3+
const stream = require('stream');
4+
const assert = require('assert');
5+
6+
{
7+
const r = new stream.Readable({
8+
autoDestroy: true,
9+
read() {
10+
this.push('hello');
11+
this.push('world');
12+
this.push(null);
13+
},
14+
destroy: common.mustCall((err, cb) => cb())
15+
});
16+
17+
let ended = false;
18+
19+
r.resume();
20+
21+
r.on('end', common.mustCall(() => {
22+
ended = true;
23+
}));
24+
25+
r.on('close', common.mustCall(() => {
26+
assert(ended);
27+
}));
28+
}
29+
30+
{
31+
const w = new stream.Writable({
32+
autoDestroy: true,
33+
write(data, enc, cb) {
34+
cb(null);
35+
},
36+
destroy: common.mustCall((err, cb) => cb())
37+
});
38+
39+
let finished = false;
40+
41+
w.write('hello');
42+
w.write('world');
43+
w.end();
44+
45+
w.on('finish', common.mustCall(() => {
46+
finished = true;
47+
}));
48+
49+
w.on('close', common.mustCall(() => {
50+
assert(finished);
51+
}));
52+
}
53+
54+
{
55+
const t = new stream.Transform({
56+
autoDestroy: true,
57+
transform(data, enc, cb) {
58+
cb(null, data);
59+
},
60+
destroy: common.mustCall((err, cb) => cb())
61+
});
62+
63+
let ended = false;
64+
let finished = false;
65+
66+
t.write('hello');
67+
t.write('world');
68+
t.end();
69+
70+
t.resume();
71+
72+
t.on('end', common.mustCall(() => {
73+
ended = true;
74+
}));
75+
76+
t.on('finish', common.mustCall(() => {
77+
finished = true;
78+
}));
79+
80+
t.on('close', common.mustCall(() => {
81+
assert(ended);
82+
assert(finished);
83+
}));
84+
}

0 commit comments

Comments
 (0)