Skip to content

Commit 2bb4ac4

Browse files
ronagBethGriggs
authored andcommitted
stream: avoid drain for sync streams
Previously a sync writable receiving chunks larger than highwatermark would unecessarily ping pong needDrain. PR-URL: #32887 Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: James M Snell <jasnell@gmail.com>
1 parent 44c157e commit 2bb4ac4

8 files changed

+25
-17
lines changed

benchmark/streams/writable-manywrites.js

+4-3
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,12 @@ const bench = common.createBenchmark(main, {
77
n: [2e6],
88
sync: ['yes', 'no'],
99
writev: ['yes', 'no'],
10-
callback: ['yes', 'no']
10+
callback: ['yes', 'no'],
11+
len: [1024, 32 * 1024]
1112
});
1213

13-
function main({ n, sync, writev, callback }) {
14-
const b = Buffer.allocUnsafe(1024);
14+
function main({ n, sync, writev, callback, len }) {
15+
const b = Buffer.allocUnsafe(len);
1516
const s = new Writable();
1617
sync = sync === 'yes';
1718

lib/_stream_writable.js

+6-5
Original file line numberDiff line numberDiff line change
@@ -337,11 +337,6 @@ function writeOrBuffer(stream, state, chunk, encoding, callback) {
337337

338338
state.length += len;
339339

340-
const ret = state.length < state.highWaterMark;
341-
// We must ensure that previous needDrain will not be reset to false.
342-
if (!ret)
343-
state.needDrain = true;
344-
345340
if (state.writing || state.corked || state.errored) {
346341
state.buffered.push({ chunk, encoding, callback });
347342
if (state.allBuffers && encoding !== 'buffer') {
@@ -359,6 +354,12 @@ function writeOrBuffer(stream, state, chunk, encoding, callback) {
359354
state.sync = false;
360355
}
361356

357+
const ret = state.length < state.highWaterMark;
358+
359+
// We must ensure that previous needDrain will not be reset to false.
360+
if (!ret)
361+
state.needDrain = true;
362+
362363
// Return false if errored or destroyed in order to break
363364
// any synchronous while(stream.write(data)) loops.
364365
return ret && !state.errored && !state.destroyed;

test/parallel/test-stream-big-packet.js

+5-1
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,11 @@ class TestStream extends stream.Transform {
3636
}
3737
}
3838

39-
const s1 = new stream.PassThrough();
39+
const s1 = new stream.Transform({
40+
transform(chunk, encoding, cb) {
41+
process.nextTick(cb, null, chunk);
42+
}
43+
});
4044
const s2 = new stream.PassThrough();
4145
const s3 = new TestStream();
4246
s1.pipe(s3);

test/parallel/test-stream-catch-rejections.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ const assert = require('assert');
3030
captureRejections: true,
3131
highWaterMark: 1,
3232
write(chunk, enc, cb) {
33-
cb();
33+
process.nextTick(cb);
3434
}
3535
});
3636

test/parallel/test-stream-pipe-await-drain-push-while-write.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ const writable = new stream.Writable({
1919
});
2020
}
2121

22-
cb();
22+
process.nextTick(cb);
2323
}, 3)
2424
});
2525

test/parallel/test-stream-pipe-await-drain.js

+3-3
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ reader._read = () => {};
1919

2020
writer1._write = common.mustCall(function(chunk, encoding, cb) {
2121
this.emit('chunk-received');
22-
cb();
22+
process.nextTick(cb);
2323
}, 1);
2424

2525
writer1.once('chunk-received', () => {
@@ -42,7 +42,7 @@ writer2._write = common.mustCall((chunk, encoding, cb) => {
4242
reader._readableState.awaitDrainWriters.size,
4343
1,
4444
'awaitDrain should be 1 after first push, actual is ' +
45-
reader._readableState.awaitDrainWriters
45+
reader._readableState.awaitDrainWriters.size
4646
);
4747
// Not calling cb here to "simulate" slow stream.
4848
// This should be called exactly once, since the first .write() call
@@ -54,7 +54,7 @@ writer3._write = common.mustCall((chunk, encoding, cb) => {
5454
reader._readableState.awaitDrainWriters.size,
5555
2,
5656
'awaitDrain should be 2 after second push, actual is ' +
57-
reader._readableState.awaitDrainWriters
57+
reader._readableState.awaitDrainWriters.size
5858
);
5959
// Not calling cb here to "simulate" slow stream.
6060
// This should be called exactly once, since the first .write() call

test/parallel/test-stream-writable-needdrain-state.js

+4-2
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,10 @@ const transform = new stream.Transform({
1010
});
1111

1212
function _transform(chunk, encoding, cb) {
13-
assert.strictEqual(transform._writableState.needDrain, true);
14-
cb();
13+
process.nextTick(() => {
14+
assert.strictEqual(transform._writableState.needDrain, true);
15+
cb();
16+
});
1517
}
1618

1719
assert.strictEqual(transform._writableState.needDrain, false);

test/parallel/test-stream2-finish-pipe.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ r._read = function(size) {
3030

3131
const w = new stream.Writable();
3232
w._write = function(data, encoding, cb) {
33-
cb(null);
33+
process.nextTick(cb, null);
3434
};
3535

3636
r.pipe(w);

0 commit comments

Comments
 (0)