Skip to content

Commit d4675d8

Browse files
committed
fix(consumer): use full range of FetchRequest vers
Signed-off-by: Dominic Evans <dominic.evans@uk.ibm.com>
1 parent 68312a5 commit d4675d8

File tree

3 files changed

+62
-39
lines changed

3 files changed

+62
-39
lines changed

consumer.go

+24-1
Original file line numberDiff line numberDiff line change
@@ -1068,20 +1068,35 @@ func (bc *brokerConsumer) fetchNewMessages() (*FetchResponse, error) {
10681068
MinBytes: bc.consumer.conf.Consumer.Fetch.Min,
10691069
MaxWaitTime: int32(bc.consumer.conf.Consumer.MaxWaitTime / time.Millisecond),
10701070
}
1071+
// Version 1 is the same as version 0.
10711072
if bc.consumer.conf.Version.IsAtLeast(V0_9_0_0) {
10721073
request.Version = 1
10731074
}
1075+
// Starting in Version 2, the requestor must be able to handle Kafka Log
1076+
// Message format version 1.
10741077
if bc.consumer.conf.Version.IsAtLeast(V0_10_0_0) {
10751078
request.Version = 2
10761079
}
1080+
// Version 3 adds MaxBytes. Starting in version 3, the partition ordering in
1081+
// the request is now relevant. Partitions will be processed in the order
1082+
// they appear in the request.
10771083
if bc.consumer.conf.Version.IsAtLeast(V0_10_1_0) {
10781084
request.Version = 3
10791085
request.MaxBytes = MaxResponseSize
10801086
}
1087+
// Version 4 adds IsolationLevel. Starting in version 4, the reqestor must be
1088+
// able to handle Kafka log message format version 2.
1089+
// Version 5 adds LogStartOffset to indicate the earliest available offset of
1090+
// partition data that can be consumed.
10811091
if bc.consumer.conf.Version.IsAtLeast(V0_11_0_0) {
1082-
request.Version = 4
1092+
request.Version = 5
10831093
request.Isolation = bc.consumer.conf.Consumer.IsolationLevel
10841094
}
1095+
// Version 6 is the same as version 5.
1096+
if bc.consumer.conf.Version.IsAtLeast(V1_0_0_0) {
1097+
request.Version = 6
1098+
}
1099+
// Version 7 adds incremental fetch request support.
10851100
if bc.consumer.conf.Version.IsAtLeast(V1_1_0_0) {
10861101
request.Version = 7
10871102
// We do not currently implement KIP-227 FetchSessions. Setting the id to 0
@@ -1090,9 +1105,17 @@ func (bc *brokerConsumer) fetchNewMessages() (*FetchResponse, error) {
10901105
request.SessionID = 0
10911106
request.SessionEpoch = -1
10921107
}
1108+
// Version 8 is the same as version 7.
1109+
if bc.consumer.conf.Version.IsAtLeast(V2_0_0_0) {
1110+
request.Version = 8
1111+
}
1112+
// Version 9 adds CurrentLeaderEpoch, as described in KIP-320.
1113+
// Version 10 indicates that we can use the ZStd compression algorithm, as
1114+
// described in KIP-110.
10931115
if bc.consumer.conf.Version.IsAtLeast(V2_1_0_0) {
10941116
request.Version = 10
10951117
}
1118+
// Version 11 adds RackID for KIP-392 fetch from closest replica
10961119
if bc.consumer.conf.Version.IsAtLeast(V2_3_0_0) {
10971120
request.Version = 11
10981121
request.RackID = bc.consumer.conf.RackID

fetch_request.go

+19-19
Original file line numberDiff line numberDiff line change
@@ -281,28 +281,28 @@ func (r *FetchRequest) isValidVersion() bool {
281281

282282
func (r *FetchRequest) requiredVersion() KafkaVersion {
283283
switch r.Version {
284-
case 0:
285-
return MinVersion
286-
case 1:
287-
return V0_9_0_0
288-
case 2:
289-
return V0_10_0_0
290-
case 3:
291-
return V0_10_1_0
292-
case 4, 5:
293-
return V0_11_0_0
294-
case 6:
295-
return V1_0_0_0
296-
case 7:
297-
return V1_1_0_0
298-
case 8:
299-
return V2_0_0_0
300-
case 9, 10:
301-
return V2_1_0_0
302284
case 11:
303285
return V2_3_0_0
286+
case 9, 10:
287+
return V2_1_0_0
288+
case 8:
289+
return V2_0_0_0
290+
case 7:
291+
return V1_1_0_0
292+
case 6:
293+
return V1_0_0_0
294+
case 4, 5:
295+
return V0_11_0_0
296+
case 3:
297+
return V0_10_1_0
298+
case 2:
299+
return V0_10_0_0
300+
case 1:
301+
return V0_9_0_0
302+
case 0:
303+
return V0_8_2_0
304304
default:
305-
return MaxVersion
305+
return V2_3_0_0
306306
}
307307
}
308308

fetch_response.go

+19-19
Original file line numberDiff line numberDiff line change
@@ -392,28 +392,28 @@ func (r *FetchResponse) isValidVersion() bool {
392392

393393
func (r *FetchResponse) requiredVersion() KafkaVersion {
394394
switch r.Version {
395-
case 0:
396-
return MinVersion
397-
case 1:
398-
return V0_9_0_0
399-
case 2:
400-
return V0_10_0_0
401-
case 3:
402-
return V0_10_1_0
403-
case 4, 5:
404-
return V0_11_0_0
405-
case 6:
406-
return V1_0_0_0
407-
case 7:
408-
return V1_1_0_0
409-
case 8:
410-
return V2_0_0_0
411-
case 9, 10:
412-
return V2_1_0_0
413395
case 11:
414396
return V2_3_0_0
397+
case 9, 10:
398+
return V2_1_0_0
399+
case 8:
400+
return V2_0_0_0
401+
case 7:
402+
return V1_1_0_0
403+
case 6:
404+
return V1_0_0_0
405+
case 4, 5:
406+
return V0_11_0_0
407+
case 3:
408+
return V0_10_1_0
409+
case 2:
410+
return V0_10_0_0
411+
case 1:
412+
return V0_9_0_0
413+
case 0:
414+
return V0_8_2_0
415415
default:
416-
return MaxVersion
416+
return V2_3_0_0
417417
}
418418
}
419419

0 commit comments

Comments
 (0)