Skip to content

Commit ab94e41

Browse files
committed
stream: optimize creation
Refs: nodejs/performance#79 PR-URL: #50337
1 parent 25576b5 commit ab94e41

File tree

4 files changed

+83
-57
lines changed

4 files changed

+83
-57
lines changed

lib/internal/streams/duplex.js

+40-2
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,17 @@ const {
3535

3636
module.exports = Duplex;
3737

38+
const Stream = require('internal/streams/legacy').Stream;
3839
const Readable = require('internal/streams/readable');
3940
const Writable = require('internal/streams/writable');
4041

42+
const {
43+
addAbortSignal,
44+
} = require('internal/streams/add-abort-signal');
45+
46+
const destroyImpl = require('internal/streams/destroy');
47+
const { kOnConstructed } = require('internal/streams/utils');
48+
4149
ObjectSetPrototypeOf(Duplex.prototype, Readable.prototype);
4250
ObjectSetPrototypeOf(Duplex, Readable);
4351

@@ -55,8 +63,8 @@ function Duplex(options) {
5563
if (!(this instanceof Duplex))
5664
return new Duplex(options);
5765

58-
Readable.call(this, options);
59-
Writable.call(this, options);
66+
this._readableState = new Readable.ReadableState(options, this, true);
67+
this._writableState = new Writable.WritableState(options, this, true);
6068

6169
if (options) {
6270
this.allowHalfOpen = options.allowHalfOpen !== false;
@@ -73,9 +81,39 @@ function Duplex(options) {
7381
this._writableState.ended = true;
7482
this._writableState.finished = true;
7583
}
84+
85+
if (typeof options.read === 'function')
86+
this._read = options.read;
87+
88+
if (typeof options.write === 'function')
89+
this._write = options.write;
90+
91+
if (typeof options.writev === 'function')
92+
this._writev = options.writev;
93+
94+
if (typeof options.destroy === 'function')
95+
this._destroy = options.destroy;
96+
97+
if (typeof options.final === 'function')
98+
this._final = options.final;
99+
100+
if (typeof options.construct === 'function')
101+
this._construct = options.construct;
102+
103+
if (options.signal)
104+
addAbortSignal(options.signal, this);
76105
} else {
77106
this.allowHalfOpen = true;
78107
}
108+
109+
Stream.call(this, options);
110+
111+
if (this._construct != null) {
112+
destroyImpl.construct(this, () => {
113+
this._readableState[kOnConstructed](this);
114+
this._writableState[kOnConstructed](this);
115+
});
116+
}
79117
}
80118

81119
ObjectDefineProperties(Duplex.prototype, {

lib/internal/streams/readable.js

+16-20
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ const {
7171
AbortError,
7272
} = require('internal/errors');
7373
const { validateObject } = require('internal/validators');
74+
const { kOnConstructed } = require('internal/streams/utils');
7475

7576
const kState = Symbol('kState');
7677

@@ -251,20 +252,14 @@ ObjectDefineProperties(ReadableState.prototype, {
251252

252253

253254
function ReadableState(options, stream, isDuplex) {
254-
// Duplex streams are both readable and writable, but share
255-
// the same options object.
256-
// However, some cases require setting options to different
257-
// values for the readable and the writable sides of the duplex stream.
258-
// These options can be provided separately as readableXXX and writableXXX.
259-
if (typeof isDuplex !== 'boolean')
260-
isDuplex = stream instanceof Stream.Duplex;
261-
262255
// Bit map field to store ReadableState more effciently with 1 bit per field
263256
// instead of a V8 slot per field.
264257
this[kState] = kEmitClose | kAutoDestroy | kConstructed | kSync;
258+
265259
// Object stream flag. Used to make read(n) ignore n and to
266260
// make all the buffer merging and length checks go away.
267-
if (options && options.objectMode) this[kState] |= kObjectMode;
261+
if (options && options.objectMode)
262+
this[kState] |= kObjectMode;
268263

269264
if (isDuplex && options && options.readableObjectMode)
270265
this[kState] |= kObjectMode;
@@ -310,16 +305,17 @@ function ReadableState(options, stream, isDuplex) {
310305
}
311306
}
312307

308+
ReadableState.prototype[kOnConstructed] = function onConstructed(stream) {
309+
if ((this[kState] & kNeedReadable) !== 0) {
310+
maybeReadMore(stream, this);
311+
}
312+
};
313313

314314
function Readable(options) {
315315
if (!(this instanceof Readable))
316316
return new Readable(options);
317317

318-
// Checking for a Stream.Duplex instance is faster here instead of inside
319-
// the ReadableState constructor, at least with V8 6.5.
320-
const isDuplex = this instanceof Stream.Duplex;
321-
322-
this._readableState = new ReadableState(options, this, isDuplex);
318+
this._readableState = new ReadableState(options, this, false);
323319

324320
if (options) {
325321
if (typeof options.read === 'function')
@@ -331,17 +327,17 @@ function Readable(options) {
331327
if (typeof options.construct === 'function')
332328
this._construct = options.construct;
333329

334-
if (options.signal && !isDuplex)
330+
if (options.signal)
335331
addAbortSignal(options.signal, this);
336332
}
337333

338334
Stream.call(this, options);
339335

340-
destroyImpl.construct(this, () => {
341-
if (this._readableState.needReadable) {
342-
maybeReadMore(this, this._readableState);
343-
}
344-
});
336+
if (this._construct != null) {
337+
destroyImpl.construct(this, () => {
338+
this._readableState[kOnConstructed](this);
339+
});
340+
}
345341
}
346342

347343
Readable.prototype.destroy = destroyImpl.destroy;

lib/internal/streams/utils.js

+4
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ const {
44
SymbolAsyncIterator,
55
SymbolIterator,
66
SymbolFor,
7+
Symbol,
78
} = primordials;
89

910
// We need to use SymbolFor to make these globally available
@@ -16,6 +17,8 @@ const kIsReadable = SymbolFor('nodejs.stream.readable');
1617
const kIsWritable = SymbolFor('nodejs.stream.writable');
1718
const kIsDisturbed = SymbolFor('nodejs.stream.disturbed');
1819

20+
const kOnConstructed = Symbol('kOnConstructed');
21+
1922
const kIsClosedPromise = SymbolFor('nodejs.webstream.isClosedPromise');
2023
const kControllerErrorFunction = SymbolFor('nodejs.webstream.controllerErrorFunction');
2124

@@ -303,6 +306,7 @@ function isErrored(stream) {
303306
}
304307

305308
module.exports = {
309+
kOnConstructed,
306310
isDestroyed,
307311
kIsDestroyed,
308312
isDisturbed,

lib/internal/streams/writable.js

+23-35
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ const EE = require('events');
4444
const Stream = require('internal/streams/legacy').Stream;
4545
const { Buffer } = require('buffer');
4646
const destroyImpl = require('internal/streams/destroy');
47+
const { kOnConstructed } = require('internal/streams/utils');
4748

4849
const {
4950
addAbortSignal,
@@ -290,20 +291,15 @@ ObjectDefineProperties(WritableState.prototype, {
290291
});
291292

292293
function WritableState(options, stream, isDuplex) {
293-
// Duplex streams are both readable and writable, but share
294-
// the same options object.
295-
// However, some cases require setting options to different
296-
// values for the readable and the writable sides of the duplex stream,
297-
// e.g. options.readableObjectMode vs. options.writableObjectMode, etc.
298-
if (typeof isDuplex !== 'boolean')
299-
isDuplex = stream instanceof Stream.Duplex;
300-
301294
// Bit map field to store WritableState more effciently with 1 bit per field
302295
// instead of a V8 slot per field.
303296
this[kState] = kSync | kConstructed | kEmitClose | kAutoDestroy;
304297

305-
if (options && options.objectMode) this[kState] |= kObjectMode;
306-
if (isDuplex && options && options.writableObjectMode) this[kState] |= kObjectMode;
298+
if (options && options.objectMode)
299+
this[kState] |= kObjectMode;
300+
301+
if (isDuplex && options && options.writableObjectMode)
302+
this[kState] |= kObjectMode;
307303

308304
// The point at which write() starts returning false
309305
// Note: 0 is a valid value, means that we always return false if
@@ -323,7 +319,7 @@ function WritableState(options, stream, isDuplex) {
323319
// Crypto is kind of old and crusty. Historically, its default string
324320
// encoding is 'binary' so we have to make this configurable.
325321
// Everything else in the universe uses 'utf8', though.
326-
const defaultEncoding = options?.defaultEncoding;
322+
const defaultEncoding = options ? options.defaultEncoding : null;
327323
if (defaultEncoding == null || defaultEncoding === 'utf8' || defaultEncoding === 'utf-8') {
328324
this[kState] |= kDefaultUTF8Encoding;
329325
} else if (Buffer.isEncoding(defaultEncoding)) {
@@ -372,23 +368,21 @@ ObjectDefineProperty(WritableState.prototype, 'bufferedRequestCount', {
372368
},
373369
});
374370

375-
function Writable(options) {
376-
// Writable ctor is applied to Duplexes, too.
377-
// `realHasInstance` is necessary because using plain `instanceof`
378-
// would return false, as no `_writableState` property is attached.
379-
380-
// Trying to use the custom `instanceof` for Writable here will also break the
381-
// Node.js LazyTransform implementation, which has a non-trivial getter for
382-
// `_writableState` that would lead to infinite recursion.
371+
WritableState.prototype[kOnConstructed] = function onConstructed(stream) {
372+
if ((this[kState] & kWriting) === 0) {
373+
clearBuffer(stream, this);
374+
}
383375

384-
// Checking for a Stream.Duplex instance is faster here instead of inside
385-
// the WritableState constructor, at least with V8 6.5.
386-
const isDuplex = (this instanceof Stream.Duplex);
376+
if ((this[kState] & kEnding) !== 0) {
377+
finishMaybe(stream, this);
378+
}
379+
};
387380

388-
if (!isDuplex && !FunctionPrototypeSymbolHasInstance(Writable, this))
381+
function Writable(options) {
382+
if (!(this instanceof Writable))
389383
return new Writable(options);
390384

391-
this._writableState = new WritableState(options, this, isDuplex);
385+
this._writableState = new WritableState(options, this, false);
392386

393387
if (options) {
394388
if (typeof options.write === 'function')
@@ -412,17 +406,11 @@ function Writable(options) {
412406

413407
Stream.call(this, options);
414408

415-
destroyImpl.construct(this, () => {
416-
const state = this._writableState;
417-
418-
if ((state[kState] & kWriting) === 0) {
419-
clearBuffer(this, state);
420-
}
421-
422-
if ((state[kState] & kEnding) !== 0) {
423-
finishMaybe(this, state);
424-
}
425-
});
409+
if (this._construct != null) {
410+
destroyImpl.construct(this, () => {
411+
this._writableState[kOnConstructed](this);
412+
});
413+
}
426414
}
427415

428416
ObjectDefineProperty(Writable, SymbolHasInstance, {

0 commit comments

Comments
 (0)