Skip to content

Commit e2f5bb7

Browse files
committed
http: align with stream.Writable
Futher aligns OutgoingMessage with stream.Writable. In particular re-uses the construct/destroy logic from streams. Due to a lot of subtle assumptions this PR unfortunately touches a lot of different parts. PR-URL: #36816 Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com>
1 parent 38f3238 commit e2f5bb7

8 files changed

+245
-232
lines changed

lib/_http_client.js

+32-20
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ const Agent = require('_http_agent');
5858
const { Buffer } = require('buffer');
5959
const { defaultTriggerAsyncIdScope } = require('internal/async_hooks');
6060
const { URL, urlToHttpOptions, searchParamsSymbol } = require('internal/url');
61-
const { kOutHeaders, kNeedDrain } = require('internal/http');
61+
const { kOutHeaders } = require('internal/http');
6262
const { connResetException, codes } = require('internal/errors');
6363
const {
6464
ERR_HTTP_HEADERS_SENT,
@@ -98,7 +98,7 @@ class HTTPClientAsyncResource {
9898
}
9999

100100
function ClientRequest(input, options, cb) {
101-
FunctionPrototypeCall(OutgoingMessage, this);
101+
FunctionPrototypeCall(OutgoingMessage, this, { autoDestroy: false });
102102

103103
if (typeof input === 'string') {
104104
const urlStr = input;
@@ -298,7 +298,7 @@ function ClientRequest(input, options, cb) {
298298
if (typeof options.createConnection === 'function') {
299299
const oncreate = once((err, socket) => {
300300
if (err) {
301-
process.nextTick(() => this.emit('error', err));
301+
process.nextTick(() => emitError(this, err));
302302
} else {
303303
this.onSocket(socket);
304304
}
@@ -366,8 +366,8 @@ function emitAbortNT(req) {
366366

367367
function ondrain() {
368368
const msg = this._httpMessage;
369-
if (msg && !msg.finished && msg[kNeedDrain]) {
370-
msg[kNeedDrain] = false;
369+
if (msg && !msg.finished && msg._writableState.needDrain) {
370+
msg._writableState.needDrain = false;
371371
msg.emit('drain');
372372
}
373373
}
@@ -393,8 +393,7 @@ function socketCloseListener() {
393393
if (!res.complete) {
394394
res.destroy(connResetException('aborted'));
395395
}
396-
req._closed = true;
397-
req.emit('close');
396+
emitClose(req);
398397
if (!res.aborted && res.readable) {
399398
res.push(null);
400399
}
@@ -404,10 +403,9 @@ function socketCloseListener() {
404403
// receive a response. The error needs to
405404
// fire on the request.
406405
req.socket._hadError = true;
407-
req.emit('error', connResetException('socket hang up'));
406+
emitError(req, connResetException('socket hang up'));
408407
}
409-
req._closed = true;
410-
req.emit('close');
408+
emitClose(req);
411409
}
412410

413411
// Too bad. That output wasn't getting written.
@@ -431,7 +429,7 @@ function socketErrorListener(err) {
431429
// For Safety. Some additional errors might fire later on
432430
// and we need to make sure we don't double-fire the error event.
433431
req.socket._hadError = true;
434-
req.emit('error', err);
432+
emitError(req, err);
435433
}
436434

437435
const parser = socket.parser;
@@ -455,7 +453,7 @@ function socketOnEnd() {
455453
// If we don't have a response then we know that the socket
456454
// ended prematurely and we need to emit an error on the request.
457455
req.socket._hadError = true;
458-
req.emit('error', connResetException('socket hang up'));
456+
emitError(req, connResetException('socket hang up'));
459457
}
460458
if (parser) {
461459
parser.finish();
@@ -478,7 +476,7 @@ function socketOnData(d) {
478476
freeParser(parser, req, socket);
479477
socket.destroy();
480478
req.socket._hadError = true;
481-
req.emit('error', ret);
479+
emitError(req, ret);
482480
} else if (parser.incoming && parser.incoming.upgrade) {
483481
// Upgrade (if status code 101) or CONNECT
484482
const bytesParsed = ret;
@@ -510,9 +508,7 @@ function socketOnData(d) {
510508
socket.readableFlowing = null;
511509

512510
req.emit(eventName, res, socket, bodyHead);
513-
req.destroyed = true;
514-
req._closed = true;
515-
req.emit('close');
511+
emitClose(req);
516512
} else {
517513
// Requested Upgrade or used CONNECT method, but have no handler.
518514
socket.destroy();
@@ -697,8 +693,7 @@ function requestOnPrefinish() {
697693
}
698694

699695
function emitFreeNT(req) {
700-
req._closed = true;
701-
req.emit('close');
696+
emitClose(req);
702697
if (req.socket) {
703698
req.socket.emit('free');
704699
}
@@ -779,10 +774,10 @@ function onSocketNT(req, socket, err) {
779774
err = connResetException('socket hang up');
780775
}
781776
if (err) {
782-
req.emit('error', err);
777+
emitError(req, err);
783778
}
784779
req._closed = true;
785-
req.emit('close');
780+
emitClose(req);
786781
}
787782

788783
if (socket) {
@@ -862,6 +857,23 @@ function setSocketTimeout(sock, msecs) {
862857
}
863858
}
864859

860+
function emitError(req, err) {
861+
req.destroyed = true;
862+
req._writableState.errored = err;
863+
// Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364
864+
err.stack; // eslint-disable-line no-unused-expressions
865+
req._writableState.errorEmitted = true;
866+
req.emit('error', err);
867+
}
868+
869+
function emitClose(req) {
870+
req.destroyed = true;
871+
req._closed = true;
872+
req._writableState.closed = true;
873+
req._writableState.closeEmitted = true;
874+
req.emit('close');
875+
}
876+
865877
ClientRequest.prototype.setNoDelay = function setNoDelay(noDelay) {
866878
this._deferToConnect('setNoDelay', [noDelay]);
867879
};

0 commit comments

Comments
 (0)