Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(consumer): use full range of FetchRequest vers #2554

Merged
merged 2 commits into from
Aug 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 24 additions & 1 deletion consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -1068,20 +1068,35 @@ func (bc *brokerConsumer) fetchNewMessages() (*FetchResponse, error) {
MinBytes: bc.consumer.conf.Consumer.Fetch.Min,
MaxWaitTime: int32(bc.consumer.conf.Consumer.MaxWaitTime / time.Millisecond),
}
// Version 1 is the same as version 0.
if bc.consumer.conf.Version.IsAtLeast(V0_9_0_0) {
request.Version = 1
}
// Starting in Version 2, the requestor must be able to handle Kafka Log
// Message format version 1.
if bc.consumer.conf.Version.IsAtLeast(V0_10_0_0) {
request.Version = 2
}
// Version 3 adds MaxBytes. Starting in version 3, the partition ordering in
// the request is now relevant. Partitions will be processed in the order
// they appear in the request.
if bc.consumer.conf.Version.IsAtLeast(V0_10_1_0) {
request.Version = 3
request.MaxBytes = MaxResponseSize
}
// Version 4 adds IsolationLevel. Starting in version 4, the reqestor must be
// able to handle Kafka log message format version 2.
// Version 5 adds LogStartOffset to indicate the earliest available offset of
// partition data that can be consumed.
if bc.consumer.conf.Version.IsAtLeast(V0_11_0_0) {
request.Version = 4
request.Version = 5
request.Isolation = bc.consumer.conf.Consumer.IsolationLevel
}
// Version 6 is the same as version 5.
if bc.consumer.conf.Version.IsAtLeast(V1_0_0_0) {
request.Version = 6
}
// Version 7 adds incremental fetch request support.
if bc.consumer.conf.Version.IsAtLeast(V1_1_0_0) {
request.Version = 7
// We do not currently implement KIP-227 FetchSessions. Setting the id to 0
Expand All @@ -1090,9 +1105,17 @@ func (bc *brokerConsumer) fetchNewMessages() (*FetchResponse, error) {
request.SessionID = 0
request.SessionEpoch = -1
}
// Version 8 is the same as version 7.
if bc.consumer.conf.Version.IsAtLeast(V2_0_0_0) {
request.Version = 8
}
// Version 9 adds CurrentLeaderEpoch, as described in KIP-320.
// Version 10 indicates that we can use the ZStd compression algorithm, as
// described in KIP-110.
if bc.consumer.conf.Version.IsAtLeast(V2_1_0_0) {
request.Version = 10
}
// Version 11 adds RackID for KIP-392 fetch from closest replica
if bc.consumer.conf.Version.IsAtLeast(V2_3_0_0) {
request.Version = 11
request.RackID = bc.consumer.conf.RackID
Expand Down
21 changes: 11 additions & 10 deletions consumer_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,8 +318,10 @@ func (c *consumerGroup) newSession(ctx context.Context, topics []string, handler
}
return c.retryNewSession(ctx, topics, handler, retries, true)
case ErrMemberIdRequired:
// from JoinGroupRequest v4, if client start with empty member id,
// it need to get member id from response and send another join request to join group
// from JoinGroupRequest v4 onwards (due to KIP-394) if the client starts
// with an empty member id, it needs to get the assigned id from the
// response and send another join request with that id to actually join the
// group
c.memberID = join.MemberId
return c.retryNewSession(ctx, topics, handler, retries+1 /*keep retry time*/, false)
case ErrFencedInstancedId:
Expand Down Expand Up @@ -442,14 +444,13 @@ func (c *consumerGroup) joinGroupRequest(coordinator *Broker, topics []string) (
if c.config.Version.IsAtLeast(V2_0_0_0) {
req.Version = 3
}
// XXX: protocol states "Starting from version 4, the client needs to issue a
// second request to join group", so not enabling this until we can
// investigate
/*
if c.config.Version.IsAtLeast(V2_2_0_0) {
req.Version = 4
}
*/
// from JoinGroupRequest v4 onwards (due to KIP-394) the client will actually
// send two JoinGroupRequests, once with the empty member id, and then again
// with the assigned id from the first response. This is handled via the
// ErrMemberIdRequired case.
if c.config.Version.IsAtLeast(V2_2_0_0) {
req.Version = 4
}
if c.config.Version.IsAtLeast(V2_3_0_0) {
req.Version = 5
req.GroupInstanceId = c.groupInstanceId
Expand Down
22 changes: 12 additions & 10 deletions consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@ import (
"time"
)

var testMsg = StringEncoder("Foo")
var testKey = StringEncoder("Bar")
var (
testMsg = StringEncoder("Foo")
testKey = StringEncoder("Bar")
)

// If a particular offset is provided then messages are consumed starting from
// that offset.
Expand Down Expand Up @@ -632,7 +634,7 @@ func TestConsumerExtraOffsets(t *testing.T) {
legacyFetchResponse.AddMessage("my_topic", 0, nil, testMsg, 2)
legacyFetchResponse.AddMessage("my_topic", 0, nil, testMsg, 3)
legacyFetchResponse.AddMessage("my_topic", 0, nil, testMsg, 4)
newFetchResponse := &FetchResponse{Version: 4}
newFetchResponse := &FetchResponse{Version: 5}
newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 1)
newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 2)
newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 3)
Expand All @@ -642,7 +644,7 @@ func TestConsumerExtraOffsets(t *testing.T) {
for _, fetchResponse1 := range []*FetchResponse{legacyFetchResponse, newFetchResponse} {
cfg := NewTestConfig()
cfg.Consumer.Return.Errors = true
if fetchResponse1.Version >= 4 {
if fetchResponse1.Version >= 5 {
cfg.Version = V0_11_0_0
}

Expand Down Expand Up @@ -698,10 +700,10 @@ func TestConsumerExtraOffsets(t *testing.T) {
// more messages if higher offset was requested.
func TestConsumerReceivingFetchResponseWithTooOldRecords(t *testing.T) {
// Given
fetchResponse1 := &FetchResponse{Version: 4}
fetchResponse1 := &FetchResponse{Version: 5}
fetchResponse1.AddRecord("my_topic", 0, nil, testMsg, 1)

fetchResponse2 := &FetchResponse{Version: 4}
fetchResponse2 := &FetchResponse{Version: 5}
fetchResponse2.AddRecord("my_topic", 0, nil, testMsg, 1000000)

cfg := NewTestConfig()
Expand Down Expand Up @@ -745,7 +747,7 @@ func TestConsumerReceivingFetchResponseWithTooOldRecords(t *testing.T) {

func TestConsumeMessageWithNewerFetchAPIVersion(t *testing.T) {
// Given
fetchResponse1 := &FetchResponse{Version: 4}
fetchResponse1 := &FetchResponse{Version: 5}
fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, 1)
fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, 2)

Expand Down Expand Up @@ -1220,7 +1222,7 @@ func TestConsumerNonSequentialOffsets(t *testing.T) {
legacyFetchResponse.AddMessage("my_topic", 0, nil, testMsg, 5)
legacyFetchResponse.AddMessage("my_topic", 0, nil, testMsg, 7)
legacyFetchResponse.AddMessage("my_topic", 0, nil, testMsg, 11)
newFetchResponse := &FetchResponse{Version: 4}
newFetchResponse := &FetchResponse{Version: 5}
newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 5)
newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 7)
newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 11)
Expand Down Expand Up @@ -1735,7 +1737,7 @@ func TestConsumerTimestamps(t *testing.T) {
cfg.Version = d.kversion
switch {
case d.kversion.IsAtLeast(V0_11_0_0):
fr = &FetchResponse{Version: 4, LogAppendTime: d.logAppendTime, Timestamp: now}
fr = &FetchResponse{Version: 5, LogAppendTime: d.logAppendTime, Timestamp: now}
for _, m := range d.messages {
fr.AddRecordWithTimestamp("my_topic", 0, m.key, testMsg, m.offset, m.timestamp)
}
Expand Down Expand Up @@ -1806,7 +1808,7 @@ func TestExcludeUncommitted(t *testing.T) {
broker0 := NewMockBroker(t, 0)

fetchResponse := &FetchResponse{
Version: 4,
Version: 5,
Blocks: map[string]map[int32]*FetchResponseBlock{"my_topic": {0: {
AbortedTransactions: []*AbortedTransaction{{ProducerID: 7, FirstOffset: 1235}},
}}},
Expand Down
38 changes: 19 additions & 19 deletions fetch_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,28 +281,28 @@ func (r *FetchRequest) isValidVersion() bool {

func (r *FetchRequest) requiredVersion() KafkaVersion {
switch r.Version {
case 0:
return MinVersion
case 1:
return V0_9_0_0
case 2:
return V0_10_0_0
case 3:
return V0_10_1_0
case 4, 5:
return V0_11_0_0
case 6:
return V1_0_0_0
case 7:
return V1_1_0_0
case 8:
return V2_0_0_0
case 9, 10:
return V2_1_0_0
case 11:
return V2_3_0_0
case 9, 10:
return V2_1_0_0
case 8:
return V2_0_0_0
case 7:
return V1_1_0_0
case 6:
return V1_0_0_0
case 4, 5:
return V0_11_0_0
case 3:
return V0_10_1_0
case 2:
return V0_10_0_0
case 1:
return V0_9_0_0
case 0:
return V0_8_2_0
default:
return MaxVersion
return V2_3_0_0
}
}

Expand Down
38 changes: 19 additions & 19 deletions fetch_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,28 +392,28 @@ func (r *FetchResponse) isValidVersion() bool {

func (r *FetchResponse) requiredVersion() KafkaVersion {
switch r.Version {
case 0:
return MinVersion
case 1:
return V0_9_0_0
case 2:
return V0_10_0_0
case 3:
return V0_10_1_0
case 4, 5:
return V0_11_0_0
case 6:
return V1_0_0_0
case 7:
return V1_1_0_0
case 8:
return V2_0_0_0
case 9, 10:
return V2_1_0_0
case 11:
return V2_3_0_0
case 9, 10:
return V2_1_0_0
case 8:
return V2_0_0_0
case 7:
return V1_1_0_0
case 6:
return V1_0_0_0
case 4, 5:
return V0_11_0_0
case 3:
return V0_10_1_0
case 2:
return V0_10_0_0
case 1:
return V0_9_0_0
case 0:
return V0_8_2_0
default:
return MaxVersion
return V2_3_0_0
}
}

Expand Down