Skip to content

Commit 5e3f516

Browse files
mafintoshmcollina
authored andcommitted
stream: updated streams error handling
This improves error handling for streams in a few ways. 1. It ensures that no user defined methods (_read, _write, ...) are run after .destroy has been called. 2. It introduces an explicit error to tell the user if they are write to write, etc to the stream after it has been destroyed. 3. It makes streams always emit close as the last thing after they have been destroyed 4. Changes the default _destroy to not gracefully end streams. It also updates net, http2, zlib and fs to the new error handling. PR-URL: nodejs#18438 Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: James M Snell <jasnell@gmail.com> Reviewed-By: Ruben Bridgewater <ruben@bridgewater.de> Reviewed-By: Anna Henningsen <anna@addaleax.net>
1 parent acac0f8 commit 5e3f516

18 files changed

+107
-55
lines changed

doc/api/errors.md

+6-5
Original file line numberDiff line numberDiff line change
@@ -1449,6 +1449,12 @@ An unspecified or non-specific system error has occurred within the Node.js
14491449
process. The error object will have an `err.info` object property with
14501450
additional details.
14511451

1452+
<a id="ERR_STREAM_DESTROYED"></a>
1453+
### ERR_STREAM_DESTROYED
1454+
1455+
A stream method was called that cannot complete because the stream was
1456+
destroyed using `stream.destroy()`.
1457+
14521458
<a id="ERR_TLS_CERT_ALTNAME_INVALID"></a>
14531459
### ERR_TLS_CERT_ALTNAME_INVALID
14541460

@@ -1615,11 +1621,6 @@ The fulfilled value of a linking promise is not a `vm.Module` object.
16151621
The current module's status does not allow for this operation. The specific
16161622
meaning of the error depends on the specific function.
16171623

1618-
<a id="ERR_ZLIB_BINDING_CLOSED"></a>
1619-
### ERR_ZLIB_BINDING_CLOSED
1620-
1621-
An attempt was made to use a `zlib` object after it has already been closed.
1622-
16231624
<a id="ERR_ZLIB_INITIALIZATION_FAILED"></a>
16241625
### ERR_ZLIB_INITIALIZATION_FAILED
16251626

doc/api/stream.md

+15-4
Original file line numberDiff line numberDiff line change
@@ -543,8 +543,10 @@ added: v8.0.0
543543

544544
* Returns: {this}
545545

546-
Destroy the stream, and emit the passed error. After this call, the
547-
writable stream has ended. Implementors should not override this method,
546+
Destroy the stream, and emit the passed `error` and a `close` event.
547+
After this call, the writable stream has ended and subsequent calls
548+
to `write` / `end` will give an `ERR_STREAM_DESTROYED` error.
549+
Implementors should not override this method,
548550
but instead implement [`writable._destroy`][writable-_destroy].
549551

550552
### Readable Streams
@@ -1167,8 +1169,9 @@ myReader.on('readable', () => {
11671169
added: v8.0.0
11681170
-->
11691171

1170-
Destroy the stream, and emit `'error'`. After this call, the
1171-
readable stream will release any internal resources.
1172+
Destroy the stream, and emit `'error'` and `close`. After this call, the
1173+
readable stream will release any internal resources and subsequent calls
1174+
to `push` will be ignored.
11721175
Implementors should not override this method, but instead implement
11731176
[`readable._destroy`][readable-_destroy].
11741177

@@ -1382,6 +1385,12 @@ constructor and implement the `writable._write()` method. The
13821385
`writable._writev()` method *may* also be implemented.
13831386

13841387
#### Constructor: new stream.Writable([options])
1388+
<!-- YAML
1389+
changes:
1390+
- version: REPLACEME
1391+
pr-url: https://github.com/nodejs/node/pull/18438
1392+
description: Add `emitClose` option to specify if `close` is emitted on destroy
1393+
-->
13851394

13861395
* `options` {Object}
13871396
* `highWaterMark` {number} Buffer level when
@@ -1395,6 +1404,8 @@ constructor and implement the `writable._write()` method. The
13951404
it becomes possible to write JavaScript values other than string,
13961405
`Buffer` or `Uint8Array` if supported by the stream implementation.
13971406
Defaults to `false`
1407+
* `emitClose` {boolean} Whether or not the stream should emit `close`
1408+
after it has been destroyed. Defaults to `true`
13981409
* `write` {Function} Implementation for the
13991410
[`stream._write()`][stream-_write] method.
14001411
* `writev` {Function} Implementation for the

lib/_stream_duplex.js

-7
Original file line numberDiff line numberDiff line change
@@ -135,10 +135,3 @@ Object.defineProperty(Duplex.prototype, 'destroyed', {
135135
this._writableState.destroyed = value;
136136
}
137137
});
138-
139-
Duplex.prototype._destroy = function(err, cb) {
140-
this.push(null);
141-
this.end();
142-
143-
process.nextTick(cb, err);
144-
};

lib/_stream_readable.js

+5-1
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,9 @@ function ReadableState(options, stream) {
106106
this.readableListening = false;
107107
this.resumeScheduled = false;
108108

109+
// Should close be emitted on destroy. Defaults to true.
110+
this.emitClose = options.emitClose !== false;
111+
109112
// has it been destroyed
110113
this.destroyed = false;
111114

@@ -177,7 +180,6 @@ Object.defineProperty(Readable.prototype, 'destroyed', {
177180
Readable.prototype.destroy = destroyImpl.destroy;
178181
Readable.prototype._undestroy = destroyImpl.undestroy;
179182
Readable.prototype._destroy = function(err, cb) {
180-
this.push(null);
181183
cb(err);
182184
};
183185

@@ -236,6 +238,8 @@ function readableAddChunk(stream, chunk, encoding, addToFront, skipChunkCheck) {
236238
addChunk(stream, state, chunk, true);
237239
} else if (state.ended) {
238240
stream.emit('error', new errors.Error('ERR_STREAM_PUSH_AFTER_EOF'));
241+
} else if (state.destroyed) {
242+
return false;
239243
} else {
240244
state.reading = false;
241245
if (state.decoder && !encoding) {

lib/_stream_transform.js

+1-2
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ function Transform(options) {
132132
}
133133

134134
function prefinish() {
135-
if (typeof this._flush === 'function') {
135+
if (typeof this._flush === 'function' && !this._readableState.destroyed) {
136136
this._flush((er, data) => {
137137
done(this, er, data);
138138
});
@@ -194,7 +194,6 @@ Transform.prototype._read = function(n) {
194194
Transform.prototype._destroy = function(err, cb) {
195195
Duplex.prototype._destroy.call(this, err, (err2) => {
196196
cb(err2);
197-
this.emit('close');
198197
});
199198
};
200199

lib/_stream_writable.js

+7-3
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,9 @@ function WritableState(options, stream) {
134134
// True if the error was already emitted and should not be thrown again
135135
this.errorEmitted = false;
136136

137+
// Should close be emitted on destroy. Defaults to true.
138+
this.emitClose = options.emitClose !== false;
139+
137140
// count buffered requests
138141
this.bufferedRequestCount = 0;
139142

@@ -390,7 +393,9 @@ function doWrite(stream, state, writev, len, chunk, encoding, cb) {
390393
state.writecb = cb;
391394
state.writing = true;
392395
state.sync = true;
393-
if (writev)
396+
if (state.destroyed)
397+
state.onwrite(new errors.Error('ERR_STREAM_DESTROYED', 'write'));
398+
else if (writev)
394399
stream._writev(chunk, state.onwrite);
395400
else
396401
stream._write(chunk, encoding, state.onwrite);
@@ -604,7 +609,7 @@ function callFinal(stream, state) {
604609
}
605610
function prefinish(stream, state) {
606611
if (!state.prefinished && !state.finalCalled) {
607-
if (typeof stream._final === 'function') {
612+
if (typeof stream._final === 'function' && !state.destroyed) {
608613
state.pendingcb++;
609614
state.finalCalled = true;
610615
process.nextTick(callFinal, stream, state);
@@ -681,6 +686,5 @@ Object.defineProperty(Writable.prototype, 'destroyed', {
681686
Writable.prototype.destroy = destroyImpl.destroy;
682687
Writable.prototype._undestroy = destroyImpl.undestroy;
683688
Writable.prototype._destroy = function(err, cb) {
684-
this.end();
685689
cb(err);
686690
};

lib/fs.js

+6
Original file line numberDiff line numberDiff line change
@@ -1929,6 +1929,9 @@ function ReadStream(path, options) {
19291929
if (options.highWaterMark === undefined)
19301930
options.highWaterMark = 64 * 1024;
19311931

1932+
// for backwards compat do not emit close on destroy.
1933+
options.emitClose = false;
1934+
19321935
Readable.call(this, options);
19331936

19341937
// path will be ignored when fd is specified, so it can be falsy
@@ -2084,6 +2087,9 @@ function WriteStream(path, options) {
20842087

20852088
options = copyObject(getOptions(options, {}));
20862089

2090+
// for backwards compat do not emit close on destroy.
2091+
options.emitClose = false;
2092+
20872093
Writable.call(this, options);
20882094

20892095
// path will be ignored when fd is specified, so it can be falsy

lib/internal/errors.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -843,6 +843,7 @@ E('ERR_SOCKET_DGRAM_NOT_RUNNING', 'Not running', Error);
843843
E('ERR_STDERR_CLOSE', 'process.stderr cannot be closed', Error);
844844
E('ERR_STDOUT_CLOSE', 'process.stdout cannot be closed', Error);
845845
E('ERR_STREAM_CANNOT_PIPE', 'Cannot pipe, not readable', Error);
846+
E('ERR_STREAM_DESTROYED', 'Cannot call %s after a stream was destroyed');
846847
E('ERR_STREAM_NULL_VALUES', 'May not write null values to stream', TypeError);
847848
E('ERR_STREAM_PUSH_AFTER_EOF', 'stream.push() after EOF', Error);
848849
E('ERR_STREAM_READ_NOT_IMPLEMENTED', '_read() is not implemented', Error);
@@ -908,7 +909,6 @@ E('ERR_VM_MODULE_NOT_LINKED',
908909
E('ERR_VM_MODULE_NOT_MODULE',
909910
'Provided module is not an instance of Module', Error);
910911
E('ERR_VM_MODULE_STATUS', 'Module status %s', Error);
911-
E('ERR_ZLIB_BINDING_CLOSED', 'zlib binding closed', Error);
912912
E('ERR_ZLIB_INITIALIZATION_FAILED', 'Initialization failed', Error);
913913

914914
function sysError(code, syscall, path, dest,

lib/internal/http2/core.js

+1
Original file line numberDiff line numberDiff line change
@@ -1475,6 +1475,7 @@ class Http2Stream extends Duplex {
14751475
constructor(session, options) {
14761476
options.allowHalfOpen = true;
14771477
options.decodeStrings = false;
1478+
options.emitClose = false;
14781479
super(options);
14791480
this[async_id_symbol] = -1;
14801481

lib/internal/streams/destroy.js

+9
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ function destroy(err, cb) {
3030
}
3131

3232
this._destroy(err || null, (err) => {
33+
process.nextTick(emitCloseNT, this);
3334
if (!cb && err) {
3435
process.nextTick(emitErrorNT, this, err);
3536
if (this._writableState) {
@@ -43,6 +44,14 @@ function destroy(err, cb) {
4344
return this;
4445
}
4546

47+
function emitCloseNT(self) {
48+
if (self._writableState && !self._writableState.emitClose)
49+
return;
50+
if (self._readableState && !self._readableState.emitClose)
51+
return;
52+
self.emit('close');
53+
}
54+
4655
function undestroy() {
4756
if (this._readableState) {
4857
this._readableState.destroyed = false;

lib/net.js

+5
Original file line numberDiff line numberDiff line change
@@ -232,6 +232,11 @@ function Socket(options) {
232232
options = { fd: options }; // Legacy interface.
233233
else if (options === undefined)
234234
options = {};
235+
else
236+
options = util._extend({}, options);
237+
238+
// For backwards compat do not emit close on destroy.
239+
options.emitClose = false;
235240

236241
stream.Duplex.call(this, options);
237242

lib/zlib.js

+2-7
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ const {
2525
ERR_BUFFER_TOO_LARGE,
2626
ERR_INVALID_ARG_TYPE,
2727
ERR_OUT_OF_RANGE,
28-
ERR_ZLIB_BINDING_CLOSED,
2928
ERR_ZLIB_INITIALIZATION_FAILED
3029
} = require('internal/errors').codes;
3130
const Transform = require('_stream_transform');
@@ -392,7 +391,7 @@ Zlib.prototype.flush = function flush(kind, callback) {
392391

393392
Zlib.prototype.close = function close(callback) {
394393
_close(this, callback);
395-
process.nextTick(emitCloseNT, this);
394+
this.destroy();
396395
};
397396

398397
Zlib.prototype._transform = function _transform(chunk, encoding, cb) {
@@ -510,7 +509,7 @@ function processChunkSync(self, chunk, flushFlag) {
510509
function processChunk(self, chunk, flushFlag, cb) {
511510
var handle = self._handle;
512511
if (!handle)
513-
return cb(new ERR_ZLIB_BINDING_CLOSED());
512+
assert(false, 'zlib binding closed');
514513

515514
handle.buffer = chunk;
516515
handle.cb = cb;
@@ -603,10 +602,6 @@ function _close(engine, callback) {
603602
engine._handle = null;
604603
}
605604

606-
function emitCloseNT(self) {
607-
self.emit('close');
608-
}
609-
610605
// generic zlib
611606
// minimal 2-byte header
612607
function Deflate(opts) {

test/parallel/test-net-socket-destroy-send.js

+4-4
Original file line numberDiff line numberDiff line change
@@ -13,14 +13,14 @@ server.listen(0, common.mustCall(function() {
1313
// Test destroy returns this, even on multiple calls when it short-circuits.
1414
assert.strictEqual(conn, conn.destroy().destroy());
1515
conn.on('error', common.expectsError({
16-
code: 'ERR_SOCKET_CLOSED',
17-
message: 'Socket is closed',
16+
code: 'ERR_STREAM_DESTROYED',
17+
message: 'Cannot call write after a stream was destroyed',
1818
type: Error
1919
}));
2020

2121
conn.write(Buffer.from('kaboom'), common.expectsError({
22-
code: 'ERR_SOCKET_CLOSED',
23-
message: 'Socket is closed',
22+
code: 'ERR_STREAM_DESTROYED',
23+
message: 'Cannot call write after a stream was destroyed',
2424
type: Error
2525
}));
2626
server.close();

test/parallel/test-stream-duplex-destroy.js

+8-6
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,9 @@ const { inherits } = require('util');
1313

1414
duplex.resume();
1515

16-
duplex.on('end', common.mustCall());
17-
duplex.on('finish', common.mustCall());
16+
duplex.on('end', common.mustNotCall());
17+
duplex.on('finish', common.mustNotCall());
18+
duplex.on('close', common.mustCall());
1819

1920
duplex.destroy();
2021
assert.strictEqual(duplex.destroyed, true);
@@ -29,8 +30,8 @@ const { inherits } = require('util');
2930

3031
const expected = new Error('kaboom');
3132

32-
duplex.on('end', common.mustCall());
33-
duplex.on('finish', common.mustCall());
33+
duplex.on('end', common.mustNotCall());
34+
duplex.on('finish', common.mustNotCall());
3435
duplex.on('error', common.mustCall((err) => {
3536
assert.strictEqual(err, expected);
3637
}));
@@ -78,6 +79,7 @@ const { inherits } = require('util');
7879

7980
// error is swallowed by the custom _destroy
8081
duplex.on('error', common.mustNotCall('no error event'));
82+
duplex.on('close', common.mustCall());
8183

8284
duplex.destroy(expected);
8385
assert.strictEqual(duplex.destroyed, true);
@@ -159,8 +161,8 @@ const { inherits } = require('util');
159161
});
160162
duplex.resume();
161163

162-
duplex.on('finish', common.mustCall());
163-
duplex.on('end', common.mustCall());
164+
duplex.on('finish', common.mustNotCall());
165+
duplex.on('end', common.mustNotCall());
164166

165167
duplex.destroy();
166168
assert.strictEqual(duplex.destroyed, true);

0 commit comments

Comments
 (0)