Skip to content

Commit 3fb7fc9

Browse files
jasnellRafaelGSS
authored andcommitted
quic: further implementation details
PR-URL: #48244 Reviewed-By: Yagiz Nizipli <yagiz.nizipli@sentry.io> Reviewed-By: Stephen Belanger <admin@stephenbelanger.com>
1 parent 297cb6f commit 3fb7fc9

File tree

7 files changed

+1229
-38
lines changed

7 files changed

+1229
-38
lines changed

src/dataqueue/queue.cc

+34
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,27 @@ class DataQueueImpl final : public DataQueue,
162162
"entries", entries_, "std::vector<std::unique_ptr<Entry>>");
163163
}
164164

165+
void addBackpressureListener(BackpressureListener* listener) override {
166+
if (idempotent_) return;
167+
DCHECK_NOT_NULL(listener);
168+
backpressure_listeners_.insert(listener);
169+
}
170+
171+
void removeBackpressureListener(BackpressureListener* listener) override {
172+
if (idempotent_) return;
173+
DCHECK_NOT_NULL(listener);
174+
backpressure_listeners_.erase(listener);
175+
}
176+
177+
void NotifyBackpressure(size_t amount) {
178+
if (idempotent_) return;
179+
for (auto& listener : backpressure_listeners_) listener->EntryRead(amount);
180+
}
181+
182+
bool HasBackpressureListeners() const noexcept {
183+
return !backpressure_listeners_.empty();
184+
}
185+
165186
std::shared_ptr<Reader> get_reader() override;
166187
SET_MEMORY_INFO_NAME(DataQueue)
167188
SET_SELF_SIZE(DataQueueImpl)
@@ -173,6 +194,8 @@ class DataQueueImpl final : public DataQueue,
173194
std::optional<uint64_t> capped_size_ = std::nullopt;
174195
bool locked_to_reader_ = false;
175196

197+
std::unordered_set<BackpressureListener*> backpressure_listeners_;
198+
176199
friend class DataQueue;
177200
friend class IdempotentDataQueueReader;
178201
friend class NonIdempotentDataQueueReader;
@@ -433,6 +456,17 @@ class NonIdempotentDataQueueReader final
433456
return;
434457
}
435458

459+
// If there is a backpressure listener, lets report on how much data
460+
// was actually read.
461+
if (data_queue_->HasBackpressureListeners()) {
462+
// How much did we actually read?
463+
size_t read = 0;
464+
for (uint64_t n = 0; n < count; n++) {
465+
read += vecs[n].len;
466+
}
467+
data_queue_->NotifyBackpressure(read);
468+
}
469+
436470
// Now that we have updated this readers state, we can forward
437471
// everything on to the outer next.
438472
std::move(next)(status, vecs, count, std::move(done));

src/dataqueue/queue.h

+12
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,14 @@ class DataQueue : public MemoryRetainer {
141141
using Done = bob::Done;
142142
};
143143

144+
// A BackpressureListener can be used to receive notifications
145+
// when a non-idempotent DataQueue releases entries as they
146+
// are consumed.
147+
class BackpressureListener {
148+
public:
149+
virtual void EntryRead(size_t amount) = 0;
150+
};
151+
144152
// A DataQueue::Entry represents a logical chunk of data in the queue.
145153
// The entry may or may not represent memory-resident data. It may
146154
// or may not be consumable more than once.
@@ -285,6 +293,10 @@ class DataQueue : public MemoryRetainer {
285293
// been set, maybeCapRemaining() will return std::nullopt.
286294
virtual std::optional<uint64_t> maybeCapRemaining() const = 0;
287295

296+
// BackpressureListeners only work on non-idempotent DataQueues.
297+
virtual void addBackpressureListener(BackpressureListener* listener) = 0;
298+
virtual void removeBackpressureListener(BackpressureListener* listener) = 0;
299+
288300
static void Initialize(Environment* env, v8::Local<v8::Object> target);
289301
static void RegisterExternalReferences(ExternalReferenceRegistry* registry);
290302
};

src/quic/application.cc

+5-6
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
#include "node_bob.h"
12
#include "uv.h"
23
#if HAVE_OPENSSL && NODE_OPENSSL_HAS_QUIC
34

@@ -79,7 +80,7 @@ void Session::Application::AcknowledgeStreamData(Stream* stream,
7980

8081
void Session::Application::BlockStream(int64_t id) {
8182
auto stream = session().FindStream(id);
82-
if (stream) stream->Blocked();
83+
if (stream) stream->EmitBlocked();
8384
}
8485

8586
bool Session::Application::CanAddHeader(size_t current_count,
@@ -233,7 +234,7 @@ void Session::Application::SendPendingData() {
233234
// and no more outbound data can be sent.
234235
CHECK_LE(ndatalen, 0);
235236
auto stream = session_->FindStream(stream_data.id);
236-
if (stream) stream->End();
237+
if (stream) stream->EndWritable();
237238
continue;
238239
}
239240
case NGTCP2_ERR_WRITE_MORE: {
@@ -360,10 +361,8 @@ class DefaultApplication final : public Session::Application {
360361
stream_data->data,
361362
arraysize(stream_data->data),
362363
kMaxVectorCount);
363-
switch (ret) {
364-
case bob::Status::STATUS_EOS:
365-
stream_data->fin = 1;
366-
break;
364+
if (ret == bob::Status::STATUS_EOS) {
365+
stream_data->fin = 1;
367366
}
368367
} else {
369368
stream_data->fin = 1;

src/quic/bindingdata.cc

+6-2
Original file line numberDiff line numberDiff line change
@@ -203,8 +203,12 @@ CallbackScopeBase::CallbackScopeBase(Environment* env)
203203
: env(env), context_scope(env->context()), try_catch(env->isolate()) {}
204204

205205
CallbackScopeBase::~CallbackScopeBase() {
206-
if (try_catch.HasCaught() && !try_catch.HasTerminated()) {
207-
errors::TriggerUncaughtException(env->isolate(), try_catch);
206+
if (try_catch.HasCaught()) {
207+
if (!try_catch.HasTerminated() && env->can_call_into_js()) {
208+
errors::TriggerUncaughtException(env->isolate(), try_catch);
209+
} else {
210+
try_catch.ReThrow();
211+
}
208212
}
209213
}
210214

src/quic/bindingdata.h

+2-1
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,6 @@ constexpr size_t kMaxVectorCount = 16;
103103
V(session_version_negotiation, SessionVersionNegotiation) \
104104
V(session_path_validation, SessionPathValidation) \
105105
V(stream_close, StreamClose) \
106-
V(stream_error, StreamError) \
107106
V(stream_created, StreamCreated) \
108107
V(stream_reset, StreamReset) \
109108
V(stream_headers, StreamHeaders) \
@@ -304,6 +303,8 @@ struct CallbackScopeBase {
304303
~CallbackScopeBase();
305304
};
306305

306+
// Maintains a strong reference to BaseObject type ptr to keep it alive during
307+
// a MakeCallback during which it might be destroyed.
307308
template <typename T>
308309
struct CallbackScope final : public CallbackScopeBase {
309310
BaseObjectPtr<T> ref;

0 commit comments

Comments
 (0)