Skip to content

Commit fe647d8

Browse files
committed
Merge pull request #394 from Shopify/retry-get-offset
Add retry logic to GetOffset() just like Leader()
2 parents 9b048b0 + f922f12 commit fe647d8

File tree

2 files changed

+84
-21
lines changed

2 files changed

+84
-21
lines changed

client.go

+36-21
Original file line numberDiff line numberDiff line change
@@ -292,31 +292,16 @@ func (client *client) RefreshMetadata(topics ...string) error {
292292
}
293293

294294
func (client *client) GetOffset(topic string, partitionID int32, time int64) (int64, error) {
295-
broker, err := client.Leader(topic, partitionID)
296-
if err != nil {
297-
return -1, err
298-
}
299-
300-
request := &OffsetRequest{}
301-
request.AddBlock(topic, partitionID, time, 1)
295+
offset, err := client.getOffset(topic, partitionID, time)
302296

303-
response, err := broker.GetAvailableOffsets(request)
304297
if err != nil {
305-
return -1, err
306-
}
307-
308-
block := response.GetBlock(topic, partitionID)
309-
if block == nil {
310-
return -1, ErrIncompleteResponse
311-
}
312-
if block.Err != ErrNoError {
313-
return -1, block.Err
314-
}
315-
if len(block.Offsets) != 1 {
316-
return -1, ErrOffsetOutOfRange
298+
if err := client.RefreshMetadata(topic); err != nil {
299+
return -1, err
300+
}
301+
return client.getOffset(topic, partitionID, time)
317302
}
318303

319-
return block.Offsets[0], nil
304+
return offset, err
320305
}
321306

322307
// private broker management helpers
@@ -442,6 +427,36 @@ func (client *client) cachedLeader(topic string, partitionID int32) (*Broker, er
442427
return nil, ErrUnknownTopicOrPartition
443428
}
444429

430+
func (client *client) getOffset(topic string, partitionID int32, time int64) (int64, error) {
431+
broker, err := client.Leader(topic, partitionID)
432+
if err != nil {
433+
return -1, err
434+
}
435+
436+
request := &OffsetRequest{}
437+
request.AddBlock(topic, partitionID, time, 1)
438+
439+
response, err := broker.GetAvailableOffsets(request)
440+
if err != nil {
441+
_ = broker.Close()
442+
return -1, err
443+
}
444+
445+
block := response.GetBlock(topic, partitionID)
446+
if block == nil {
447+
_ = broker.Close()
448+
return -1, ErrIncompleteResponse
449+
}
450+
if block.Err != ErrNoError {
451+
return -1, block.Err
452+
}
453+
if len(block.Offsets) != 1 {
454+
return -1, ErrOffsetOutOfRange
455+
}
456+
457+
return block.Offsets[0], nil
458+
}
459+
445460
// core metadata update logic
446461

447462
func (client *client) backgroundMetadataUpdater() {

client_test.go

+48
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,54 @@ func TestClientMetadata(t *testing.T) {
200200
safeClose(t, client)
201201
}
202202

203+
func TestClientGetOffset(t *testing.T) {
204+
seedBroker := newMockBroker(t, 1)
205+
leader := newMockBroker(t, 2)
206+
leaderAddr := leader.Addr()
207+
208+
metadata := new(MetadataResponse)
209+
metadata.AddTopicPartition("foo", 0, leader.BrokerID(), nil, nil, ErrNoError)
210+
metadata.AddBroker(leaderAddr, leader.BrokerID())
211+
seedBroker.Returns(metadata)
212+
213+
client, err := NewClient([]string{seedBroker.Addr()}, nil)
214+
if err != nil {
215+
t.Fatal(err)
216+
}
217+
218+
offsetResponse := new(OffsetResponse)
219+
offsetResponse.AddTopicPartition("foo", 0, 123)
220+
leader.Returns(offsetResponse)
221+
222+
offset, err := client.GetOffset("foo", 0, OffsetNewest)
223+
if err != nil {
224+
t.Error(err)
225+
}
226+
if offset != 123 {
227+
t.Error("Unexpected offset, got ", offset)
228+
}
229+
230+
leader.Close()
231+
seedBroker.Returns(metadata)
232+
233+
leader = newMockBrokerAddr(t, 2, leaderAddr)
234+
offsetResponse = new(OffsetResponse)
235+
offsetResponse.AddTopicPartition("foo", 0, 456)
236+
leader.Returns(offsetResponse)
237+
238+
offset, err = client.GetOffset("foo", 0, OffsetNewest)
239+
if err != nil {
240+
t.Error(err)
241+
}
242+
if offset != 456 {
243+
t.Error("Unexpected offset, got ", offset)
244+
}
245+
246+
seedBroker.Close()
247+
leader.Close()
248+
safeClose(t, client)
249+
}
250+
203251
func TestClientReceivingUnknownTopic(t *testing.T) {
204252
seedBroker := newMockBroker(t, 1)
205253

0 commit comments

Comments
 (0)