Skip to content

Commit 980032d

Browse files
committed
stream: avoid unnecessary drain for sync stream
PR-URL: #50014
1 parent 85c09f1 commit 980032d

File tree

2 files changed

+14
-9
lines changed

2 files changed

+14
-9
lines changed

lib/internal/streams/transform.js

+6-1
Original file line numberDiff line numberDiff line change
@@ -182,7 +182,12 @@ Transform.prototype._write = function(chunk, encoding, callback) {
182182
this.push(val);
183183
}
184184

185-
if (
185+
if (rState.ended) {
186+
// If user has called this.push(null) we have to
187+
// delay the callback to properly progate the new
188+
// state.
189+
process.nextTick(callback);
190+
} else if (
186191
wState.ended || // Backwards compat.
187192
length === rState.length || // Backwards compat.
188193
rState.length < rState.highWaterMark

lib/internal/streams/writable.js

+8-8
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,7 @@ const kWriteCb = 1 << 26;
108108
const kExpectWriteCb = 1 << 27;
109109
const kAfterWriteTickInfo = 1 << 28;
110110
const kAfterWritePending = 1 << 29;
111+
const kIsDuplex = 1 << 30;
111112

112113
// TODO(benjamingr) it is likely slower to do it this way than with free functions
113114
function makeBitMapDescriptor(bit) {
@@ -286,6 +287,7 @@ function WritableState(options, stream, isDuplex) {
286287

287288
if (options && options.objectMode) this.state |= kObjectMode;
288289
if (isDuplex && options && options.writableObjectMode) this.state |= kObjectMode;
290+
if (isDuplex) this.state |= kIsDuplex;
289291

290292
// The point at which write() starts returning false
291293
// Note: 0 is a valid value, means that we always return false if
@@ -513,14 +515,6 @@ function writeOrBuffer(stream, state, chunk, encoding, callback) {
513515

514516
state.length += len;
515517

516-
// stream._write resets state.length
517-
const ret = state.length < state.highWaterMark;
518-
519-
// We must ensure that previous needDrain will not be reset to false.
520-
if (!ret) {
521-
state.state |= kNeedDrain;
522-
}
523-
524518
if ((state.state & (kWriting | kErrored | kCorked | kConstructed)) !== kConstructed) {
525519
state.buffered.push({ chunk, encoding, callback });
526520
if ((state.state & kAllBuffers) !== 0 && encoding !== 'buffer') {
@@ -539,6 +533,12 @@ function writeOrBuffer(stream, state, chunk, encoding, callback) {
539533
state.state &= ~kSync;
540534
}
541535

536+
const ret = state.length < state.highWaterMark;
537+
538+
if (!ret) {
539+
state.state |= kNeedDrain;
540+
}
541+
542542
// Return false if errored or destroyed in order to break
543543
// any synchronous while(stream.write(data)) loops.
544544
return ret && (state.state & (kDestroyed | kErrored)) === 0;

0 commit comments

Comments
 (0)