Skip to content

Commit 6e30fe7

Browse files
committed
quic: convert openStream to Promise
Although most of the time openStream will be able to create the stream immediately, when a stream is opened before the handshake is complete we have to wait for the handshake to be complete before continuing. PR-URL: #34351 Reviewed-By: Robert Nagy <ronagy@icloud.com> Reviewed-By: Anna Henningsen <anna@addaleax.net>
1 parent 69c78be commit 6e30fe7

27 files changed

+334
-461
lines changed

lib/internal/quic/core.js

+62-32
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ const {
1919
Promise,
2020
PromiseAll,
2121
PromiseReject,
22+
PromiseResolve,
2223
RegExp,
2324
Set,
2425
Symbol,
@@ -213,13 +214,13 @@ const kDestroy = Symbol('kDestroy');
213214
const kEndpointBound = Symbol('kEndpointBound');
214215
const kEndpointClose = Symbol('kEndpointClose');
215216
const kHandshake = Symbol('kHandshake');
217+
const kHandshakeComplete = Symbol('kHandshakeComplete');
216218
const kHandshakePost = Symbol('kHandshakePost');
217219
const kHeaders = Symbol('kHeaders');
218220
const kInternalState = Symbol('kInternalState');
219221
const kInternalClientState = Symbol('kInternalClientState');
220222
const kInternalServerState = Symbol('kInternalServerState');
221223
const kListen = Symbol('kListen');
222-
const kMakeStream = Symbol('kMakeStream');
223224
const kMaybeBind = Symbol('kMaybeBind');
224225
const kOnFileOpened = Symbol('kOnFileOpened');
225226
const kOnFileUnpipe = Symbol('kOnFileUnpipe');
@@ -1651,6 +1652,9 @@ class QuicSession extends EventEmitter {
16511652
destroyed: false,
16521653
earlyData: false,
16531654
handshakeComplete: false,
1655+
handshakeCompletePromise: undefined,
1656+
handshakeCompletePromiseResolve: undefined,
1657+
handshakeCompletePromiseReject: undefined,
16541658
idleTimeout: false,
16551659
maxPacketLength: NGTCP2_DEFAULT_MAX_PKTLEN,
16561660
servername: undefined,
@@ -1715,6 +1719,26 @@ class QuicSession extends EventEmitter {
17151719
});
17161720
}
17171721

1722+
[kHandshakeComplete]() {
1723+
const state = this[kInternalState];
1724+
if (state.handshakeComplete)
1725+
return PromiseResolve();
1726+
1727+
if (state.handshakeCompletePromise !== undefined)
1728+
return state.handshakeCompletePromise;
1729+
1730+
state.handshakeCompletePromise = new Promise((resolve, reject) => {
1731+
state.handshakeCompletePromiseResolve = resolve;
1732+
state.handshakeCompletePromiseReject = reject;
1733+
}).finally(() => {
1734+
state.handshakeCompletePromise = undefined;
1735+
state.handshakeCompletePromiseReject = undefined;
1736+
state.handshakeCompletePromiseResolve = undefined;
1737+
});
1738+
1739+
return state.handshakeCompletePromise;
1740+
}
1741+
17181742
// Sets the internal handle for the QuicSession instance. For
17191743
// server QuicSessions, this is called immediately as the
17201744
// handle is created before the QuicServerSession JS object.
@@ -1827,8 +1851,18 @@ class QuicSession extends EventEmitter {
18271851
state.verifyErrorReason = verifyErrorReason;
18281852
state.verifyErrorCode = verifyErrorCode;
18291853
state.earlyData = earlyData;
1830-
if (!this[kHandshakePost]())
1854+
1855+
if (!this[kHandshakePost]()) {
1856+
if (typeof state.handshakeCompletePromiseReject === 'function') {
1857+
// TODO(@jasnell): Proper error
1858+
state.handshakeCompletePromiseReject(
1859+
new ERR_OPERATION_FAILED('Handshake failed'));
1860+
}
18311861
return;
1862+
}
1863+
1864+
if (typeof state.handshakeCompletePromiseResolve === 'function')
1865+
state.handshakeCompletePromiseResolve();
18321866

18331867
process.nextTick(() => {
18341868
try {
@@ -1971,6 +2005,12 @@ class QuicSession extends EventEmitter {
19712005
} else if (typeof state.closePromiseResolve === 'function')
19722006
state.closePromiseResolve();
19732007

2008+
if (typeof state.handshakeCompletePromiseReject === 'function') {
2009+
// TODO(@jasnell): Proper error
2010+
state.handshakeCompletePromiseReject(
2011+
new ERR_OPERATION_FAILED('Handshake failed'));
2012+
}
2013+
19742014
process.nextTick(emit.bind(this, 'close'));
19752015
}
19762016

@@ -2113,8 +2153,7 @@ class QuicSession extends EventEmitter {
21132153
return this[kInternalState].statelessReset;
21142154
}
21152155

2116-
openStream(options) {
2117-
const state = this[kInternalState];
2156+
async openStream(options) {
21182157
if (this.destroyed) {
21192158
throw new ERR_INVALID_STATE(
21202159
`${this.constructor.name} is already destroyed`);
@@ -2123,51 +2162,42 @@ class QuicSession extends EventEmitter {
21232162
throw new ERR_INVALID_STATE(
21242163
`${this.constructor.name} is closing`);
21252164
}
2165+
21262166
const {
21272167
halfOpen, // Unidirectional or Bidirectional
21282168
highWaterMark,
21292169
defaultEncoding,
21302170
} = validateQuicStreamOptions(options);
21312171

2132-
const stream = new QuicStream({
2133-
highWaterMark,
2134-
defaultEncoding,
2135-
readable: !halfOpen
2136-
}, this);
2172+
await this[kHandshakeComplete]();
21372173

2138-
state.pendingStreams.add(stream);
2139-
2140-
// If early data is being used, we can create the internal QuicStream on the
2141-
// ready event, that is immediately after the internal QuicSession handle
2142-
// has been created. Otherwise, we have to wait until the secure event
2143-
// signaling the completion of the TLS handshake.
2144-
const makeStream = QuicSession[kMakeStream].bind(this, stream, halfOpen);
2145-
let deferred = false;
2146-
if (!this.handshakeComplete) {
2147-
deferred = true;
2148-
this.once('secure', makeStream);
2174+
if (this.destroyed) {
2175+
throw new ERR_INVALID_STATE(
2176+
`${this.constructor.name} is already destroyed`);
2177+
}
2178+
if (this.closing) {
2179+
throw new ERR_INVALID_STATE(
2180+
`${this.constructor.name} is closing`);
21492181
}
21502182

2151-
if (!deferred)
2152-
makeStream(stream, halfOpen);
2153-
2154-
return stream;
2155-
}
2156-
2157-
static [kMakeStream](stream, halfOpen) {
2158-
this[kInternalState].pendingStreams.delete(stream);
21592183
const handle =
21602184
halfOpen ?
21612185
_openUnidirectionalStream(this[kHandle]) :
21622186
_openBidirectionalStream(this[kHandle]);
21632187

2164-
if (handle === undefined) {
2165-
stream.destroy(new ERR_OPERATION_FAILED('Unable to create QuicStream'));
2166-
return;
2167-
}
2188+
if (handle === undefined)
2189+
throw new ERR_OPERATION_FAILED('Unable to create QuicStream');
2190+
2191+
const stream = new QuicStream({
2192+
highWaterMark,
2193+
defaultEncoding,
2194+
readable: !halfOpen
2195+
}, this);
21682196

21692197
stream[kSetHandle](handle);
21702198
this[kAddStream](stream.id, stream);
2199+
2200+
return stream;
21712201
}
21722202

21732203
get duration() {

test/parallel/test-quic-client-connect-multiple-parallel.js

+3-5
Original file line numberDiff line numberDiff line change
@@ -44,11 +44,9 @@ async function connect(server, client) {
4444
for (let i = 0; i < kCount; i++) {
4545
const server = createQuicSocket({ server: options });
4646

47-
server.on('session', common.mustCall((session) => {
48-
session.on('secure', common.mustCall(() => {
49-
const stream = session.openStream({ halfOpen: true });
50-
stream.end('Hi!');
51-
}));
47+
server.on('session', common.mustCall(async (session) => {
48+
const stream = await session.openStream({ halfOpen: true });
49+
stream.end('Hi!');
5250
}));
5351

5452
server.on('close', common.mustCall());

test/parallel/test-quic-client-connect-multiple-sequential.js

+3-5
Original file line numberDiff line numberDiff line change
@@ -45,11 +45,9 @@ async function connect(server, client) {
4545
for (let i = 0; i < kCount; i++) {
4646
const server = createQuicSocket({ server: options });
4747

48-
server.on('session', common.mustCall((session) => {
49-
session.on('secure', common.mustCall(() => {
50-
const stream = session.openStream({ halfOpen: true });
51-
stream.end('Hi!');
52-
}));
48+
server.on('session', common.mustCall(async (session) => {
49+
const stream = await session.openStream({ halfOpen: true });
50+
stream.end('Hi!');
5351
}));
5452

5553
server.on('close', common.mustCall());

test/parallel/test-quic-client-empty-preferred-address.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ const options = { key, cert, ca, alpn: 'zzz' };
4242
preferredAddressPolicy: 'accept',
4343
});
4444

45-
const stream = clientSession.openStream();
45+
const stream = await clientSession.openStream();
4646
stream.end('hello');
4747

4848
await Promise.all([

test/parallel/test-quic-client-server.js

+5-5
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ client.on('close', common.mustCall(onSocketClose.bind(client)));
129129
});
130130
}));
131131

132-
session.on('secure', common.mustCall((servername, alpn, cipher) => {
132+
session.on('secure', common.mustCall(async (servername, alpn, cipher) => {
133133
debug('QuicServerSession TLS Handshake Complete');
134134
debug(' Server name: %s', servername);
135135
debug(' ALPN: %s', alpn);
@@ -143,7 +143,7 @@ client.on('close', common.mustCall(onSocketClose.bind(client)));
143143
assert(session.authenticated);
144144
assert.strictEqual(session.authenticationError, undefined);
145145

146-
const uni = session.openStream({ halfOpen: true });
146+
const uni = await session.openStream({ halfOpen: true });
147147
assert(uni.unidirectional);
148148
assert(!uni.bidirectional);
149149
assert(uni.serverInitiated);
@@ -221,8 +221,8 @@ client.on('close', common.mustCall(onSocketClose.bind(client)));
221221
name: 'Error'
222222
};
223223
assert.throws(() => session.ping(), err);
224-
assert.throws(() => session.openStream(), err);
225224
assert.throws(() => session.updateKey(), err);
225+
assert.rejects(() => session.openStream(), err);
226226
}));
227227
}));
228228

@@ -264,7 +264,7 @@ client.on('close', common.mustCall(onSocketClose.bind(client)));
264264
debug(' Params: %s', params.toString('hex'));
265265
}, 2));
266266

267-
req.on('secure', common.mustCall((servername, alpn, cipher) => {
267+
req.on('secure', common.mustCall(async (servername, alpn, cipher) => {
268268
debug('QuicClientSession TLS Handshake Complete');
269269
debug(' Server name: %s', servername);
270270
debug(' ALPN: %s', alpn);
@@ -308,7 +308,7 @@ client.on('close', common.mustCall(onSocketClose.bind(client)));
308308
}
309309

310310
const file = fs.createReadStream(__filename);
311-
const stream = req.openStream();
311+
const stream = await req.openStream();
312312
file.pipe(stream);
313313
let data = '';
314314
stream.resume();

test/parallel/test-quic-errors-quicsession-openstream.js

+31-28
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ if (!common.hasQuic)
77
// Test errors thrown when openStream is called incorrectly
88
// or is not permitted
99

10+
const { once } = require('events');
1011
const { createHook } = require('async_hooks');
1112
const assert = require('assert');
1213
const { createQuicSocket } = require('net');
@@ -18,18 +19,12 @@ createHook({
1819
}
1920
}).enable();
2021

21-
const Countdown = require('../common/countdown');
2222
const { key, cert, ca } = require('../common/quic');
2323

2424
const options = { key, cert, ca, alpn: 'zzz', maxStreamsUni: 0 };
2525
const server = createQuicSocket({ server: options });
2626
const client = createQuicSocket({ client: options });
2727

28-
const countdown = new Countdown(1, () => {
29-
server.close();
30-
client.close();
31-
});
32-
3328
server.on('close', common.mustCall());
3429
client.on('close', common.mustCall());
3530

@@ -44,40 +39,48 @@ client.on('close', common.mustCall());
4439
port: server.endpoints[0].address.port
4540
});
4641

47-
['z', 1, {}, [], null, Infinity, 1n].forEach((i) => {
48-
assert.throws(
49-
() => req.openStream({ halfOpen: i }),
50-
{ code: 'ERR_INVALID_ARG_TYPE' }
51-
);
52-
});
42+
for (const halfOpen of ['z', 1, {}, [], null, Infinity, 1n]) {
43+
await assert.rejects(req.openStream({ halfOpen }), {
44+
code: 'ERR_INVALID_ARG_TYPE'
45+
});
46+
}
5347

54-
['', 1n, {}, [], false, 'zebra'].forEach((defaultEncoding) => {
55-
assert.throws(() => req.openStream({ defaultEncoding }), {
48+
for (const defaultEncoding of ['', 1n, {}, [], false, 'zebra']) {
49+
await assert.rejects(req.openStream({ defaultEncoding }), {
5650
code: 'ERR_INVALID_ARG_VALUE'
5751
});
58-
});
52+
}
5953

60-
[-1, Number.MAX_SAFE_INTEGER + 1].forEach((highWaterMark) => {
61-
assert.throws(() => req.openStream({ highWaterMark }), {
54+
for (const highWaterMark of [-1, Number.MAX_SAFE_INTEGER + 1]) {
55+
await assert.rejects(req.openStream({ highWaterMark }), {
6256
code: 'ERR_OUT_OF_RANGE'
6357
});
64-
});
58+
}
6559

66-
['a', 1n, [], {}, false].forEach((highWaterMark) => {
67-
assert.throws(() => req.openStream({ highWaterMark }), {
60+
for (const highWaterMark of ['a', 1n, [], {}, false]) {
61+
await assert.rejects(req.openStream({ highWaterMark }), {
6862
code: 'ERR_INVALID_ARG_TYPE'
6963
});
70-
});
64+
}
7165

7266
// Unidirectional streams are not allowed. openStream will succeeed
7367
// but the stream will be destroyed immediately. The underlying
7468
// QuicStream C++ handle will not be created.
75-
req.openStream({
76-
halfOpen: true,
77-
highWaterMark: 10,
78-
defaultEncoding: 'utf16le'
79-
}).on('error', common.expectsError({
80-
code: 'ERR_OPERATION_FAILED'
81-
})).on('error', common.mustCall(() => countdown.dec()));
69+
await assert.rejects(
70+
req.openStream({
71+
halfOpen: true,
72+
highWaterMark: 10,
73+
defaultEncoding: 'utf16le'
74+
}), {
75+
code: 'ERR_OPERATION_FAILED'
76+
});
77+
78+
server.close();
79+
client.close();
80+
81+
await Promise.all([
82+
once(server, 'close'),
83+
once(client, 'close')
84+
]);
8285

8386
})().then(common.mustCall());

0 commit comments

Comments
 (0)