Skip to content

Commit d225e3a

Browse files
committed
fix(client): Metadata refresh should skip updates when metadata response is empty
Fix IBM#2664 We should skip the metadata refresh if the startup phase broker returns empty brokers in metadata response. The Java client skips the empty response to update the metadata cache (https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L1149) and we should make a feature parity in Sarama too Signed-off-by: Hao Sun <haos@uber.com>
1 parent 6678dd1 commit d225e3a

5 files changed

+92
-10
lines changed

client.go

+7
Original file line numberDiff line numberDiff line change
@@ -1035,6 +1035,13 @@ func (client *client) tryRefreshMetadata(topics []string, attemptsRemaining int,
10351035
var kerror KError
10361036
var packetEncodingError PacketEncodingError
10371037
if err == nil {
1038+
// When talking to the startup phase of a broker, it is possible to receive an empty metadata set. We should remove that broker and try next broker (https://issues.apache.org/jira/browse/KAFKA-7924).
1039+
if len(response.Brokers) == 0 {
1040+
Logger.Println("client/metadata receiving empty brokers from the metadata response when requesting the broker #%d at %s", broker.ID(), broker.addr)
1041+
_ = broker.Close()
1042+
client.deregisterBroker(broker)
1043+
continue
1044+
}
10381045
allKnownMetaData := len(topics) == 0
10391046
// valid response, use it
10401047
shouldRetry, err := client.updateMetadata(response, allKnownMetaData)

client_test.go

+78-8
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,9 @@ func safeClose(t testing.TB, c io.Closer) {
2323
func TestSimpleClient(t *testing.T) {
2424
seedBroker := NewMockBroker(t, 1)
2525

26-
seedBroker.Returns(new(MetadataResponse))
26+
metadataResponse := new(MetadataResponse)
27+
metadataResponse.AddBroker(seedBroker.Addr(), seedBroker.BrokerID())
28+
seedBroker.Returns(metadataResponse)
2729

2830
client, err := NewClient([]string{seedBroker.Addr()}, NewTestConfig())
2931
if err != nil {
@@ -92,6 +94,7 @@ func TestClientDoesntCachePartitionsForTopicsWithErrors(t *testing.T) {
9294
}
9395

9496
metadataResponse = new(MetadataResponse)
97+
metadataResponse.AddBroker(seedBroker.Addr(), seedBroker.BrokerID())
9598
metadataResponse.AddTopic("unknown", ErrUnknownTopicOrPartition)
9699
seedBroker.Returns(metadataResponse)
97100

@@ -111,6 +114,7 @@ func TestClientDoesntCachePartitionsForTopicsWithErrors(t *testing.T) {
111114
}
112115

113116
metadataResponse = new(MetadataResponse)
117+
metadataResponse.AddBroker(seedBroker.Addr(), seedBroker.BrokerID())
114118
metadataResponse.AddTopic("unknown", ErrUnknownTopicOrPartition)
115119
seedBroker.Returns(metadataResponse)
116120

@@ -358,6 +362,7 @@ func TestClientReceivingUnknownTopicWithBackoffFunc(t *testing.T) {
358362
seedBroker := NewMockBroker(t, 1)
359363

360364
metadataResponse1 := new(MetadataResponse)
365+
metadataResponse1.AddBroker(seedBroker.Addr(), seedBroker.BrokerID())
361366
seedBroker.Returns(metadataResponse1)
362367

363368
retryCount := int32(0)
@@ -375,6 +380,7 @@ func TestClientReceivingUnknownTopicWithBackoffFunc(t *testing.T) {
375380

376381
metadataUnknownTopic := new(MetadataResponse)
377382
metadataUnknownTopic.AddTopic("new_topic", ErrUnknownTopicOrPartition)
383+
metadataUnknownTopic.AddBroker(seedBroker.Addr(), seedBroker.BrokerID())
378384
seedBroker.Returns(metadataUnknownTopic)
379385
seedBroker.Returns(metadataUnknownTopic)
380386

@@ -395,6 +401,7 @@ func TestClientReceivingUnknownTopic(t *testing.T) {
395401
seedBroker := NewMockBroker(t, 1)
396402

397403
metadataResponse1 := new(MetadataResponse)
404+
metadataResponse1.AddBroker(seedBroker.Addr(), seedBroker.BrokerID())
398405
seedBroker.Returns(metadataResponse1)
399406

400407
config := NewTestConfig()
@@ -406,6 +413,7 @@ func TestClientReceivingUnknownTopic(t *testing.T) {
406413
}
407414

408415
metadataUnknownTopic := new(MetadataResponse)
416+
metadataUnknownTopic.AddBroker(seedBroker.Addr(), seedBroker.BrokerID())
409417
metadataUnknownTopic.AddTopic("new_topic", ErrUnknownTopicOrPartition)
410418
seedBroker.Returns(metadataUnknownTopic)
411419
seedBroker.Returns(metadataUnknownTopic)
@@ -481,6 +489,53 @@ func TestClientReceivingPartialMetadata(t *testing.T) {
481489
leader.Close()
482490
}
483491

492+
func TestClientRefreshBehaviourWhenEmptyMetadataResponse(t *testing.T) {
493+
seedBroker := NewMockBroker(t, 1)
494+
broker := NewMockBroker(t, 2)
495+
496+
metadataResponse1 := new(MetadataResponse)
497+
metadataResponse1.AddBroker(seedBroker.Addr(), seedBroker.BrokerID())
498+
seedBroker.Returns(metadataResponse1)
499+
500+
c, err := NewClient([]string{seedBroker.Addr()}, NewTestConfig())
501+
if err != nil {
502+
t.Fatal(err)
503+
}
504+
client := c.(*client)
505+
if len(client.seedBrokers) != 1 {
506+
t.Error("incorrect number of live seeds")
507+
}
508+
if len(client.deadSeeds) != 0 {
509+
t.Error("incorrect number of dead seeds")
510+
}
511+
if len(client.brokers) != 1 {
512+
t.Error("incorrect number of brokers")
513+
}
514+
515+
// Empty metadata response
516+
seedBroker.Returns(new(MetadataResponse))
517+
metadataResponse2 := new(MetadataResponse)
518+
metadataResponse2.AddBroker(seedBroker.Addr(), seedBroker.BrokerID())
519+
metadataResponse2.AddBroker(broker.Addr(), broker.BrokerID())
520+
seedBroker.Returns(metadataResponse2)
521+
err = c.RefreshMetadata()
522+
if err != nil {
523+
t.Fatal(err)
524+
}
525+
if len(client.seedBrokers) != 1 {
526+
t.Error("incorrect number of live seeds")
527+
}
528+
if len(client.deadSeeds) != 0 {
529+
t.Error("incorrect number of dead seeds")
530+
}
531+
if len(client.brokers) != 2 {
532+
t.Error("incorrect number of brokers")
533+
}
534+
broker.Close()
535+
seedBroker.Close()
536+
safeClose(t, client)
537+
}
538+
484539
func TestClientRefreshBehaviour(t *testing.T) {
485540
seedBroker := NewMockBroker(t, 1)
486541
leader := NewMockBroker(t, 5)
@@ -633,8 +688,9 @@ func TestClientGetBroker(t *testing.T) {
633688

634689
func TestClientResurrectDeadSeeds(t *testing.T) {
635690
initialSeed := NewMockBroker(t, 0)
636-
emptyMetadata := new(MetadataResponse)
637-
initialSeed.Returns(emptyMetadata)
691+
metadataResponse := new(MetadataResponse)
692+
metadataResponse.AddBroker(initialSeed.Addr(), initialSeed.BrokerID())
693+
initialSeed.Returns(metadataResponse)
638694

639695
conf := NewTestConfig()
640696
conf.Metadata.Retry.Backoff = 0
@@ -643,7 +699,6 @@ func TestClientResurrectDeadSeeds(t *testing.T) {
643699
if err != nil {
644700
t.Fatal(err)
645701
}
646-
initialSeed.Close()
647702

648703
client := c.(*client)
649704

@@ -658,6 +713,7 @@ func TestClientResurrectDeadSeeds(t *testing.T) {
658713
safeClose(t, client.seedBrokers[0])
659714
client.seedBrokers = []*Broker{NewBroker(addr1), NewBroker(addr2), NewBroker(addr3)}
660715
client.deadSeeds = []*Broker{}
716+
client.brokers = map[int32]*Broker{}
661717

662718
wg := sync.WaitGroup{}
663719
wg.Add(1)
@@ -676,7 +732,9 @@ func TestClientResurrectDeadSeeds(t *testing.T) {
676732
seed3.Close()
677733

678734
seed1.Close()
679-
seed2.Returns(emptyMetadata)
735+
metadataResponse2 := new(MetadataResponse)
736+
metadataResponse2.AddBroker(seed2.Addr(), seed2.BrokerID())
737+
seed2.Returns(metadataResponse2)
680738

681739
wg.Wait()
682740

@@ -767,6 +825,7 @@ func TestClientMetadataTimeout(t *testing.T) {
767825
// Use a responsive broker to create a working client
768826
initialSeed := NewMockBroker(t, 0)
769827
emptyMetadata := new(MetadataResponse)
828+
emptyMetadata.AddBroker(initialSeed.Addr(), initialSeed.BrokerID())
770829
initialSeed.Returns(emptyMetadata)
771830

772831
conf := NewTestConfig()
@@ -996,6 +1055,7 @@ func TestClientCoordinatorWithoutConsumerOffsetsTopic(t *testing.T) {
9961055
coordinator := NewMockBroker(t, 2)
9971056

9981057
metadataResponse1 := new(MetadataResponse)
1058+
metadataResponse1.AddBroker(seedBroker.Addr(), seedBroker.BrokerID())
9991059
seedBroker.Returns(metadataResponse1)
10001060

10011061
config := NewTestConfig()
@@ -1011,11 +1071,13 @@ func TestClientCoordinatorWithoutConsumerOffsetsTopic(t *testing.T) {
10111071
seedBroker.Returns(coordinatorResponse1)
10121072

10131073
metadataResponse2 := new(MetadataResponse)
1074+
metadataResponse2.AddBroker(seedBroker.Addr(), seedBroker.BrokerID())
10141075
metadataResponse2.AddTopic("__consumer_offsets", ErrUnknownTopicOrPartition)
10151076
seedBroker.Returns(metadataResponse2)
10161077

10171078
replicas := []int32{coordinator.BrokerID()}
10181079
metadataResponse3 := new(MetadataResponse)
1080+
metadataResponse3.AddBroker(seedBroker.Addr(), seedBroker.BrokerID())
10191081
metadataResponse3.AddTopicPartition("__consumer_offsets", 0, replicas[0], replicas, replicas, []int32{}, ErrNoError)
10201082
seedBroker.Returns(metadataResponse3)
10211083

@@ -1049,6 +1111,7 @@ func TestClientAutorefreshShutdownRace(t *testing.T) {
10491111
defer seedBroker.Close()
10501112

10511113
metadataResponse := new(MetadataResponse)
1114+
metadataResponse.AddBroker(seedBroker.Addr(), seedBroker.BrokerID())
10521115
seedBroker.Returns(metadataResponse)
10531116

10541117
conf := NewTestConfig()
@@ -1105,7 +1168,9 @@ func TestClientConnectionRefused(t *testing.T) {
11051168
func TestClientCoordinatorConnectionRefused(t *testing.T) {
11061169
t.Parallel()
11071170
seedBroker := NewMockBroker(t, 1)
1108-
seedBroker.Returns(new(MetadataResponse))
1171+
metadataResponse := new(MetadataResponse)
1172+
metadataResponse.AddBroker(seedBroker.Addr(), seedBroker.BrokerID())
1173+
seedBroker.Returns(metadataResponse)
11091174

11101175
client, err := NewClient([]string{seedBroker.Addr()}, NewTestConfig())
11111176
if err != nil {
@@ -1130,7 +1195,10 @@ func TestClientCoordinatorConnectionRefused(t *testing.T) {
11301195
func TestInitProducerIDConnectionRefused(t *testing.T) {
11311196
t.Parallel()
11321197
seedBroker := NewMockBroker(t, 1)
1133-
seedBroker.Returns(&MetadataResponse{Version: 4})
1198+
metadataResponse := new(MetadataResponse)
1199+
metadataResponse.AddBroker(seedBroker.Addr(), seedBroker.BrokerID())
1200+
metadataResponse.Version = 4
1201+
seedBroker.Returns(metadataResponse)
11341202

11351203
config := NewTestConfig()
11361204
config.Producer.Idempotent = true
@@ -1161,7 +1229,9 @@ func TestInitProducerIDConnectionRefused(t *testing.T) {
11611229
func TestMetricsCleanup(t *testing.T) {
11621230
seedBroker := NewMockBroker(t, 1)
11631231
defer seedBroker.Close()
1164-
seedBroker.Returns(new(MetadataResponse))
1232+
metadataResponse := new(MetadataResponse)
1233+
metadataResponse.AddBroker(seedBroker.Addr(), seedBroker.BrokerID())
1234+
seedBroker.Returns(metadataResponse)
11651235

11661236
config := NewTestConfig()
11671237
metrics.GetOrRegisterMeter("a", config.MetricRegistry)

client_tls_test.go

+3-1
Original file line numberDiff line numberDiff line change
@@ -197,7 +197,9 @@ func doListenerTLSTest(t *testing.T, expectSuccess bool, serverConfig, clientCon
197197
seedBroker := NewMockBrokerListener(childT, 1, seedListener)
198198
defer seedBroker.Close()
199199

200-
seedBroker.Returns(new(MetadataResponse))
200+
metadataResponse := new(MetadataResponse)
201+
metadataResponse.AddBroker(seedBroker.Addr(), seedBroker.BrokerID())
202+
seedBroker.Returns(metadataResponse)
201203

202204
config := NewTestConfig()
203205
config.Net.TLS.Enable = true

offset_manager_test.go

+3-1
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,9 @@ func initPartitionOffsetManager(t *testing.T, om OffsetManager,
7878

7979
func TestNewOffsetManager(t *testing.T) {
8080
seedBroker := NewMockBroker(t, 1)
81-
seedBroker.Returns(new(MetadataResponse))
81+
metadataResponse := new(MetadataResponse)
82+
metadataResponse.AddBroker(seedBroker.Addr(), seedBroker.BrokerID())
83+
seedBroker.Returns(metadataResponse)
8284
defer seedBroker.Close()
8385

8486
testClient, err := NewClient([]string{seedBroker.Addr()}, NewTestConfig())

sync_producer_test.go

+1
Original file line numberDiff line numberDiff line change
@@ -271,6 +271,7 @@ func TestSyncProducerToNonExistingTopic(t *testing.T) {
271271
}
272272

273273
metadataResponse = new(MetadataResponse)
274+
metadataResponse.AddBroker(broker.Addr(), broker.BrokerID())
274275
metadataResponse.AddTopic("unknown", ErrUnknownTopicOrPartition)
275276
broker.Returns(metadataResponse)
276277

0 commit comments

Comments
 (0)