Skip to content

Commit 912eb30

Browse files
ronagdanielleadams
authored andcommitted
stream: add setter & getter for default highWaterMark (#46929)
Adds stream.(get|set)DefaultHighWaterMark to read or update the default hwm. PR-URL: #46929 Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Paolo Insogna <paolo@cowtech.it> Reviewed-By: Moshe Atlow <moshe@atlow.co.il> Reviewed-By: Luigi Pinca <luigipinca@gmail.com> Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com> Reviewed-By: Michael Dawson <midawson@redhat.com> Reviewed-By: Erick Wendel <erick.workspace@gmail.com>
1 parent b620f5f commit 912eb30

File tree

5 files changed

+81
-5
lines changed

5 files changed

+81
-5
lines changed

doc/api/stream.md

+23
Original file line numberDiff line numberDiff line change
@@ -3324,6 +3324,29 @@ reader.read().then(({ value, done }) => {
33243324
});
33253325
```
33263326

3327+
### `stream.getDefaultHighWaterMark(objectMode)`
3328+
3329+
<!-- YAML
3330+
added: REPLACEME
3331+
-->
3332+
3333+
* {boolean} objectMode
3334+
* Returns: {integer}
3335+
3336+
Returns the default highWaterMark used by streams.
3337+
Defaults to `16384` (16 KiB), or `16` for `objectMode`.
3338+
3339+
### `stream.setDefaultHighWaterMark(objectMode, value)`
3340+
3341+
<!-- YAML
3342+
added: REPLACEME
3343+
-->
3344+
3345+
* {boolean} objectMode
3346+
* {integer} highWaterMark value
3347+
3348+
Sets the default highWaterMark used by streams.
3349+
33273350
## API for stream implementers
33283351

33293352
<!--type=misc-->

lib/_http_outgoing.js

+4-4
Original file line numberDiff line numberDiff line change
@@ -81,12 +81,11 @@ let debug = require('internal/util/debuglog').debuglog('http', (fn) => {
8181
debug = fn;
8282
});
8383

84-
const HIGH_WATER_MARK = getDefaultHighWaterMark();
85-
8684
const kCorked = Symbol('corked');
8785
const kUniqueHeaders = Symbol('kUniqueHeaders');
8886
const kBytesWritten = Symbol('kBytesWritten');
8987
const kErrored = Symbol('errored');
88+
const kHighWaterMark = Symbol('kHighWaterMark');
9089

9190
const nop = () => {};
9291

@@ -151,6 +150,7 @@ function OutgoingMessage() {
151150
this._onPendingData = nop;
152151

153152
this[kErrored] = null;
153+
this[kHighWaterMark] = getDefaultHighWaterMark();
154154
}
155155
ObjectSetPrototypeOf(OutgoingMessage.prototype, Stream.prototype);
156156
ObjectSetPrototypeOf(OutgoingMessage, Stream);
@@ -197,7 +197,7 @@ ObjectDefineProperty(OutgoingMessage.prototype, 'writableLength', {
197197
ObjectDefineProperty(OutgoingMessage.prototype, 'writableHighWaterMark', {
198198
__proto__: null,
199199
get() {
200-
return this.socket ? this.socket.writableHighWaterMark : HIGH_WATER_MARK;
200+
return this.socket ? this.socket.writableHighWaterMark : this[kHighWaterMark];
201201
},
202202
});
203203

@@ -404,7 +404,7 @@ function _writeRaw(data, encoding, callback, size) {
404404
this.outputData.push({ data, encoding, callback });
405405
this.outputSize += data.length;
406406
this._onPendingData(data.length);
407-
return this.outputSize < HIGH_WATER_MARK;
407+
return this.outputSize < this[kHighWaterMark];
408408
}
409409

410410

lib/internal/streams/state.js

+15-1
Original file line numberDiff line numberDiff line change
@@ -4,16 +4,29 @@ const {
44
MathFloor,
55
NumberIsInteger,
66
} = primordials;
7+
const { validateInteger } = require('internal/validators');
78

89
const { ERR_INVALID_ARG_VALUE } = require('internal/errors').codes;
910

11+
let defaultHighWaterMarkBytes = 16 * 1024;
12+
let defaultHighWaterMarkObjectMode = 16;
13+
1014
function highWaterMarkFrom(options, isDuplex, duplexKey) {
1115
return options.highWaterMark != null ? options.highWaterMark :
1216
isDuplex ? options[duplexKey] : null;
1317
}
1418

1519
function getDefaultHighWaterMark(objectMode) {
16-
return objectMode ? 16 : 16 * 1024;
20+
return objectMode ? defaultHighWaterMarkObjectMode : defaultHighWaterMarkBytes;
21+
}
22+
23+
function setDefaultHighWaterMark(objectMode, value) {
24+
validateInteger(value, 'value', 0);
25+
if (objectMode) {
26+
defaultHighWaterMarkObjectMode = value;
27+
} else {
28+
defaultHighWaterMarkBytes = value;
29+
}
1730
}
1831

1932
function getHighWaterMark(state, options, duplexKey, isDuplex) {
@@ -33,4 +46,5 @@ function getHighWaterMark(state, options, duplexKey, isDuplex) {
3346
module.exports = {
3447
getHighWaterMark,
3548
getDefaultHighWaterMark,
49+
setDefaultHighWaterMark,
3650
};

lib/stream.js

+3
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ const {
4242
},
4343
} = require('internal/errors');
4444
const compose = require('internal/streams/compose');
45+
const { setDefaultHighWaterMark, getDefaultHighWaterMark } = require('internal/streams/state');
4546
const { pipeline } = require('internal/streams/pipeline');
4647
const { destroyer } = require('internal/streams/destroy');
4748
const eos = require('internal/streams/end-of-stream');
@@ -105,6 +106,8 @@ Stream.addAbortSignal = addAbortSignal;
105106
Stream.finished = eos;
106107
Stream.destroy = destroyer;
107108
Stream.compose = compose;
109+
Stream.setDefaultHighWaterMark = setDefaultHighWaterMark;
110+
Stream.getDefaultHighWaterMark = getDefaultHighWaterMark;
108111

109112
ObjectDefineProperty(Stream, 'promises', {
110113
__proto__: null,
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
'use strict';
2+
3+
require('../common');
4+
5+
const assert = require('node:assert');
6+
const {
7+
setDefaultHighWaterMark,
8+
getDefaultHighWaterMark,
9+
Writable,
10+
Readable,
11+
Transform
12+
} = require('stream');
13+
14+
assert.notStrictEqual(getDefaultHighWaterMark(false), 32 * 1000);
15+
setDefaultHighWaterMark(false, 32 * 1000);
16+
assert.strictEqual(getDefaultHighWaterMark(false), 32 * 1000);
17+
18+
assert.notStrictEqual(getDefaultHighWaterMark(true), 32);
19+
setDefaultHighWaterMark(true, 32);
20+
assert.strictEqual(getDefaultHighWaterMark(true), 32);
21+
22+
const w = new Writable({
23+
write() {}
24+
});
25+
assert.strictEqual(w.writableHighWaterMark, 32 * 1000);
26+
27+
const r = new Readable({
28+
read() {}
29+
});
30+
assert.strictEqual(r.readableHighWaterMark, 32 * 1000);
31+
32+
const t = new Transform({
33+
transform() {}
34+
});
35+
assert.strictEqual(t.writableHighWaterMark, 32 * 1000);
36+
assert.strictEqual(t.readableHighWaterMark, 32 * 1000);

0 commit comments

Comments
 (0)