Skip to content

Commit b1750a4

Browse files
committed
quic: continued refactoring for quic_stream/quic_session
PR-URL: #34160 Reviewed-By: Anna Henningsen <anna@addaleax.net>
1 parent 56dbe46 commit b1750a4

8 files changed

+63
-51
lines changed

src/quic/node_quic_default_application.cc

+4-1
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,10 @@ bool DefaultApplication::ReceiveStreamData(
9393
if (!stream) {
9494
// Shutdown the stream explicitly if the session is being closed.
9595
if (session()->is_gracefully_closing()) {
96-
session()->ResetStream(stream_id, NGTCP2_ERR_CLOSING);
96+
ngtcp2_conn_shutdown_stream(
97+
session()->connection(),
98+
stream_id,
99+
NGTCP2_ERR_CLOSING);
97100
return true;
98101
}
99102

src/quic/node_quic_http3_application.cc

+4-1
Original file line numberDiff line numberDiff line change
@@ -702,7 +702,10 @@ void Http3Application::PushStream(
702702
void Http3Application::SendStopSending(
703703
int64_t stream_id,
704704
uint64_t app_error_code) {
705-
session()->ResetStream(stream_id, app_error_code);
705+
ngtcp2_conn_shutdown_stream_read(
706+
session()->connection(),
707+
stream_id,
708+
app_error_code);
706709
}
707710

708711
void Http3Application::EndStream(int64_t stream_id) {

src/quic/node_quic_session.cc

-5
Original file line numberDiff line numberDiff line change
@@ -2328,11 +2328,6 @@ void QuicSession::ResumeStream(int64_t stream_id) {
23282328
application()->ResumeStream(stream_id);
23292329
}
23302330

2331-
void QuicSession::ResetStream(int64_t stream_id, uint64_t code) {
2332-
SendSessionScope scope(this);
2333-
CHECK_EQ(ngtcp2_conn_shutdown_stream(connection(), stream_id, code), 0);
2334-
}
2335-
23362331
// Silent Close must start with the JavaScript side, which must
23372332
// clean up state, abort any still existing QuicSessions, then
23382333
// destroy the handle when done. The most important characteristic

src/quic/node_quic_session.h

-31
Original file line numberDiff line numberDiff line change
@@ -957,37 +957,6 @@ class QuicSession : public AsyncWrap,
957957

958958
const StreamsMap& streams() const { return streams_; }
959959

960-
// ResetStream will cause ngtcp2 to queue a
961-
// RESET_STREAM and STOP_SENDING frame, as appropriate,
962-
// for the given stream_id. For a locally-initiated
963-
// unidirectional stream, only a RESET_STREAM frame
964-
// will be scheduled and the stream will be immediately
965-
// closed. For a bi-directional stream, a STOP_SENDING
966-
// frame will be sent.
967-
//
968-
// It is important to note that the QuicStream is
969-
// not destroyed immediately following ShutdownStream.
970-
// The sending QuicSession will not close the stream
971-
// until the RESET_STREAM is acknowledged.
972-
//
973-
// Once the RESET_STREAM is sent, the QuicSession
974-
// should not send any new frames for the stream,
975-
// and all inbound stream frames should be discarded.
976-
// Once ngtcp2 receives the appropriate notification
977-
// that the RESET_STREAM has been acknowledged, the
978-
// stream will be closed.
979-
//
980-
// Once the stream has been closed, it will be
981-
// destroyed and memory will be freed. User code
982-
// can request that a stream be immediately and
983-
// abruptly destroyed without calling ShutdownStream.
984-
// Likewise, an idle timeout may cause the stream
985-
// to be silently destroyed without calling
986-
// ShutdownStream.
987-
void ResetStream(
988-
int64_t stream_id,
989-
uint64_t error_code = NGTCP2_APP_NOERROR);
990-
991960
void ResumeStream(int64_t stream_id);
992961

993962
// Submits informational headers to the QUIC Application

src/quic/node_quic_stream-inl.h

+21-7
Original file line numberDiff line numberDiff line change
@@ -132,19 +132,33 @@ void QuicStream::Commit(size_t amount) {
132132
streambuf_.Seek(amount);
133133
}
134134

135+
// ResetStream will cause ngtcp2 to queue a RESET_STREAM and STOP_SENDING
136+
// frame, as appropriate, for the given stream_id. For a locally-initiated
137+
// unidirectional stream, only a RESET_STREAM frame will be scheduled and
138+
// the stream will be immediately closed. For a bidirectional stream, a
139+
// STOP_SENDING frame will be sent.
135140
void QuicStream::ResetStream(uint64_t app_error_code) {
136-
// On calling shutdown, the stream will no longer be
137-
// readable or writable, all any pending data in the
138-
// streambuf_ will be canceled, and all data pending
139-
// to be acknowledged at the ngtcp2 level will be
140-
// abandoned.
141-
BaseObjectPtr<QuicSession> ptr(session_);
141+
QuicSession::SendSessionScope send_scope(session());
142+
ngtcp2_conn_shutdown_stream(
143+
session()->connection(),
144+
stream_id_,
145+
app_error_code);
142146
set_flag(QUICSTREAM_FLAG_READ_CLOSED);
143-
session_->ResetStream(stream_id_, app_error_code);
144147
streambuf_.Cancel();
145148
streambuf_.End();
146149
}
147150

151+
// StopSending will cause ngtcp2 to queue a STOP_SENDING frame if the
152+
// stream is still inbound readable.
153+
void QuicStream::StopSending(uint64_t app_error_code) {
154+
QuicSession::SendSessionScope send_scope(session());
155+
ngtcp2_conn_shutdown_stream_read(
156+
session()->connection(),
157+
stream_id_,
158+
app_error_code);
159+
set_flag(QUICSTREAM_FLAG_READ_CLOSED);
160+
}
161+
148162
void QuicStream::Schedule(Queue* queue) {
149163
if (!stream_queue_.IsEmpty()) // Already scheduled?
150164
return;

src/quic/node_quic_stream.cc

+28-3
Original file line numberDiff line numberDiff line change
@@ -118,21 +118,31 @@ std::string QuicStream::diagnostic_name() const {
118118
", " + session_->diagnostic_name() + ")";
119119
}
120120

121-
void QuicStream::Destroy() {
121+
void QuicStream::Destroy(QuicError* error) {
122122
if (is_destroyed())
123123
return;
124+
125+
QuicSession::SendSessionScope send_scope(session());
126+
124127
set_flag(QUICSTREAM_FLAG_DESTROYED);
125128
set_flag(QUICSTREAM_FLAG_READ_CLOSED);
126-
streambuf_.End();
129+
130+
// In case this stream is scheduled for sending, remove it
131+
// from the schedule queue
132+
Unschedule();
127133

128134
// If there is data currently buffered in the streambuf_,
129135
// then cancel will call out to invoke an arbitrary
130136
// JavaScript callback (the on write callback). Within
131137
// that callback, however, the QuicStream will no longer
132138
// be usable to send or receive data.
139+
streambuf_.End();
133140
streambuf_.Cancel();
134141
CHECK_EQ(streambuf_.length(), 0);
135142

143+
// Attempt to send a shutdown signal to the remote peer
144+
ResetStream(error != nullptr ? error->code : NGTCP2_NO_ERROR);
145+
136146
// The QuicSession maintains a map of std::unique_ptrs to
137147
// QuicStream instances. Removing this here will cause
138148
// this QuicStream object to be deconstructed, so the
@@ -411,9 +421,11 @@ void OpenBidirectionalStream(const FunctionCallbackInfo<Value>& args) {
411421
}
412422

413423
void QuicStreamDestroy(const FunctionCallbackInfo<Value>& args) {
424+
Environment* env = Environment::GetCurrent(args);
414425
QuicStream* stream;
415426
ASSIGN_OR_RETURN_UNWRAP(&stream, args.Holder());
416-
stream->Destroy();
427+
QuicError error(env, args[0], args[1], QUIC_ERROR_APPLICATION);
428+
stream->Destroy(&error);
417429
}
418430

419431
void QuicStreamReset(const FunctionCallbackInfo<Value>& args) {
@@ -428,6 +440,18 @@ void QuicStreamReset(const FunctionCallbackInfo<Value>& args) {
428440
error.code : static_cast<uint64_t>(NGTCP2_NO_ERROR));
429441
}
430442

443+
void QuicStreamStopSending(const FunctionCallbackInfo<Value>& args) {
444+
Environment* env = Environment::GetCurrent(args);
445+
QuicStream* stream;
446+
ASSIGN_OR_RETURN_UNWRAP(&stream, args.Holder());
447+
448+
QuicError error(env, args[0], args[1], QUIC_ERROR_APPLICATION);
449+
450+
stream->StopSending(
451+
error.family == QUIC_ERROR_APPLICATION ?
452+
error.code : static_cast<uint64_t>(NGTCP2_NO_ERROR));
453+
}
454+
431455
// Requests transmission of a block of informational headers. Not all
432456
// QUIC Applications will support headers. If headers are not supported,
433457
// This will set the return value to false, otherwise the return value
@@ -494,6 +518,7 @@ void QuicStream::Initialize(
494518
streamt->Set(env->owner_symbol(), Null(env->isolate()));
495519
env->SetProtoMethod(stream, "destroy", QuicStreamDestroy);
496520
env->SetProtoMethod(stream, "resetStream", QuicStreamReset);
521+
env->SetProtoMethod(stream, "stopSending", QuicStreamStopSending);
497522
env->SetProtoMethod(stream, "id", QuicStreamGetID);
498523
env->SetProtoMethod(stream, "submitInformation", QuicStreamSubmitInformation);
499524
env->SetProtoMethod(stream, "submitHeaders", QuicStreamSubmitHeaders);

src/quic/node_quic_stream.h

+4-2
Original file line numberDiff line numberDiff line change
@@ -275,7 +275,7 @@ class QuicStream : public AsyncWrap,
275275
void Acknowledge(uint64_t offset, size_t datalen);
276276

277277
// Destroy the QuicStream and render it no longer usable.
278-
void Destroy();
278+
void Destroy(QuicError* error = nullptr);
279279

280280
// Buffers chunks of data to be written to the QUIC connection.
281281
int DoWrite(
@@ -312,7 +312,9 @@ class QuicStream : public AsyncWrap,
312312

313313
// Resets the QUIC stream, sending a signal to the peer that
314314
// no additional data will be transmitted for this stream.
315-
inline void ResetStream(uint64_t app_error_code = 0);
315+
inline void ResetStream(uint64_t app_error_code = NGTCP2_NO_ERROR);
316+
317+
inline void StopSending(uint64_t app_error_code = NGTCP2_NO_ERROR);
316318

317319
// Submits informational headers. Returns false if headers are not
318320
// supported on the underlying QuicApplication.

test/parallel/test-quic-statelessreset.js

+2-1
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,8 @@ server.on('session', common.mustCall((session) => {
4444

4545
server.on('close', common.mustCall(() => {
4646
// Verify stats recording
47-
assert.strictEqual(server.statelessResetCount, 1n);
47+
console.log(server.statelessResetCount);
48+
assert(server.statelessResetCount >= 1n);
4849
}));
4950

5051
server.on('ready', common.mustCall(() => {

0 commit comments

Comments
 (0)