Skip to content

Commit 359a659

Browse files
ronagdanielleadams
authored andcommitted
stream: writableNeedDrain
Don't write to a stream which already has a full buffer. Fixes: #35341 PR-URL: #35348 Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Luigi Pinca <luigipinca@gmail.com> Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com>
1 parent 0d74226 commit 359a659

File tree

7 files changed

+63
-1
lines changed

7 files changed

+63
-1
lines changed

doc/api/stream.md

+9
Original file line numberDiff line numberDiff line change
@@ -580,6 +580,15 @@ This property contains the number of bytes (or objects) in the queue
580580
ready to be written. The value provides introspection data regarding
581581
the status of the `highWaterMark`.
582582

583+
##### `writable.writableNeedDrain`
584+
<!-- YAML
585+
added: REPLACEME
586+
-->
587+
588+
* {boolean}
589+
590+
Is `true` if the stream's buffer has been full and stream will emit `'drain'`.
591+
583592
##### `writable.writableObjectMode`
584593
<!-- YAML
585594
added: v12.3.0

lib/_http_outgoing.js

+5
Original file line numberDiff line numberDiff line change
@@ -660,6 +660,11 @@ ObjectDefineProperty(OutgoingMessage.prototype, 'writableEnded', {
660660
get: function() { return this.finished; }
661661
});
662662

663+
ObjectDefineProperty(OutgoingMessage.prototype, 'writableNeedDrain', {
664+
get: function() {
665+
return !this.destroyed && !this.finished && this[kNeedDrain];
666+
}
667+
});
663668

664669
const crlf_buf = Buffer.from('\r\n');
665670
OutgoingMessage.prototype.write = function write(chunk, encoding, callback) {

lib/internal/streams/duplex.js

+2
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,8 @@ ObjectDefineProperties(Duplex.prototype, {
8787
ObjectGetOwnPropertyDescriptor(Writable.prototype, 'writableCorked'),
8888
writableEnded:
8989
ObjectGetOwnPropertyDescriptor(Writable.prototype, 'writableEnded'),
90+
writableNeedDrain:
91+
ObjectGetOwnPropertyDescriptor(Writable.prototype, 'writableNeedDrain'),
9092

9193
destroyed: {
9294
get() {

lib/internal/streams/pipeline.js

+4
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,10 @@ async function pump(iterable, writable, finish) {
123123
}
124124
let error;
125125
try {
126+
if (writable.writableNeedDrain === true) {
127+
await EE.once(writable, 'drain');
128+
}
129+
126130
for await (const chunk of iterable) {
127131
if (!writable.write(chunk)) {
128132
if (writable.destroyed) return;

lib/internal/streams/readable.js

+6-1
Original file line numberDiff line numberDiff line change
@@ -783,7 +783,12 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
783783
dest.emit('pipe', src);
784784

785785
// Start the flow if it hasn't been started already.
786-
if (!state.flowing) {
786+
787+
if (dest.writableNeedDrain === true) {
788+
if (state.flowing) {
789+
src.pause();
790+
}
791+
} else if (!state.flowing) {
787792
debug('pipe resume');
788793
src.resume();
789794
}

lib/internal/streams/writable.js

+8
Original file line numberDiff line numberDiff line change
@@ -805,6 +805,14 @@ ObjectDefineProperties(Writable.prototype, {
805805
}
806806
},
807807

808+
writableNeedDrain: {
809+
get() {
810+
const wState = this._writableState;
811+
if (!wState) return false;
812+
return !wState.destroyed && !wState.ending && wState.needDrain;
813+
}
814+
},
815+
808816
writableHighWaterMark: {
809817
get() {
810818
return this._writableState && this._writableState.highWaterMark;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
'use strict';
2+
3+
const common = require('../common');
4+
const assert = require('assert');
5+
const Readable = require('_stream_readable');
6+
const Writable = require('_stream_writable');
7+
8+
// Pipe should not continue writing if writable needs drain.
9+
{
10+
const w = new Writable({
11+
write(buf, encoding, callback) {
12+
13+
}
14+
});
15+
16+
while (w.write('asd'));
17+
18+
assert.strictEqual(w.writableNeedDrain, true);
19+
20+
const r = new Readable({
21+
read() {
22+
this.push('asd');
23+
}
24+
});
25+
26+
w.write = common.mustNotCall();
27+
28+
r.pipe(w);
29+
}

0 commit comments

Comments
 (0)