Skip to content

Commit 0da3578

Browse files
authored
fix(pubsublite): rebatch messages upon new publish stream (#3694)
Merge in-flight message batches when a new publish stream is connected.
1 parent 613ced7 commit 0da3578

File tree

3 files changed

+160
-45
lines changed

3 files changed

+160
-45
lines changed

pubsublite/internal/wire/publish_batcher.go

+23-4
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ type messageHolder struct {
4141
// MessagePublishRequest.
4242
type publishBatch struct {
4343
msgHolders []*messageHolder
44+
totalSize int
4445
}
4546

4647
func (b *publishBatch) ToPublishRequest() *pb.PublishRequest {
@@ -93,7 +94,11 @@ func newPublishMessageBatcher(settings *PublishSettings, partition int, onNewBat
9394
// singlePartitionPublisher.onNewBatch() receives the new batch from the
9495
// Bundler, which calls publishMessageBatcher.AddBatch(). Only the
9596
// publisher's mutex is required.
96-
onNewBatch(&publishBatch{msgHolders: msgs})
97+
batch := &publishBatch{msgHolders: msgs}
98+
for _, msg := range batch.msgHolders {
99+
batch.totalSize += msg.size
100+
}
101+
onNewBatch(batch)
97102
})
98103
msgBundler.DelayThreshold = settings.DelayThreshold
99104
msgBundler.BundleCountThreshold = settings.CountThreshold
@@ -164,10 +169,24 @@ func (b *publishMessageBatcher) OnPermanentError(err error) {
164169

165170
func (b *publishMessageBatcher) InFlightBatches() []*publishBatch {
166171
var batches []*publishBatch
167-
for elem := b.publishQueue.Front(); elem != nil; elem = elem.Next() {
168-
if batch, ok := elem.Value.(*publishBatch); ok {
169-
batches = append(batches, batch)
172+
for elem := b.publishQueue.Front(); elem != nil; {
173+
batch := elem.Value.(*publishBatch)
174+
if elem.Prev() != nil {
175+
// Merge current batch with previous if within max bytes and count limits.
176+
prevBatch := elem.Prev().Value.(*publishBatch)
177+
totalSize := prevBatch.totalSize + batch.totalSize
178+
totalLen := len(prevBatch.msgHolders) + len(batch.msgHolders)
179+
if totalSize <= MaxPublishRequestBytes && totalLen <= MaxPublishRequestCount {
180+
prevBatch.totalSize = totalSize
181+
prevBatch.msgHolders = append(prevBatch.msgHolders, batch.msgHolders...)
182+
removeElem := elem
183+
elem = elem.Next()
184+
b.publishQueue.Remove(removeElem)
185+
continue
186+
}
170187
}
188+
batches = append(batches, batch)
189+
elem = elem.Next()
171190
}
172191
return batches
173192
}

pubsublite/internal/wire/publish_batcher_test.go

+136-39
Original file line numberDiff line numberDiff line change
@@ -128,8 +128,8 @@ func (br *testPublishBatchReceiver) ValidateBatches(want []*publishBatch) {
128128
}
129129
}
130130

131-
if !testutil.Equal(got, want, cmp.AllowUnexported(publishBatch{}, messageHolder{})) {
132-
br.t.Errorf("Batches got: %v\nwant: %v", got, want)
131+
if diff := testutil.Diff(got, want, cmp.AllowUnexported(publishBatch{}, messageHolder{})); diff != "" {
132+
br.t.Errorf("Batches got: -, want: +\n%s", diff)
133133
}
134134
}
135135

@@ -144,6 +144,15 @@ func makeMsgHolder(msg *pb.PubSubMessage, receiver ...*testPublishResultReceiver
144144
return h
145145
}
146146

147+
func makePublishBatch(msgs ...*messageHolder) *publishBatch {
148+
batch := new(publishBatch)
149+
for _, msg := range msgs {
150+
batch.msgHolders = append(batch.msgHolders, msg)
151+
batch.totalSize += msg.size
152+
}
153+
return batch
154+
}
155+
147156
func TestPublishBatcherAddMessage(t *testing.T) {
148157
const initAvailableBytes = MaxPublishRequestBytes
149158
settings := DefaultPublishSettings
@@ -199,22 +208,16 @@ func TestPublishBatcherBundlerCountThreshold(t *testing.T) {
199208
// Batch 1
200209
msg1 := &pb.PubSubMessage{Data: []byte{'1'}}
201210
msg2 := &pb.PubSubMessage{Data: []byte{'2'}}
202-
wantBatch1 := &publishBatch{
203-
[]*messageHolder{makeMsgHolder(msg1), makeMsgHolder(msg2)},
204-
}
211+
wantBatch1 := makePublishBatch(makeMsgHolder(msg1), makeMsgHolder(msg2))
205212

206213
// Batch 2
207214
msg3 := &pb.PubSubMessage{Data: []byte{'3'}}
208215
msg4 := &pb.PubSubMessage{Data: []byte{'4'}}
209-
wantBatch2 := &publishBatch{
210-
[]*messageHolder{makeMsgHolder(msg3), makeMsgHolder(msg4)},
211-
}
216+
wantBatch2 := makePublishBatch(makeMsgHolder(msg3), makeMsgHolder(msg4))
212217

213218
// Batch 3
214219
msg5 := &pb.PubSubMessage{Data: []byte{'5'}}
215-
wantBatch3 := &publishBatch{
216-
[]*messageHolder{makeMsgHolder(msg5)},
217-
}
220+
wantBatch3 := makePublishBatch(makeMsgHolder(msg5))
218221

219222
receiver := newTestPublishBatchReceiver(t)
220223
batcher := newPublishMessageBatcher(&settings, 0, receiver.onNewBatch)
@@ -236,15 +239,11 @@ func TestPublishBatcherBundlerBatchingDelay(t *testing.T) {
236239

237240
// Batch 1
238241
msg1 := &pb.PubSubMessage{Data: []byte{'1'}}
239-
wantBatch1 := &publishBatch{
240-
[]*messageHolder{makeMsgHolder(msg1)},
241-
}
242+
wantBatch1 := makePublishBatch(makeMsgHolder(msg1))
242243

243244
// Batch 2
244245
msg2 := &pb.PubSubMessage{Data: []byte{'2'}}
245-
wantBatch2 := &publishBatch{
246-
[]*messageHolder{makeMsgHolder(msg2)},
247-
}
246+
wantBatch2 := makePublishBatch(makeMsgHolder(msg2))
248247

249248
receiver := newTestPublishBatchReceiver(t)
250249
batcher := newPublishMessageBatcher(&settings, 0, receiver.onNewBatch)
@@ -271,12 +270,7 @@ func TestPublishBatcherBundlerOnPermanentError(t *testing.T) {
271270
msg2 := &pb.PubSubMessage{Data: []byte{'2'}}
272271
pubResult1 := newTestPublishResultReceiver(t, msg1)
273272
pubResult2 := newTestPublishResultReceiver(t, msg2)
274-
batcher.AddBatch(&publishBatch{
275-
[]*messageHolder{
276-
makeMsgHolder(msg1, pubResult1),
277-
makeMsgHolder(msg2, pubResult2),
278-
},
279-
})
273+
batcher.AddBatch(makePublishBatch(makeMsgHolder(msg1, pubResult1), makeMsgHolder(msg2, pubResult2)))
280274

281275
wantErr := status.Error(codes.FailedPrecondition, "failed")
282276
batcher.OnPermanentError(wantErr)
@@ -306,17 +300,8 @@ func TestPublishBatcherBundlerOnPublishResponse(t *testing.T) {
306300
pubResult2 := newTestPublishResultReceiver(t, msg2)
307301
pubResult3 := newTestPublishResultReceiver(t, msg3)
308302

309-
batcher.AddBatch(&publishBatch{
310-
[]*messageHolder{
311-
makeMsgHolder(msg1, pubResult1),
312-
makeMsgHolder(msg2, pubResult2),
313-
},
314-
})
315-
batcher.AddBatch(&publishBatch{
316-
[]*messageHolder{
317-
makeMsgHolder(msg3, pubResult3),
318-
},
319-
})
303+
batcher.AddBatch(makePublishBatch(makeMsgHolder(msg1, pubResult1), makeMsgHolder(msg2, pubResult2)))
304+
batcher.AddBatch(makePublishBatch(makeMsgHolder(msg3, pubResult3)))
320305
if err := batcher.OnPublishResponse(70); err != nil {
321306
t.Errorf("OnPublishResponse() got err: %v", err)
322307
}
@@ -332,14 +317,126 @@ func TestPublishBatcherBundlerOnPublishResponse(t *testing.T) {
332317
t.Run("inconsistent offset", func(t *testing.T) {
333318
msg := &pb.PubSubMessage{Data: []byte{'4'}}
334319
pubResult := newTestPublishResultReceiver(t, msg)
335-
batcher.AddBatch(&publishBatch{
336-
[]*messageHolder{
337-
makeMsgHolder(msg, pubResult),
338-
},
339-
})
320+
batcher.AddBatch(makePublishBatch(makeMsgHolder(msg, pubResult)))
340321

341322
if gotErr, wantMsg := batcher.OnPublishResponse(80), "inconsistent start offset = 80"; !test.ErrorHasMsg(gotErr, wantMsg) {
342323
t.Errorf("OnPublishResponse() got err: %v, want err msg: %q", gotErr, wantMsg)
343324
}
344325
})
345326
}
327+
328+
func TestPublishBatcherRebatching(t *testing.T) {
329+
const partition = 2
330+
receiver := newTestPublishBatchReceiver(t)
331+
332+
t.Run("single batch", func(t *testing.T) {
333+
msg1 := &pb.PubSubMessage{Data: []byte{'1'}}
334+
335+
batcher := newPublishMessageBatcher(&DefaultPublishSettings, partition, receiver.onNewBatch)
336+
batcher.AddBatch(makePublishBatch(makeMsgHolder(msg1)))
337+
338+
got := batcher.InFlightBatches()
339+
want := []*publishBatch{
340+
makePublishBatch(makeMsgHolder(msg1)),
341+
}
342+
if diff := testutil.Diff(got, want, cmp.AllowUnexported(publishBatch{}, messageHolder{})); diff != "" {
343+
t.Errorf("Batches got: -, want: +\n%s", diff)
344+
}
345+
})
346+
347+
t.Run("merge into single batch", func(t *testing.T) {
348+
msg1 := &pb.PubSubMessage{Data: bytes.Repeat([]byte{1}, 100)}
349+
msg2 := &pb.PubSubMessage{Data: bytes.Repeat([]byte{2}, 200)}
350+
msg3 := &pb.PubSubMessage{Data: bytes.Repeat([]byte{3}, 300)}
351+
msg4 := &pb.PubSubMessage{Data: bytes.Repeat([]byte{4}, 400)}
352+
353+
batcher := newPublishMessageBatcher(&DefaultPublishSettings, partition, receiver.onNewBatch)
354+
batcher.AddBatch(makePublishBatch(makeMsgHolder(msg1)))
355+
batcher.AddBatch(makePublishBatch(makeMsgHolder(msg2), makeMsgHolder(msg3)))
356+
batcher.AddBatch(makePublishBatch(makeMsgHolder(msg4)))
357+
358+
got := batcher.InFlightBatches()
359+
want := []*publishBatch{
360+
makePublishBatch(makeMsgHolder(msg1), makeMsgHolder(msg2), makeMsgHolder(msg3), makeMsgHolder(msg4)),
361+
}
362+
if diff := testutil.Diff(got, want, cmp.AllowUnexported(publishBatch{}, messageHolder{})); diff != "" {
363+
t.Errorf("Batches got: -, want: +\n%s", diff)
364+
}
365+
})
366+
367+
t.Run("no rebatching", func(t *testing.T) {
368+
msg1 := &pb.PubSubMessage{Data: bytes.Repeat([]byte{1}, MaxPublishRequestBytes-10)}
369+
msg2 := &pb.PubSubMessage{Data: bytes.Repeat([]byte{2}, MaxPublishRequestBytes/2)}
370+
msg3 := &pb.PubSubMessage{Data: bytes.Repeat([]byte{3}, MaxPublishRequestBytes/2)}
371+
372+
batcher := newPublishMessageBatcher(&DefaultPublishSettings, partition, receiver.onNewBatch)
373+
batcher.AddBatch(makePublishBatch(makeMsgHolder(msg1)))
374+
batcher.AddBatch(makePublishBatch(makeMsgHolder(msg2)))
375+
batcher.AddBatch(makePublishBatch(makeMsgHolder(msg3)))
376+
377+
got := batcher.InFlightBatches()
378+
want := []*publishBatch{
379+
makePublishBatch(makeMsgHolder(msg1)),
380+
makePublishBatch(makeMsgHolder(msg2)),
381+
makePublishBatch(makeMsgHolder(msg3)),
382+
}
383+
if diff := testutil.Diff(got, want, cmp.AllowUnexported(publishBatch{}, messageHolder{})); diff != "" {
384+
t.Errorf("Batches got: -, want: +\n%s", diff)
385+
}
386+
})
387+
388+
t.Run("mixed rebatching", func(t *testing.T) {
389+
// Should be merged into a single batch.
390+
msg1 := &pb.PubSubMessage{Data: bytes.Repeat([]byte{1}, MaxPublishRequestBytes/2)}
391+
msg2 := &pb.PubSubMessage{Data: bytes.Repeat([]byte{2}, 200)}
392+
msg3 := &pb.PubSubMessage{Data: bytes.Repeat([]byte{3}, 300)}
393+
// Not merged due to byte limit.
394+
msg4 := &pb.PubSubMessage{Data: bytes.Repeat([]byte{4}, MaxPublishRequestBytes-500)}
395+
msg5 := &pb.PubSubMessage{Data: bytes.Repeat([]byte{5}, 500)}
396+
397+
batcher := newPublishMessageBatcher(&DefaultPublishSettings, partition, receiver.onNewBatch)
398+
batcher.AddBatch(makePublishBatch(makeMsgHolder(msg1)))
399+
batcher.AddBatch(makePublishBatch(makeMsgHolder(msg2)))
400+
batcher.AddBatch(makePublishBatch(makeMsgHolder(msg3)))
401+
batcher.AddBatch(makePublishBatch(makeMsgHolder(msg4)))
402+
batcher.AddBatch(makePublishBatch(makeMsgHolder(msg5)))
403+
404+
got := batcher.InFlightBatches()
405+
want := []*publishBatch{
406+
makePublishBatch(makeMsgHolder(msg1), makeMsgHolder(msg2), makeMsgHolder(msg3)),
407+
makePublishBatch(makeMsgHolder(msg4)),
408+
makePublishBatch(makeMsgHolder(msg5)),
409+
}
410+
if diff := testutil.Diff(got, want, cmp.AllowUnexported(publishBatch{}, messageHolder{})); diff != "" {
411+
t.Errorf("Batches got: -, want: +\n%s", diff)
412+
}
413+
})
414+
415+
t.Run("max count", func(t *testing.T) {
416+
var msgs []*pb.PubSubMessage
417+
var batch1 []*messageHolder
418+
var batch2 []*messageHolder
419+
batcher := newPublishMessageBatcher(&DefaultPublishSettings, partition, receiver.onNewBatch)
420+
for i := 0; i <= MaxPublishRequestCount; i++ {
421+
msg := &pb.PubSubMessage{Data: []byte{'0'}}
422+
msgs = append(msgs, msg)
423+
424+
msgHolder := makeMsgHolder(msg)
425+
if i < MaxPublishRequestCount {
426+
batch1 = append(batch1, msgHolder)
427+
} else {
428+
batch2 = append(batch2, msgHolder)
429+
}
430+
batcher.AddBatch(makePublishBatch(msgHolder))
431+
}
432+
433+
got := batcher.InFlightBatches()
434+
want := []*publishBatch{
435+
makePublishBatch(batch1...),
436+
makePublishBatch(batch2...),
437+
}
438+
if diff := testutil.Diff(got, want, cmp.AllowUnexported(publishBatch{}, messageHolder{})); diff != "" {
439+
t.Errorf("Batches got: -, want: +\n%s", diff)
440+
}
441+
})
442+
}

pubsublite/internal/wire/publisher_test.go

+1-2
Original file line numberDiff line numberDiff line change
@@ -196,8 +196,7 @@ func TestSinglePartitionPublisherResendMessages(t *testing.T) {
196196
// The publisher should resend all in-flight batches to the second stream.
197197
stream2 := test.NewRPCVerifier(t)
198198
stream2.Push(initPubReq(topic), initPubResp(), nil)
199-
stream2.Push(msgPubReq(msg1), msgPubResp(0), nil)
200-
stream2.Push(msgPubReq(msg2), msgPubResp(1), nil)
199+
stream2.Push(msgPubReq(msg1, msg2), msgPubResp(0), nil)
201200
stream2.Push(msgPubReq(msg3), msgPubResp(2), nil)
202201
verifiers.AddPublishStream(topic.Path, topic.Partition, stream2)
203202

0 commit comments

Comments
 (0)