Skip to content

Commit 57c1129

Browse files
committed
quic: implement QuicSession close as promise
PR-URL: #34283 Reviewed-By: Anna Henningsen <anna@addaleax.net>
1 parent 8e5c5b1 commit 57c1129

File tree

2 files changed

+80
-80
lines changed

2 files changed

+80
-80
lines changed

doc/api/quic.md

+4-5
Original file line numberDiff line numberDiff line change
@@ -352,7 +352,7 @@ Binds the `QuicEndpoint` if it has not already been bound. User code will
352352
not typically be responsible for binding a `QuicEndpoint` as the owning
353353
`QuicSocket` will do that automatically.
354354

355-
* `options` {object}
355+
* `options` {Object}
356356
* `signal` {AbortSignal} Optionally allows the `bind()` to be canceled
357357
using an `AbortController`.
358358
* Returns: {Promise}
@@ -821,17 +821,16 @@ added: REPLACEME
821821

822822
Information about the cipher algorithm selected for the session.
823823

824-
#### quicsession.close(\[callback\])
824+
#### quicsession.close()
825825
<!-- YAML
826826
added: REPLACEME
827827
-->
828828

829-
* `callback` {Function} Callback invoked when the close operation is completed
830-
831829
Begins a graceful close of the `QuicSession`. Existing `QuicStream` instances
832830
will be permitted to close naturally. New `QuicStream` instances will not be
833831
permitted. Once all `QuicStream` instances have closed, the `QuicSession`
834-
instance will be destroyed.
832+
instance will be destroyed. Returns a `Promise` that is resolved once the
833+
`QuicSession` instance is destroyed.
835834

836835
#### quicsession.closeCode
837836
<!-- YAML

lib/internal/quic/core.js

+76-75
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ const {
1818
Number,
1919
Promise,
2020
PromiseAll,
21+
PromiseReject,
2122
RegExp,
2223
Set,
2324
Symbol,
@@ -98,7 +99,6 @@ const {
9899
const {
99100
codes: {
100101
ERR_INVALID_ARG_TYPE,
101-
ERR_INVALID_CALLBACK,
102102
ERR_INVALID_STATE,
103103
ERR_OPERATION_FAILED,
104104
ERR_QUIC_FAILED_TO_CREATE_SESSION,
@@ -795,7 +795,7 @@ class QuicEndpoint {
795795

796796
[kClose]() {
797797
if (this.destroyed) {
798-
return Promise.reject(
798+
return PromiseReject(
799799
new ERR_INVALID_STATE('QuicEndpoint is already destroyed'));
800800
}
801801
const promise = deferredClosePromise(this[kInternalState]);
@@ -1392,7 +1392,7 @@ class QuicSocket extends EventEmitter {
13921392

13931393
[kClose]() {
13941394
if (this.destroyed) {
1395-
return Promise.reject(
1395+
return PromiseReject(
13961396
new ERR_INVALID_STATE('QuicSocket is already destroyed'));
13971397
}
13981398
const state = this[kInternalState];
@@ -1413,14 +1413,13 @@ class QuicSocket extends EventEmitter {
14131413
return promise;
14141414
}
14151415

1416-
// Otherwise, loop through each of the known sessions
1417-
// and close them.
1418-
// TODO(@jasnell): These will be promises soon, but we
1419-
// do not want to await them.
1420-
for (const session of state.sessions)
1421-
session.close();
1422-
1423-
return promise;
1416+
// Otherwise, loop through each of the known sessions and close them.
1417+
const reqs = [promise];
1418+
for (const session of state.sessions) {
1419+
reqs.push(session.close()
1420+
.catch((error) => this.destroy(error)));
1421+
}
1422+
return PromiseAll(reqs);
14241423
}
14251424

14261425
// Initiate an abrupt close and destruction of the QuicSocket.
@@ -1656,7 +1655,9 @@ class QuicSession extends EventEmitter {
16561655
cipherVersion: undefined,
16571656
closeCode: NGTCP2_NO_ERROR,
16581657
closeFamily: QUIC_ERROR_APPLICATION,
1659-
closing: false,
1658+
closePromise: undefined,
1659+
closePromiseResolve: undefined,
1660+
closePromiseReject: undefined,
16601661
destroyed: false,
16611662
earlyData: false,
16621663
handshakeComplete: false,
@@ -1768,17 +1769,6 @@ class QuicSession extends EventEmitter {
17681769
this.destroy(err);
17691770
}
17701771

1771-
// Causes the QuicSession to be immediately destroyed, but with
1772-
// additional metadata set.
1773-
[kDestroy](code, family, silent, statelessReset) {
1774-
const state = this[kInternalState];
1775-
state.closeCode = code;
1776-
state.closeFamily = family;
1777-
state.silentClose = silent;
1778-
state.statelessReset = statelessReset;
1779-
this.destroy();
1780-
}
1781-
17821772
// Closes the specified stream with the given code. The
17831773
// QuicStream object will be destroyed.
17841774
[kStreamClose](id, code) {
@@ -1789,23 +1779,23 @@ class QuicSession extends EventEmitter {
17891779
stream.destroy();
17901780
}
17911781

1792-
// Delivers a block of headers to the appropriate QuicStream
1793-
// instance. This will only be called if the ALPN selected
1794-
// is known to support headers.
1795-
[kHeaders](id, headers, kind, push_id) {
1782+
[kStreamReset](id, code) {
17961783
const stream = this[kInternalState].streams.get(id);
17971784
if (stream === undefined)
17981785
return;
17991786

1800-
stream[kHeaders](headers, kind, push_id);
1787+
stream[kStreamReset](code);
18011788
}
18021789

1803-
[kStreamReset](id, code) {
1790+
// Delivers a block of headers to the appropriate QuicStream
1791+
// instance. This will only be called if the ALPN selected
1792+
// is known to support headers.
1793+
[kHeaders](id, headers, kind, push_id) {
18041794
const stream = this[kInternalState].streams.get(id);
18051795
if (stream === undefined)
18061796
return;
18071797

1808-
stream[kStreamReset](code);
1798+
stream[kHeaders](headers, kind, push_id);
18091799
}
18101800

18111801
[kInspect](depth, options) {
@@ -1850,8 +1840,13 @@ class QuicSession extends EventEmitter {
18501840
if (!this[kHandshakePost]())
18511841
return;
18521842

1853-
process.nextTick(
1854-
emit.bind(this, 'secure', servername, alpn, this.cipher));
1843+
process.nextTick(() => {
1844+
try {
1845+
this.emit('secure', servername, alpn, this.cipher);
1846+
} catch (error) {
1847+
this.destroy(error);
1848+
}
1849+
});
18551850
}
18561851

18571852
// Non-op for the default case. QuicClientSession
@@ -1863,10 +1858,10 @@ class QuicSession extends EventEmitter {
18631858

18641859
[kRemoveStream](stream) {
18651860
this[kInternalState].streams.delete(stream.id);
1861+
this[kMaybeDestroy]();
18661862
}
18671863

18681864
[kAddStream](id, stream) {
1869-
stream.once('close', this[kMaybeDestroy].bind(this));
18701865
this[kInternalState].streams.set(id, stream);
18711866
}
18721867

@@ -1875,49 +1870,55 @@ class QuicSession extends EventEmitter {
18751870
// informationational notification. It is not called on
18761871
// server QuicSession instances.
18771872
[kUsePreferredAddress](address) {
1878-
process.nextTick(
1879-
emit.bind(this, 'usePreferredAddress', address));
1873+
process.nextTick(() => {
1874+
try {
1875+
this.emit('usePreferredAddress', address);
1876+
} catch (error) {
1877+
this.destroy(error);
1878+
}
1879+
});
1880+
}
1881+
1882+
close() {
1883+
return this[kInternalState].closePromise || this[kClose]();
1884+
}
1885+
1886+
[kClose]() {
1887+
if (this.destroyed) {
1888+
return PromiseReject(
1889+
new ERR_INVALID_STATE('QuicSession is already destroyed'));
1890+
}
1891+
const promise = deferredClosePromise(this[kInternalState]);
1892+
if (!this[kMaybeDestroy]()) {
1893+
this[kHandle].gracefulClose();
1894+
}
1895+
return promise;
1896+
}
1897+
1898+
get closing() {
1899+
return this[kInternalState].closePromise !== undefined;
18801900
}
18811901

18821902
// The QuicSession will be destroyed if close() has been
18831903
// called and there are no remaining streams
18841904
[kMaybeDestroy]() {
18851905
const state = this[kInternalState];
1886-
if (state.closing && state.streams.size === 0) {
1906+
if (this.closing && state.streams.size === 0) {
18871907
this.destroy();
18881908
return true;
18891909
}
18901910
return false;
18911911
}
18921912

1893-
// Closing allows any existing QuicStream's to gracefully
1894-
// complete while disallowing any new QuicStreams from being
1895-
// opened (in either direction). Calls to openStream() will
1896-
// fail, and new streams from the peer will be rejected/ignored.
1897-
close(callback) {
1913+
// Causes the QuicSession to be immediately destroyed, but with
1914+
// additional metadata set.
1915+
[kDestroy](code, family, silent, statelessReset) {
18981916
const state = this[kInternalState];
1899-
if (state.destroyed) {
1900-
throw new ERR_INVALID_STATE(
1901-
`${this.constructor.name} is already destroyed`);
1902-
}
1903-
1904-
if (callback) {
1905-
if (typeof callback !== 'function')
1906-
throw new ERR_INVALID_CALLBACK();
1907-
this.once('close', callback);
1908-
}
1909-
1910-
// If we're already closing, do nothing else.
1911-
// Callback will be invoked once the session
1912-
// has been destroyed
1913-
if (state.closing)
1914-
return;
1915-
state.closing = true;
1916-
1917-
// If there are no active streams, we can close immediately,
1918-
// otherwise set the graceful close flag.
1919-
if (!this[kMaybeDestroy]())
1920-
this[kHandle].gracefulClose();
1917+
state.closeCode = code;
1918+
state.closeFamily = family;
1919+
state.silentClose = silent;
1920+
state.statelessReset = statelessReset;
1921+
this.destroy();
19211922
}
19221923

19231924
// Destroying synchronously shuts down and frees the
@@ -1939,7 +1940,6 @@ class QuicSession extends EventEmitter {
19391940
if (state.destroyed)
19401941
return;
19411942
state.destroyed = true;
1942-
state.closing = false;
19431943

19441944
// Destroy any pending streams immediately. These
19451945
// are streams that have been created but have not
@@ -1974,7 +1974,13 @@ class QuicSession extends EventEmitter {
19741974

19751975
// If we are destroying with an error, schedule the
19761976
// error to be emitted on process.nextTick.
1977-
if (error) process.nextTick(emit.bind(this, 'error', error));
1977+
if (error) {
1978+
if (typeof state.closePromiseReject === 'function')
1979+
state.closePromiseReject(error);
1980+
process.nextTick(emit.bind(this, 'error', error));
1981+
} else if (typeof state.closePromiseResolve === 'function')
1982+
state.closePromiseResolve();
1983+
19781984
process.nextTick(emit.bind(this, 'close'));
19791985
}
19801986

@@ -2100,10 +2106,6 @@ class QuicSession extends EventEmitter {
21002106
return this[kInternalState].destroyed;
21012107
}
21022108

2103-
get closing() {
2104-
return this[kInternalState].closing;
2105-
}
2106-
21072109
get closeCode() {
21082110
const state = this[kInternalState];
21092111
return {
@@ -2123,11 +2125,11 @@ class QuicSession extends EventEmitter {
21232125

21242126
openStream(options) {
21252127
const state = this[kInternalState];
2126-
if (state.destroyed) {
2128+
if (this.destroyed) {
21272129
throw new ERR_INVALID_STATE(
21282130
`${this.constructor.name} is already destroyed`);
21292131
}
2130-
if (state.closing) {
2132+
if (this.closing) {
21312133
throw new ERR_INVALID_STATE(
21322134
`${this.constructor.name} is closing`);
21332135
}
@@ -2255,11 +2257,11 @@ class QuicSession extends EventEmitter {
22552257
updateKey() {
22562258
const state = this[kInternalState];
22572259
// Initiates a key update for the connection.
2258-
if (state.destroyed) {
2260+
if (this.destroyed) {
22592261
throw new ERR_INVALID_STATE(
22602262
`${this.constructor.name} is already destroyed`);
22612263
}
2262-
if (state.closing) {
2264+
if (this.closing) {
22632265
throw new ERR_INVALID_STATE(
22642266
`${this.constructor.name} is closing`);
22652267
}
@@ -2282,7 +2284,6 @@ class QuicSession extends EventEmitter {
22822284
return this[kHandle].removeFromSocket();
22832285
}
22842286
}
2285-
22862287
class QuicServerSession extends QuicSession {
22872288
[kInternalServerState] = {
22882289
contexts: []
@@ -2914,7 +2915,6 @@ class QuicStream extends Duplex {
29142915

29152916
_destroy(error, callback) {
29162917
const state = this[kInternalState];
2917-
state.session[kRemoveStream](this);
29182918
const handle = this[kHandle];
29192919
// Do not use handle after this point as the underlying C++
29202920
// object has been destroyed. Any attempt to use the object
@@ -2925,6 +2925,7 @@ class QuicStream extends Duplex {
29252925
state.stats = new BigInt64Array(handle.stats);
29262926
handle.destroy();
29272927
}
2928+
state.session[kRemoveStream](this);
29282929
// The destroy callback must be invoked in a nextTick
29292930
process.nextTick(() => callback(error));
29302931
}

0 commit comments

Comments
 (0)