@@ -162,6 +162,27 @@ class DataQueueImpl final : public DataQueue,
162
162
" entries" , entries_, " std::vector<std::unique_ptr<Entry>>" );
163
163
}
164
164
165
+ void addBackpressureListener (BackpressureListener* listener) override {
166
+ if (idempotent_) return ;
167
+ DCHECK_NOT_NULL (listener);
168
+ backpressure_listeners_.insert (listener);
169
+ }
170
+
171
+ void removeBackpressureListener (BackpressureListener* listener) override {
172
+ if (idempotent_) return ;
173
+ DCHECK_NOT_NULL (listener);
174
+ backpressure_listeners_.erase (listener);
175
+ }
176
+
177
+ void NotifyBackpressure (size_t amount) {
178
+ if (idempotent_) return ;
179
+ for (auto & listener : backpressure_listeners_) listener->EntryRead (amount);
180
+ }
181
+
182
+ bool HasBackpressureListeners () const noexcept {
183
+ return !backpressure_listeners_.empty ();
184
+ }
185
+
165
186
std::shared_ptr<Reader> get_reader () override ;
166
187
SET_MEMORY_INFO_NAME (DataQueue)
167
188
SET_SELF_SIZE(DataQueueImpl)
@@ -173,6 +194,8 @@ class DataQueueImpl final : public DataQueue,
173
194
std::optional<uint64_t > capped_size_ = std::nullopt;
174
195
bool locked_to_reader_ = false ;
175
196
197
+ std::unordered_set<BackpressureListener*> backpressure_listeners_;
198
+
176
199
friend class DataQueue ;
177
200
friend class IdempotentDataQueueReader ;
178
201
friend class NonIdempotentDataQueueReader ;
@@ -433,6 +456,17 @@ class NonIdempotentDataQueueReader final
433
456
return ;
434
457
}
435
458
459
+ // If there is a backpressure listener, lets report on how much data
460
+ // was actually read.
461
+ if (data_queue_->HasBackpressureListeners ()) {
462
+ // How much did we actually read?
463
+ size_t read = 0 ;
464
+ for (uint64_t n = 0 ; n < count; n++) {
465
+ read += vecs[n].len ;
466
+ }
467
+ data_queue_->NotifyBackpressure (read );
468
+ }
469
+
436
470
// Now that we have updated this readers state, we can forward
437
471
// everything on to the outer next.
438
472
std::move (next)(status, vecs, count, std::move (done));
0 commit comments