Skip to content

Commit d68d0ea

Browse files
authored
http: reduce parts in chunked response when corking
Refs: nodejs/performance#57 PR-URL: #50167 Reviewed-By: Stephen Belanger <admin@stephenbelanger.com> Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Yagiz Nizipli <yagiz@nizipli.com>
1 parent 8609915 commit d68d0ea

File tree

1 file changed

+87
-41
lines changed

1 file changed

+87
-41
lines changed

lib/_http_outgoing.js

+87-41
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,9 @@ let debug = require('internal/util/debuglog').debuglog('http', (fn) => {
8282
});
8383

8484
const kCorked = Symbol('corked');
85+
const kSocket = Symbol('kSocket');
86+
const kChunkedBuffer = Symbol('kChunkedBuffer');
87+
const kChunkedLength = Symbol('kChunkedLength');
8588
const kUniqueHeaders = Symbol('kUniqueHeaders');
8689
const kBytesWritten = Symbol('kBytesWritten');
8790
const kErrored = Symbol('errored');
@@ -140,9 +143,11 @@ function OutgoingMessage(options) {
140143
this.finished = false;
141144
this._headerSent = false;
142145
this[kCorked] = 0;
146+
this[kChunkedBuffer] = [];
147+
this[kChunkedLength] = 0;
143148
this._closed = false;
144149

145-
this.socket = null;
150+
this[kSocket] = null;
146151
this._header = null;
147152
this[kOutHeaders] = null;
148153

@@ -177,7 +182,7 @@ ObjectDefineProperty(OutgoingMessage.prototype, 'writableFinished', {
177182
return (
178183
this.finished &&
179184
this.outputSize === 0 &&
180-
(!this.socket || this.socket.writableLength === 0)
185+
(!this[kSocket] || this[kSocket].writableLength === 0)
181186
);
182187
},
183188
});
@@ -192,22 +197,21 @@ ObjectDefineProperty(OutgoingMessage.prototype, 'writableObjectMode', {
192197
ObjectDefineProperty(OutgoingMessage.prototype, 'writableLength', {
193198
__proto__: null,
194199
get() {
195-
return this.outputSize + (this.socket ? this.socket.writableLength : 0);
200+
return this.outputSize + this[kChunkedLength] + (this[kSocket] ? this[kSocket].writableLength : 0);
196201
},
197202
});
198203

199204
ObjectDefineProperty(OutgoingMessage.prototype, 'writableHighWaterMark', {
200205
__proto__: null,
201206
get() {
202-
return this.socket ? this.socket.writableHighWaterMark : this[kHighWaterMark];
207+
return this[kSocket] ? this[kSocket].writableHighWaterMark : this[kHighWaterMark];
203208
},
204209
});
205210

206211
ObjectDefineProperty(OutgoingMessage.prototype, 'writableCorked', {
207212
__proto__: null,
208213
get() {
209-
const corked = this.socket ? this.socket.writableCorked : 0;
210-
return corked + this[kCorked];
214+
return this[kCorked];
211215
},
212216
});
213217

@@ -235,13 +239,27 @@ ObjectDefineProperty(OutgoingMessage.prototype, '_headers', {
235239
ObjectDefineProperty(OutgoingMessage.prototype, 'connection', {
236240
__proto__: null,
237241
get: function() {
238-
return this.socket;
242+
return this[kSocket];
239243
},
240244
set: function(val) {
241245
this.socket = val;
242246
},
243247
});
244248

249+
ObjectDefineProperty(OutgoingMessage.prototype, 'socket', {
250+
__proto__: null,
251+
get: function() {
252+
return this[kSocket];
253+
},
254+
set: function(val) {
255+
for (let n = 0; n < this[kCorked]; n++) {
256+
val?.cork();
257+
this[kSocket]?.uncork();
258+
}
259+
this[kSocket] = val;
260+
},
261+
});
262+
245263
ObjectDefineProperty(OutgoingMessage.prototype, '_headerNames', {
246264
__proto__: null,
247265
get: internalUtil.deprecate(function() {
@@ -299,19 +317,45 @@ OutgoingMessage.prototype._renderHeaders = function _renderHeaders() {
299317
};
300318

301319
OutgoingMessage.prototype.cork = function() {
302-
if (this.socket) {
303-
this.socket.cork();
304-
} else {
305-
this[kCorked]++;
320+
this[kCorked]++;
321+
if (this[kSocket]) {
322+
this[kSocket].cork();
306323
}
307324
};
308325

309326
OutgoingMessage.prototype.uncork = function() {
310-
if (this.socket) {
311-
this.socket.uncork();
312-
} else if (this[kCorked]) {
313-
this[kCorked]--;
327+
this[kCorked]--;
328+
if (this[kSocket]) {
329+
this[kSocket].uncork();
330+
}
331+
332+
if (this[kCorked] || this[kChunkedBuffer].length === 0) {
333+
return;
314334
}
335+
336+
const len = this[kChunkedLength];
337+
const buf = this[kChunkedBuffer];
338+
339+
assert(this.chunkedEncoding);
340+
341+
let callbacks;
342+
this._send(NumberPrototypeToString(len, 16), 'latin1', null);
343+
this._send(crlf_buf, null, null);
344+
for (let n = 0; n < buf.length; n += 3) {
345+
this._send(buf[n + 0], buf[n + 1], null);
346+
if (buf[n + 2]) {
347+
callbacks ??= [];
348+
callbacks.push(buf[n + 2]);
349+
}
350+
}
351+
this._send(crlf_buf, null, callbacks.length ? (err) => {
352+
for (const callback of callbacks) {
353+
callback(err);
354+
}
355+
} : null);
356+
357+
this[kChunkedBuffer].length = 0;
358+
this[kChunkedLength] = 0;
315359
};
316360

317361
OutgoingMessage.prototype.setTimeout = function setTimeout(msecs, callback) {
@@ -320,12 +364,12 @@ OutgoingMessage.prototype.setTimeout = function setTimeout(msecs, callback) {
320364
this.on('timeout', callback);
321365
}
322366

323-
if (!this.socket) {
367+
if (!this[kSocket]) {
324368
this.once('socket', function socketSetTimeoutOnConnect(socket) {
325369
socket.setTimeout(msecs);
326370
});
327371
} else {
328-
this.socket.setTimeout(msecs);
372+
this[kSocket].setTimeout(msecs);
329373
}
330374
return this;
331375
};
@@ -342,8 +386,8 @@ OutgoingMessage.prototype.destroy = function destroy(error) {
342386

343387
this[kErrored] = error;
344388

345-
if (this.socket) {
346-
this.socket.destroy(error);
389+
if (this[kSocket]) {
390+
this[kSocket].destroy(error);
347391
} else {
348392
this.once('socket', function socketDestroyOnConnect(socket) {
349393
socket.destroy(error);
@@ -382,7 +426,7 @@ OutgoingMessage.prototype._send = function _send(data, encoding, callback, byteL
382426

383427
OutgoingMessage.prototype._writeRaw = _writeRaw;
384428
function _writeRaw(data, encoding, callback, size) {
385-
const conn = this.socket;
429+
const conn = this[kSocket];
386430
if (conn && conn.destroyed) {
387431
// The socket was destroyed. If we're still trying to write to it,
388432
// then we haven't gotten the 'close' event yet.
@@ -938,10 +982,16 @@ function write_(msg, chunk, encoding, callback, fromEnd) {
938982
let ret;
939983
if (msg.chunkedEncoding && chunk.length !== 0) {
940984
len ??= typeof chunk === 'string' ? Buffer.byteLength(chunk, encoding) : chunk.byteLength;
941-
msg._send(NumberPrototypeToString(len, 16), 'latin1', null);
942-
msg._send(crlf_buf, null, null);
943-
msg._send(chunk, encoding, null, len);
944-
ret = msg._send(crlf_buf, null, callback);
985+
if (msg[kCorked] && msg._headerSent) {
986+
msg[kChunkedBuffer].push(chunk, encoding, callback);
987+
msg[kChunkedLength] += len;
988+
ret = msg[kChunkedLength] < msg[kHighWaterMark];
989+
} else {
990+
msg._send(NumberPrototypeToString(len, 16), 'latin1', null);
991+
msg._send(crlf_buf, null, null);
992+
msg._send(chunk, encoding, null, len);
993+
ret = msg._send(crlf_buf, null, callback);
994+
}
945995
} else {
946996
ret = msg._send(chunk, encoding, callback, len);
947997
}
@@ -1023,8 +1073,8 @@ OutgoingMessage.prototype.end = function end(chunk, encoding, callback) {
10231073
return this;
10241074
}
10251075

1026-
if (this.socket) {
1027-
this.socket.cork();
1076+
if (this[kSocket]) {
1077+
this[kSocket].cork();
10281078
}
10291079

10301080
write_(this, chunk, encoding, null, true);
@@ -1038,8 +1088,8 @@ OutgoingMessage.prototype.end = function end(chunk, encoding, callback) {
10381088
}
10391089
return this;
10401090
} else if (!this._header) {
1041-
if (this.socket) {
1042-
this.socket.cork();
1091+
if (this[kSocket]) {
1092+
this[kSocket].cork();
10431093
}
10441094

10451095
this._contentLength = 0;
@@ -1063,21 +1113,22 @@ OutgoingMessage.prototype.end = function end(chunk, encoding, callback) {
10631113
process.nextTick(finish);
10641114
}
10651115

1066-
if (this.socket) {
1116+
if (this[kSocket]) {
10671117
// Fully uncork connection on end().
1068-
this.socket._writableState.corked = 1;
1069-
this.socket.uncork();
1118+
this[kSocket]._writableState.corked = 1;
1119+
this[kSocket].uncork();
10701120
}
1071-
this[kCorked] = 0;
1121+
this[kCorked] = 1;
1122+
this.uncork();
10721123

10731124
this.finished = true;
10741125

10751126
// There is the first message on the outgoing queue, and we've sent
10761127
// everything to the socket.
10771128
debug('outgoing message end.');
10781129
if (this.outputData.length === 0 &&
1079-
this.socket &&
1080-
this.socket._httpMessage === this) {
1130+
this[kSocket] &&
1131+
this[kSocket]._httpMessage === this) {
10811132
this._finish();
10821133
}
10831134

@@ -1088,7 +1139,7 @@ OutgoingMessage.prototype.end = function end(chunk, encoding, callback) {
10881139
// This function is called once all user data are flushed to the socket.
10891140
// Note that it has a chance that the socket is not drained.
10901141
OutgoingMessage.prototype._finish = function _finish() {
1091-
assert(this.socket);
1142+
assert(this[kSocket]);
10921143
this.emit('prefinish');
10931144
};
10941145

@@ -1113,7 +1164,7 @@ OutgoingMessage.prototype._finish = function _finish() {
11131164
// This function, _flush(), is called by both the Server and Client
11141165
// to attempt to flush any pending messages out to the socket.
11151166
OutgoingMessage.prototype._flush = function _flush() {
1116-
const socket = this.socket;
1167+
const socket = this[kSocket];
11171168

11181169
if (socket && socket.writable) {
11191170
// There might be remaining data in this.output; write it out
@@ -1130,11 +1181,6 @@ OutgoingMessage.prototype._flush = function _flush() {
11301181
};
11311182

11321183
OutgoingMessage.prototype._flushOutput = function _flushOutput(socket) {
1133-
while (this[kCorked]) {
1134-
this[kCorked]--;
1135-
socket.cork();
1136-
}
1137-
11381184
const outputLength = this.outputData.length;
11391185
if (outputLength <= 0)
11401186
return undefined;

0 commit comments

Comments
 (0)