Skip to content

Commit f94fef0

Browse files
committedOct 2, 2023
stream: lazy allocate back pressure buffer
PR-URL: #50013
1 parent 85c09f1 commit f94fef0

File tree

1 file changed

+23
-18
lines changed

1 file changed

+23
-18
lines changed
 

‎lib/internal/streams/writable.js

+23-18
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,7 @@ const kWriteCb = 1 << 26;
108108
const kExpectWriteCb = 1 << 27;
109109
const kAfterWriteTickInfo = 1 << 28;
110110
const kAfterWritePending = 1 << 29;
111+
const kHasBuffer = 1 << 30;
111112

112113
// TODO(benjamingr) it is likely slower to do it this way than with free functions
113114
function makeBitMapDescriptor(bit) {
@@ -337,19 +338,20 @@ function WritableState(options, stream, isDuplex) {
337338
}
338339

339340
function resetBuffer(state) {
340-
state.buffered = [];
341+
state.buffered = null;
341342
state.bufferedIndex = 0;
342343
state.state |= kAllBuffers | kAllNoop;
344+
state.state &= ~kHasBuffer;
343345
}
344346

345347
WritableState.prototype.getBuffer = function getBuffer() {
346-
return ArrayPrototypeSlice(this.buffered, this.bufferedIndex);
348+
return (this.state & kHasBuffer) === 0 ? [] : ArrayPrototypeSlice(this.buffered, this.bufferedIndex);
347349
};
348350

349351
ObjectDefineProperty(WritableState.prototype, 'bufferedRequestCount', {
350352
__proto__: null,
351353
get() {
352-
return this.buffered.length - this.bufferedIndex;
354+
return (this.state & kHasBuffer) === 0 ? 0 : this.buffered.length - this.bufferedIndex;
353355
},
354356
});
355357

@@ -522,6 +524,11 @@ function writeOrBuffer(stream, state, chunk, encoding, callback) {
522524
}
523525

524526
if ((state.state & (kWriting | kErrored | kCorked | kConstructed)) !== kConstructed) {
527+
if ((state.state & kHasBuffer) === 0) {
528+
state.state |= kHasBuffer;
529+
state.buffered = [];
530+
}
531+
525532
state.buffered.push({ chunk, encoding, callback });
526533
if ((state.state & kAllBuffers) !== 0 && encoding !== 'buffer') {
527534
state.state &= ~kAllBuffers;
@@ -607,7 +614,7 @@ function onwrite(stream, er) {
607614
onwriteError(stream, state, er, cb);
608615
}
609616
} else {
610-
if (state.buffered.length > state.bufferedIndex) {
617+
if ((state.state & kHasBuffer) !== 0) {
611618
clearBuffer(stream, state);
612619
}
613620

@@ -677,11 +684,13 @@ function errorBuffer(state) {
677684
return;
678685
}
679686

680-
for (let n = state.bufferedIndex; n < state.buffered.length; ++n) {
681-
const { chunk, callback } = state.buffered[n];
682-
const len = (state.state & kObjectMode) !== 0 ? 1 : chunk.length;
683-
state.length -= len;
684-
callback(state.errored ?? new ERR_STREAM_DESTROYED('write'));
687+
if ((state.state & kHasBuffer) !== 0) {
688+
for (let n = state.bufferedIndex; n < state.buffered.length; ++n) {
689+
const { chunk, callback } = state.buffered[n];
690+
const len = (state.state & kObjectMode) !== 0 ? 1 : chunk.length;
691+
state.length -= len;
692+
callback(state.errored ?? new ERR_STREAM_DESTROYED('write'));
693+
}
685694
}
686695

687696

@@ -692,8 +701,7 @@ function errorBuffer(state) {
692701

693702
// If there's something in the buffer waiting, then process it.
694703
function clearBuffer(stream, state) {
695-
if ((state.state & (kDestroyed | kBufferProcessing | kCorked)) !== 0 ||
696-
(state.state & kConstructed) === 0) {
704+
if ((state.state & (kDestroyed | kBufferProcessing | kCorked | kHasBuffer)) !== kHasBuffer) {
697705
return;
698706
}
699707

@@ -828,10 +836,9 @@ function needFinish(state) {
828836
kWriting |
829837
kErrorEmitted |
830838
kCloseEmitted |
831-
kErrored
832-
)) === (kEnding | kConstructed) &&
833-
state.length === 0 &&
834-
state.buffered.length === 0);
839+
kErrored |
840+
kHasBuffer
841+
)) === (kEnding | kConstructed) && state.length === 0);
835842
}
836843

837844
function callFinal(stream, state) {
@@ -1073,9 +1080,7 @@ Writable.prototype.destroy = function(err, cb) {
10731080
const state = this._writableState;
10741081

10751082
// Invoke pending callbacks.
1076-
if ((state.state & kDestroyed) === 0 &&
1077-
(state.bufferedIndex < state.buffered.length ||
1078-
(state.state & kOnFinished) !== 0)) {
1083+
if ((state.state & (kHasBuffer | kOnFinished | kDestroyed)) !== kDestroyed) {
10791084
process.nextTick(errorBuffer, state);
10801085
}
10811086

0 commit comments

Comments
 (0)