Skip to content

Commit b06fe33

Browse files
committed
quic: use async _construct for QuicStream
PR-URL: #34351 Reviewed-By: Robert Nagy <ronagy@icloud.com> Reviewed-By: Anna Henningsen <anna@addaleax.net>
1 parent 8bd61d4 commit b06fe33

File tree

5 files changed

+89
-37
lines changed

5 files changed

+89
-37
lines changed

doc/api/quic.md

+3-2
Original file line numberDiff line numberDiff line change
@@ -1047,8 +1047,9 @@ added: REPLACEME
10471047

10481048
Returns a `Promise` that resolves a new `QuicStream`.
10491049

1050-
The `Promise` will be rejected if the `QuicSession` has been destroyed or is in
1051-
the process of a graceful shutdown.
1050+
The `Promise` will be rejected if the `QuicSession` has been destroyed, is in
1051+
the process of a graceful shutdown, or the `QuicSession` is otherwise blocked
1052+
from opening a new stream.
10521053

10531054
#### `quicsession.ping()`
10541055
<!--YAML

lib/internal/quic/core.js

+83-34
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ const {
4848
QLogStream,
4949
} = require('internal/quic/util');
5050
const assert = require('internal/assert');
51-
const EventEmitter = require('events');
51+
const { EventEmitter, once } = require('events');
5252
const fs = require('fs');
5353
const fsPromisesInternal = require('internal/fs/promises');
5454
const { Duplex } = require('stream');
@@ -226,6 +226,7 @@ const kMaybeBind = Symbol('kMaybeBind');
226226
const kOnFileOpened = Symbol('kOnFileOpened');
227227
const kOnFileUnpipe = Symbol('kOnFileUnpipe');
228228
const kOnPipedFileHandleRead = Symbol('kOnPipedFileHandleRead');
229+
const kReady = Symbol('kReady');
229230
const kRemoveSession = Symbol('kRemove');
230231
const kRemoveStream = Symbol('kRemoveStream');
231232
const kServerBusy = Symbol('kServerBusy');
@@ -2167,30 +2168,15 @@ class QuicSession extends EventEmitter {
21672168
defaultEncoding,
21682169
} = validateQuicStreamOptions(options);
21692170

2170-
await this[kHandshakeComplete]();
2171-
2172-
if (this.destroyed) {
2173-
throw new ERR_INVALID_STATE(
2174-
`${this.constructor.name} is already destroyed`);
2175-
}
2176-
if (this.closing) {
2177-
throw new ERR_INVALID_STATE(
2178-
`${this.constructor.name} is closing`);
2179-
}
2180-
2181-
const handle =
2182-
halfOpen ?
2183-
_openUnidirectionalStream(this[kHandle]) :
2184-
_openBidirectionalStream(this[kHandle]);
2185-
2186-
if (handle === undefined)
2187-
throw new ERR_OPERATION_FAILED('Unable to create QuicStream');
2188-
2189-
return new QuicStream({
2171+
const stream = new QuicStream({
21902172
highWaterMark,
21912173
defaultEncoding,
21922174
readable: !halfOpen
2193-
}, this, handle);
2175+
}, this);
2176+
2177+
await once(stream, kReady);
2178+
2179+
return stream;
21942180
}
21952181

21962182
get duration() {
@@ -2532,6 +2518,7 @@ function streamOnPause() {
25322518
this[kHandle].readStop();
25332519
}
25342520
class QuicStream extends Duplex {
2521+
#count = 0;
25352522
[kInternalState] = {
25362523
closed: false,
25372524
closePromise: undefined,
@@ -2547,6 +2534,7 @@ class QuicStream extends Duplex {
25472534
dataRateHistogram: undefined,
25482535
dataSizeHistogram: undefined,
25492536
dataAckHistogram: undefined,
2537+
ready: false,
25502538
sharedState: undefined,
25512539
stats: undefined,
25522540
};
@@ -2578,7 +2566,45 @@ class QuicStream extends Duplex {
25782566
this._readableState.readingMore = true;
25792567
this.on('pause', streamOnPause);
25802568

2581-
this[kSetHandle](handle);
2569+
if (handle !== undefined)
2570+
this[kSetHandle](handle);
2571+
}
2572+
2573+
async _construct(callback) {
2574+
try {
2575+
if (this[kInternalState].ready)
2576+
return callback();
2577+
2578+
// Handle is already initialized
2579+
const unidirectional = !this.readable;
2580+
2581+
await this.session[kHandshakeComplete]();
2582+
2583+
if (this.destroyed) {
2584+
throw new ERR_INVALID_STATE('QuicStream was destroyed');
2585+
}
2586+
if (this.session.destroyed) {
2587+
throw new ERR_INVALID_STATE(
2588+
`${this.session.constructor.name} was destroyed`);
2589+
}
2590+
if (this.session.closing) {
2591+
throw new ERR_INVALID_STATE(
2592+
`${this.session.constructor.name} is closing`);
2593+
}
2594+
2595+
const handle =
2596+
unidirectional ?
2597+
_openUnidirectionalStream(this.session[kHandle]) :
2598+
_openBidirectionalStream(this.session[kHandle]);
2599+
2600+
if (handle === undefined)
2601+
throw new ERR_OPERATION_FAILED('Unable to create QuicStream');
2602+
2603+
this[kSetHandle](handle);
2604+
callback();
2605+
} catch (error) {
2606+
callback(error);
2607+
}
25822608
}
25832609

25842610
// Set handle is called once the QuicSession has been able
@@ -2589,6 +2615,8 @@ class QuicStream extends Duplex {
25892615
// written will be buffered until kSetHandle is called.
25902616
[kSetHandle](handle) {
25912617
const state = this[kInternalState];
2618+
const current = this[kHandle];
2619+
this[kHandle] = handle;
25922620
if (handle !== undefined) {
25932621
handle.onread = onStreamRead;
25942622
handle[owner_symbol] = this;
@@ -2599,11 +2627,13 @@ class QuicStream extends Duplex {
25992627
state.dataAckHistogram = new Histogram(handle.ack);
26002628
state.sharedState = new QuicStreamSharedState(handle.state);
26012629
state.session[kAddStream](state.id, this);
2630+
state.ready = true;
2631+
this.emit(kReady);
26022632
} else {
2603-
if (this[kHandle] !== undefined) {
2604-
this[kHandle].stats[IDX_QUIC_STREAM_STATS_DESTROYED_AT] =
2633+
if (current !== undefined) {
2634+
current.stats[IDX_QUIC_STREAM_STATS_DESTROYED_AT] =
26052635
process.hrtime.bigint();
2606-
state.stats = new BigInt64Array(this[kHandle].stats);
2636+
state.stats = new BigInt64Array(current.stats);
26072637
}
26082638
state.sharedState = undefined;
26092639
if (state.dataRateHistogram)
@@ -2613,7 +2643,6 @@ class QuicStream extends Duplex {
26132643
if (state.dataAckHistogram)
26142644
state.dataAckHistogram[kDestroyHistogram]();
26152645
}
2616-
this[kHandle] = handle;
26172646
}
26182647

26192648
[kStreamReset](code) {
@@ -2643,6 +2672,8 @@ class QuicStream extends Duplex {
26432672
this.end();
26442673
}
26452674

2675+
// TODO(@jasnell): Investigate later if a Promise version
2676+
// of finished() can work here instead.
26462677
return promise;
26472678
}
26482679

@@ -2663,6 +2694,7 @@ class QuicStream extends Duplex {
26632694
else if (typeof state.closePromiseResolve === 'function')
26642695
state.closePromiseResolve();
26652696

2697+
// TODO(@jasnell): Investigate how we can eliminate the nextTick here
26662698
process.nextTick(() => callback(error));
26672699
}
26682700

@@ -2754,7 +2786,7 @@ class QuicStream extends Duplex {
27542786
}
27552787

27562788
[kWriteGeneric](writev, data, encoding, cb) {
2757-
if (this.destroyed)
2789+
if (this.destroyed || this.detached)
27582790
return; // TODO(addaleax): Can this happen?
27592791

27602792
this[kUpdateTimer]();
@@ -2829,6 +2861,8 @@ class QuicStream extends Duplex {
28292861
}
28302862

28312863
sendFile(path, options = {}) {
2864+
if (this.detached)
2865+
throw new ERR_INVALID_STATE('Unable to send file');
28322866
fs.open(path, 'r', QuicStream[kOnFileOpened].bind(this, options));
28332867
}
28342868

@@ -2856,6 +2890,9 @@ class QuicStream extends Duplex {
28562890
if (this.destroyed || this[kInternalState].closed)
28572891
return;
28582892

2893+
if (this.detached)
2894+
throw new ERR_INVALID_STATE('Unable to send file descriptor');
2895+
28592896
validateInteger(offset, 'options.offset', /* min */ -1);
28602897
validateInteger(length, 'options.length', /* min */ -1);
28612898

@@ -2947,6 +2984,12 @@ class QuicStream extends Duplex {
29472984
if (this.destroyed)
29482985
throw new ERR_INVALID_STATE('QuicStream is already destroyed');
29492986

2987+
if (this.detached) {
2988+
throw new ERR_INVALID_STATE(
2989+
'Push stream could not be opened on this QuicSession. ' +
2990+
'Push is either disabled or currently blocked.');
2991+
}
2992+
29502993
const state = this[kInternalState];
29512994
const {
29522995
highWaterMark = state.highWaterMark,
@@ -2995,9 +3038,11 @@ class QuicStream extends Duplex {
29953038
}
29963039

29973040
submitInformationalHeaders(headers = {}) {
2998-
// TODO(@jasnell): Likely better to throw here instead of return false
29993041
if (this.destroyed)
3000-
return false;
3042+
throw new ERR_INVALID_STATE('QuicStream is already destroyed');
3043+
3044+
if (this.detached)
3045+
throw new ERR_INVALID_STATE('Unable to submit headers');
30013046

30023047
validateObject(headers, 'headers');
30033048

@@ -3025,9 +3070,11 @@ class QuicStream extends Duplex {
30253070
}
30263071

30273072
submitInitialHeaders(headers = {}, options = {}) {
3028-
// TODO(@jasnell): Likely better to throw here instead of return false
30293073
if (this.destroyed)
3030-
return false;
3074+
throw new ERR_INVALID_STATE('QuicStream is already destroyed');
3075+
3076+
if (this.detached)
3077+
throw new ERR_INVALID_STATE('Unable to submit headers');
30313078

30323079
const { terminal } = { ...options };
30333080

@@ -3062,9 +3109,11 @@ class QuicStream extends Duplex {
30623109
}
30633110

30643111
submitTrailingHeaders(headers = {}) {
3065-
// TODO(@jasnell): Likely better to throw here instead of return false
30663112
if (this.destroyed)
3067-
return false;
3113+
throw new ERR_INVALID_STATE('QuicStream is already destroyed');
3114+
3115+
if (this.detached)
3116+
throw new ERR_INVALID_STATE('Unable to submit headers');
30683117

30693118
validateObject(headers, 'headers');
30703119

src/quic/node_quic_buffer.cc

+1
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,7 @@ int QuicBuffer::DoPull(
115115
size_t len = 0;
116116
size_t numbytes = 0;
117117
int status = bob::Status::STATUS_CONTINUE;
118+
118119
// There's no data to read.
119120
if (!remaining() || head_ == nullptr) {
120121
status = is_ended() ?

src/quic/node_quic_session.cc

+1-1
Original file line numberDiff line numberDiff line change
@@ -1998,7 +1998,7 @@ bool QuicSession::ReceiveStreamData(
19981998
const uint8_t* data,
19991999
size_t datalen,
20002000
uint64_t offset) {
2001-
auto leave = OnScopeLeave([=]() {
2001+
auto leave = OnScopeLeave([&]() {
20022002
// Unconditionally extend the flow control window for the entire
20032003
// session but not for the individual Stream.
20042004
ExtendOffset(datalen);

src/stream_base.cc

+1
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ int StreamBase::UseUserBuffer(const FunctionCallbackInfo<Value>& args) {
6868
int StreamBase::Shutdown(const FunctionCallbackInfo<Value>& args) {
6969
CHECK(args[0]->IsObject());
7070
Local<Object> req_wrap_obj = args[0].As<Object>();
71+
7172
return Shutdown(req_wrap_obj);
7273
}
7374

0 commit comments

Comments
 (0)