Skip to content

Commit e392e61

Browse files
feat(pubsub): Enable server side flow control by default with the option to turn it off (#3154)
* feat: Enable server side flow control by default with the option to turn it off This change enables sending flow control settings automatically to the server. If ReceiveSettings.MaxOutstandingMessages > 0 or ReceiveSettings.MaxOutstandingBytes > 0, flow control will be enforced at the server side (in addition to the client side). This behavior is enabled by default and ReceiveSettings.UseLegacyFlowControl can be set for users who would like to opt-out of this feature in case they encounter issues with server side flow control. * Update subscription.go * use unexported field in pulloptions * use short variable declaration Co-authored-by: Alex Hong <9397363+hongalex@users.noreply.github.com>
1 parent e12046b commit e392e61

File tree

2 files changed

+15
-1
lines changed

2 files changed

+15
-1
lines changed

pubsub/iterator.go

+7-1
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,13 @@ type messageIterator struct {
7676
func newMessageIterator(subc *vkit.SubscriberClient, subName string, maxExtensionPeriod *time.Duration, po *pullOptions) *messageIterator {
7777
var ps *pullStream
7878
if !po.synchronous {
79-
ps = newPullStream(context.Background(), subc.StreamingPull, subName, po.maxOutstandingMessages, po.maxOutstandingBytes)
79+
maxMessages := po.maxOutstandingMessages
80+
maxBytes := po.maxOutstandingBytes
81+
if po.useLegacyFlowControl {
82+
maxMessages = 0
83+
maxBytes = 0
84+
}
85+
ps = newPullStream(context.Background(), subc.StreamingPull, subName, maxMessages, maxBytes)
8086
}
8187
// The period will update each tick based on the distribution of acks. We'll start by arbitrarily sending
8288
// the first keepAlive halfway towards the minimum ack deadline.

pubsub/subscription.go

+8
Original file line numberDiff line numberDiff line change
@@ -489,6 +489,12 @@ type ReceiveSettings struct {
489489
// for unprocessed messages.
490490
MaxOutstandingBytes int
491491

492+
// UseLegacyFlowControl disables enforcing flow control settings at the Cloud
493+
// PubSub server and the less accurate method of only enforcing flow control
494+
// at the client side is used.
495+
// The default is false.
496+
UseLegacyFlowControl bool
497+
492498
// NumGoroutines is the number of goroutines that each datastructure along
493499
// the Receive path will spawn. Adjusting this value adjusts concurrency
494500
// along the receive path.
@@ -820,6 +826,7 @@ func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Mes
820826
synchronous: s.ReceiveSettings.Synchronous,
821827
maxOutstandingMessages: maxCount,
822828
maxOutstandingBytes: maxBytes,
829+
useLegacyFlowControl: s.ReceiveSettings.UseLegacyFlowControl,
823830
}
824831
fc := newFlowController(maxCount, maxBytes)
825832

@@ -950,4 +957,5 @@ type pullOptions struct {
950957
synchronous bool
951958
maxOutstandingMessages int
952959
maxOutstandingBytes int
960+
useLegacyFlowControl bool
953961
}

0 commit comments

Comments
 (0)