diff --git a/consumer.go b/consumer.go index 11f9896fb..60556a566 100644 --- a/consumer.go +++ b/consumer.go @@ -1068,20 +1068,35 @@ func (bc *brokerConsumer) fetchNewMessages() (*FetchResponse, error) { MinBytes: bc.consumer.conf.Consumer.Fetch.Min, MaxWaitTime: int32(bc.consumer.conf.Consumer.MaxWaitTime / time.Millisecond), } + // Version 1 is the same as version 0. if bc.consumer.conf.Version.IsAtLeast(V0_9_0_0) { request.Version = 1 } + // Starting in Version 2, the requestor must be able to handle Kafka Log + // Message format version 1. if bc.consumer.conf.Version.IsAtLeast(V0_10_0_0) { request.Version = 2 } + // Version 3 adds MaxBytes. Starting in version 3, the partition ordering in + // the request is now relevant. Partitions will be processed in the order + // they appear in the request. if bc.consumer.conf.Version.IsAtLeast(V0_10_1_0) { request.Version = 3 request.MaxBytes = MaxResponseSize } + // Version 4 adds IsolationLevel. Starting in version 4, the reqestor must be + // able to handle Kafka log message format version 2. + // Version 5 adds LogStartOffset to indicate the earliest available offset of + // partition data that can be consumed. if bc.consumer.conf.Version.IsAtLeast(V0_11_0_0) { - request.Version = 4 + request.Version = 5 request.Isolation = bc.consumer.conf.Consumer.IsolationLevel } + // Version 6 is the same as version 5. + if bc.consumer.conf.Version.IsAtLeast(V1_0_0_0) { + request.Version = 6 + } + // Version 7 adds incremental fetch request support. if bc.consumer.conf.Version.IsAtLeast(V1_1_0_0) { request.Version = 7 // We do not currently implement KIP-227 FetchSessions. Setting the id to 0 @@ -1090,9 +1105,17 @@ func (bc *brokerConsumer) fetchNewMessages() (*FetchResponse, error) { request.SessionID = 0 request.SessionEpoch = -1 } + // Version 8 is the same as version 7. + if bc.consumer.conf.Version.IsAtLeast(V2_0_0_0) { + request.Version = 8 + } + // Version 9 adds CurrentLeaderEpoch, as described in KIP-320. + // Version 10 indicates that we can use the ZStd compression algorithm, as + // described in KIP-110. if bc.consumer.conf.Version.IsAtLeast(V2_1_0_0) { request.Version = 10 } + // Version 11 adds RackID for KIP-392 fetch from closest replica if bc.consumer.conf.Version.IsAtLeast(V2_3_0_0) { request.Version = 11 request.RackID = bc.consumer.conf.RackID diff --git a/consumer_group.go b/consumer_group.go index 1b08f6025..ed5b5a65b 100644 --- a/consumer_group.go +++ b/consumer_group.go @@ -318,8 +318,10 @@ func (c *consumerGroup) newSession(ctx context.Context, topics []string, handler } return c.retryNewSession(ctx, topics, handler, retries, true) case ErrMemberIdRequired: - // from JoinGroupRequest v4, if client start with empty member id, - // it need to get member id from response and send another join request to join group + // from JoinGroupRequest v4 onwards (due to KIP-394) if the client starts + // with an empty member id, it needs to get the assigned id from the + // response and send another join request with that id to actually join the + // group c.memberID = join.MemberId return c.retryNewSession(ctx, topics, handler, retries+1 /*keep retry time*/, false) case ErrFencedInstancedId: @@ -442,14 +444,13 @@ func (c *consumerGroup) joinGroupRequest(coordinator *Broker, topics []string) ( if c.config.Version.IsAtLeast(V2_0_0_0) { req.Version = 3 } - // XXX: protocol states "Starting from version 4, the client needs to issue a - // second request to join group", so not enabling this until we can - // investigate - /* - if c.config.Version.IsAtLeast(V2_2_0_0) { - req.Version = 4 - } - */ + // from JoinGroupRequest v4 onwards (due to KIP-394) the client will actually + // send two JoinGroupRequests, once with the empty member id, and then again + // with the assigned id from the first response. This is handled via the + // ErrMemberIdRequired case. + if c.config.Version.IsAtLeast(V2_2_0_0) { + req.Version = 4 + } if c.config.Version.IsAtLeast(V2_3_0_0) { req.Version = 5 req.GroupInstanceId = c.groupInstanceId diff --git a/consumer_test.go b/consumer_test.go index a6ac06c92..c8731a58a 100644 --- a/consumer_test.go +++ b/consumer_test.go @@ -13,8 +13,10 @@ import ( "time" ) -var testMsg = StringEncoder("Foo") -var testKey = StringEncoder("Bar") +var ( + testMsg = StringEncoder("Foo") + testKey = StringEncoder("Bar") +) // If a particular offset is provided then messages are consumed starting from // that offset. @@ -632,7 +634,7 @@ func TestConsumerExtraOffsets(t *testing.T) { legacyFetchResponse.AddMessage("my_topic", 0, nil, testMsg, 2) legacyFetchResponse.AddMessage("my_topic", 0, nil, testMsg, 3) legacyFetchResponse.AddMessage("my_topic", 0, nil, testMsg, 4) - newFetchResponse := &FetchResponse{Version: 4} + newFetchResponse := &FetchResponse{Version: 5} newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 1) newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 2) newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 3) @@ -642,7 +644,7 @@ func TestConsumerExtraOffsets(t *testing.T) { for _, fetchResponse1 := range []*FetchResponse{legacyFetchResponse, newFetchResponse} { cfg := NewTestConfig() cfg.Consumer.Return.Errors = true - if fetchResponse1.Version >= 4 { + if fetchResponse1.Version >= 5 { cfg.Version = V0_11_0_0 } @@ -698,10 +700,10 @@ func TestConsumerExtraOffsets(t *testing.T) { // more messages if higher offset was requested. func TestConsumerReceivingFetchResponseWithTooOldRecords(t *testing.T) { // Given - fetchResponse1 := &FetchResponse{Version: 4} + fetchResponse1 := &FetchResponse{Version: 5} fetchResponse1.AddRecord("my_topic", 0, nil, testMsg, 1) - fetchResponse2 := &FetchResponse{Version: 4} + fetchResponse2 := &FetchResponse{Version: 5} fetchResponse2.AddRecord("my_topic", 0, nil, testMsg, 1000000) cfg := NewTestConfig() @@ -745,7 +747,7 @@ func TestConsumerReceivingFetchResponseWithTooOldRecords(t *testing.T) { func TestConsumeMessageWithNewerFetchAPIVersion(t *testing.T) { // Given - fetchResponse1 := &FetchResponse{Version: 4} + fetchResponse1 := &FetchResponse{Version: 5} fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, 1) fetchResponse1.AddMessage("my_topic", 0, nil, testMsg, 2) @@ -1220,7 +1222,7 @@ func TestConsumerNonSequentialOffsets(t *testing.T) { legacyFetchResponse.AddMessage("my_topic", 0, nil, testMsg, 5) legacyFetchResponse.AddMessage("my_topic", 0, nil, testMsg, 7) legacyFetchResponse.AddMessage("my_topic", 0, nil, testMsg, 11) - newFetchResponse := &FetchResponse{Version: 4} + newFetchResponse := &FetchResponse{Version: 5} newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 5) newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 7) newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 11) @@ -1735,7 +1737,7 @@ func TestConsumerTimestamps(t *testing.T) { cfg.Version = d.kversion switch { case d.kversion.IsAtLeast(V0_11_0_0): - fr = &FetchResponse{Version: 4, LogAppendTime: d.logAppendTime, Timestamp: now} + fr = &FetchResponse{Version: 5, LogAppendTime: d.logAppendTime, Timestamp: now} for _, m := range d.messages { fr.AddRecordWithTimestamp("my_topic", 0, m.key, testMsg, m.offset, m.timestamp) } @@ -1806,7 +1808,7 @@ func TestExcludeUncommitted(t *testing.T) { broker0 := NewMockBroker(t, 0) fetchResponse := &FetchResponse{ - Version: 4, + Version: 5, Blocks: map[string]map[int32]*FetchResponseBlock{"my_topic": {0: { AbortedTransactions: []*AbortedTransaction{{ProducerID: 7, FirstOffset: 1235}}, }}}, diff --git a/fetch_request.go b/fetch_request.go index f96415b99..d1fd81384 100644 --- a/fetch_request.go +++ b/fetch_request.go @@ -281,28 +281,28 @@ func (r *FetchRequest) isValidVersion() bool { func (r *FetchRequest) requiredVersion() KafkaVersion { switch r.Version { - case 0: - return MinVersion - case 1: - return V0_9_0_0 - case 2: - return V0_10_0_0 - case 3: - return V0_10_1_0 - case 4, 5: - return V0_11_0_0 - case 6: - return V1_0_0_0 - case 7: - return V1_1_0_0 - case 8: - return V2_0_0_0 - case 9, 10: - return V2_1_0_0 case 11: return V2_3_0_0 + case 9, 10: + return V2_1_0_0 + case 8: + return V2_0_0_0 + case 7: + return V1_1_0_0 + case 6: + return V1_0_0_0 + case 4, 5: + return V0_11_0_0 + case 3: + return V0_10_1_0 + case 2: + return V0_10_0_0 + case 1: + return V0_9_0_0 + case 0: + return V0_8_2_0 default: - return MaxVersion + return V2_3_0_0 } } diff --git a/fetch_response.go b/fetch_response.go index 7790b7ddd..02e8ca473 100644 --- a/fetch_response.go +++ b/fetch_response.go @@ -392,28 +392,28 @@ func (r *FetchResponse) isValidVersion() bool { func (r *FetchResponse) requiredVersion() KafkaVersion { switch r.Version { - case 0: - return MinVersion - case 1: - return V0_9_0_0 - case 2: - return V0_10_0_0 - case 3: - return V0_10_1_0 - case 4, 5: - return V0_11_0_0 - case 6: - return V1_0_0_0 - case 7: - return V1_1_0_0 - case 8: - return V2_0_0_0 - case 9, 10: - return V2_1_0_0 case 11: return V2_3_0_0 + case 9, 10: + return V2_1_0_0 + case 8: + return V2_0_0_0 + case 7: + return V1_1_0_0 + case 6: + return V1_0_0_0 + case 4, 5: + return V0_11_0_0 + case 3: + return V0_10_1_0 + case 2: + return V0_10_0_0 + case 1: + return V0_9_0_0 + case 0: + return V0_8_2_0 default: - return MaxVersion + return V2_3_0_0 } }