Skip to content

Commit 98ec384

Browse files
committed
fix: use least loaded broker to refresh metadata
Seed brokers never change after client initialization. If the first seed broker becomes stale (still online, but moved to another Kafka cluster), the Sarama client may use this stale broker and get the wrong metadata. To avoid using the stale broker for metadata refresh, we choose the least loaded broker in the cached broker list, which is similar to how the Java client implementation works: https://github.com/apache/kafka/blob/7483991a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L671-L736 Contributes-to: #2637 Signed-off-by: Hao Sun <haos@uber.com>
1 parent 4b55bb3 commit 98ec384

9 files changed

+216
-137
lines changed

admin_test.go

+8-4
Original file line numberDiff line numberDiff line change
@@ -1712,11 +1712,15 @@ func TestRefreshMetaDataWithDifferentController(t *testing.T) {
17121712
seedBroker1.BrokerID(), b.ID())
17131713
}
17141714

1715+
metadataResponse := NewMockMetadataResponse(t).
1716+
SetController(seedBroker2.BrokerID()).
1717+
SetBroker(seedBroker1.Addr(), seedBroker1.BrokerID()).
1718+
SetBroker(seedBroker2.Addr(), seedBroker2.BrokerID())
17151719
seedBroker1.SetHandlerByMap(map[string]MockResponse{
1716-
"MetadataRequest": NewMockMetadataResponse(t).
1717-
SetController(seedBroker2.BrokerID()).
1718-
SetBroker(seedBroker1.Addr(), seedBroker1.BrokerID()).
1719-
SetBroker(seedBroker2.Addr(), seedBroker2.BrokerID()),
1720+
"MetadataRequest": metadataResponse,
1721+
})
1722+
seedBroker2.SetHandlerByMap(map[string]MockResponse{
1723+
"MetadataRequest": metadataResponse,
17201724
})
17211725

17221726
if b, _ := ca.refreshController(); seedBroker2.BrokerID() != b.ID() {

async_producer_test.go

+40-29
Original file line numberDiff line numberDiff line change
@@ -505,7 +505,7 @@ func TestAsyncProducerBrokerBounce(t *testing.T) {
505505
// When: a broker connection gets reset by a broker (network glitch, restart, you name it).
506506
leader.Close() // producer should get EOF
507507
leader = NewMockBrokerAddr(t, 2, leaderAddr) // start it up again right away for giggles
508-
seedBroker.Returns(metadataResponse) // tell it to go to broker 2 again
508+
leader.Returns(metadataResponse) // tell it to go to broker 2 again
509509

510510
// Then: a produced message goes through the new broker connection.
511511
producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
@@ -591,13 +591,13 @@ func TestAsyncProducerMultipleRetries(t *testing.T) {
591591
metadataLeader2.AddBroker(leader2.Addr(), leader2.BrokerID())
592592
metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, nil, ErrNoError)
593593

594-
seedBroker.Returns(metadataLeader2)
594+
leader1.Returns(metadataLeader2)
595595
leader2.Returns(prodNotLeader)
596-
seedBroker.Returns(metadataLeader1)
596+
leader2.Returns(metadataLeader1)
597597
leader1.Returns(prodNotLeader)
598-
seedBroker.Returns(metadataLeader1)
598+
leader1.Returns(metadataLeader1)
599599
leader1.Returns(prodNotLeader)
600-
seedBroker.Returns(metadataLeader2)
600+
leader1.Returns(metadataLeader2)
601601

602602
prodSuccess := new(ProduceResponse)
603603
prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
@@ -653,13 +653,13 @@ func TestAsyncProducerMultipleRetriesWithBackoffFunc(t *testing.T) {
653653
metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, nil, ErrNoError)
654654

655655
leader1.Returns(prodNotLeader)
656-
seedBroker.Returns(metadataLeader2)
656+
leader1.Returns(metadataLeader2)
657657
leader2.Returns(prodNotLeader)
658-
seedBroker.Returns(metadataLeader1)
658+
leader2.Returns(metadataLeader1)
659659
leader1.Returns(prodNotLeader)
660-
seedBroker.Returns(metadataLeader1)
660+
leader1.Returns(metadataLeader1)
661661
leader1.Returns(prodNotLeader)
662-
seedBroker.Returns(metadataLeader2)
662+
leader1.Returns(metadataLeader2)
663663
leader2.Returns(prodSuccess)
664664

665665
expectResults(t, producer, 1, 0)
@@ -739,16 +739,17 @@ func TestAsyncProducerBrokerRestart(t *testing.T) {
739739
leader := NewMockBroker(t, 2)
740740

741741
var leaderLock sync.Mutex
742-
743-
// The seed broker only handles Metadata request
744-
seedBroker.setHandler(func(req *request) (res encoderWithHeader) {
742+
metadataRequestHandlerFunc := func(req *request) (res encoderWithHeader) {
745743
leaderLock.Lock()
746744
defer leaderLock.Unlock()
747745
metadataLeader := new(MetadataResponse)
748746
metadataLeader.AddBroker(leader.Addr(), leader.BrokerID())
749747
metadataLeader.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, ErrNoError)
750748
return metadataLeader
751-
})
749+
}
750+
751+
// The seed broker only handles Metadata request in bootstrap
752+
seedBroker.setHandler(metadataRequestHandlerFunc)
752753

753754
var emptyValues int32 = 0
754755

@@ -770,14 +771,27 @@ func TestAsyncProducerBrokerRestart(t *testing.T) {
770771
}
771772
}
772773

773-
leader.setHandler(func(req *request) (res encoderWithHeader) {
774+
failedProduceRequestHandlerFunc := func(req *request) (res encoderWithHeader) {
774775
countRecordsWithEmptyValue(req)
775776

776777
time.Sleep(50 * time.Millisecond)
777778

778779
prodSuccess := new(ProduceResponse)
779780
prodSuccess.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition)
780781
return prodSuccess
782+
}
783+
784+
succeededProduceRequestHandlerFunc := func(req *request) (res encoderWithHeader) {
785+
countRecordsWithEmptyValue(req)
786+
787+
prodSuccess := new(ProduceResponse)
788+
prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
789+
return prodSuccess
790+
}
791+
792+
leader.SetHandlerFuncByMap(map[string]requestHandlerFunc{
793+
"ProduceRequest": failedProduceRequestHandlerFunc,
794+
"MetadataRequest": metadataRequestHandlerFunc,
781795
})
782796

783797
config := NewTestConfig()
@@ -816,12 +830,9 @@ func TestAsyncProducerBrokerRestart(t *testing.T) {
816830
leaderLock.Lock()
817831
leader = NewMockBroker(t, 2)
818832
leaderLock.Unlock()
819-
leader.setHandler(func(req *request) (res encoderWithHeader) {
820-
countRecordsWithEmptyValue(req)
821-
822-
prodSuccess := new(ProduceResponse)
823-
prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
824-
return prodSuccess
833+
leader.SetHandlerFuncByMap(map[string]requestHandlerFunc{
834+
"ProduceRequest": succeededProduceRequestHandlerFunc,
835+
"MetadataRequest": metadataRequestHandlerFunc,
825836
})
826837

827838
wg.Wait()
@@ -938,7 +949,7 @@ func TestAsyncProducerRetryWithReferenceOpen(t *testing.T) {
938949
producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
939950

940951
// tell partition 0 to go to that broker again
941-
seedBroker.Returns(metadataResponse)
952+
leader.Returns(metadataResponse)
942953

943954
// succeed this time
944955
prodSuccess = new(ProduceResponse)
@@ -994,14 +1005,11 @@ func TestAsyncProducerFlusherRetryCondition(t *testing.T) {
9941005

9951006
time.Sleep(50 * time.Millisecond)
9961007

997-
leader.SetHandlerByMap(map[string]MockResponse{
998-
"ProduceRequest": NewMockProduceResponse(t).
999-
SetVersion(0).
1000-
SetError("my_topic", 0, ErrNoError),
1001-
})
1002-
10031008
// tell partition 0 to go to that broker again
1004-
seedBroker.Returns(metadataResponse)
1009+
leader.Returns(metadataResponse)
1010+
prodSuccess := new(ProduceResponse)
1011+
prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
1012+
leader.Returns(prodSuccess)
10051013

10061014
// succeed this time
10071015
expectResults(t, producer, 5, 0)
@@ -1010,6 +1018,9 @@ func TestAsyncProducerFlusherRetryCondition(t *testing.T) {
10101018
for i := 0; i < 5; i++ {
10111019
producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: 0}
10121020
}
1021+
prodSuccess = new(ProduceResponse)
1022+
prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
1023+
leader.Returns(prodSuccess)
10131024
expectResults(t, producer, 5, 0)
10141025

10151026
// shutdown
@@ -1051,7 +1062,7 @@ func TestAsyncProducerRetryShutdown(t *testing.T) {
10511062
prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition)
10521063
leader.Returns(prodNotLeader)
10531064

1054-
seedBroker.Returns(metadataLeader)
1065+
leader.Returns(metadataLeader)
10551066

10561067
prodSuccess := new(ProduceResponse)
10571068
prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)

client.go

+21-37
Original file line numberDiff line numberDiff line change
@@ -260,7 +260,7 @@ func (client *client) Broker(brokerID int32) (*Broker, error) {
260260
func (client *client) InitProducerID() (*InitProducerIDResponse, error) {
261261
// FIXME: this InitProducerID seems to only be called from client_test.go (TestInitProducerIDConnectionRefused) and has been superceded by transaction_manager.go?
262262
brokerErrors := make([]error, 0)
263-
for broker := client.anyBroker(); broker != nil; broker = client.anyBroker() {
263+
for broker := client.LeastLoadedBroker(); broker != nil; broker = client.LeastLoadedBroker() {
264264
request := &InitProducerIDRequest{}
265265

266266
if client.conf.Version.IsAtLeast(V2_7_0_0) {
@@ -763,22 +763,21 @@ func (client *client) registerBroker(broker *Broker) {
763763
}
764764
}
765765

766-
// deregisterBroker removes a broker from the seedsBroker list, and if it's
767-
// not the seedbroker, removes it from brokers map completely.
766+
// deregisterBroker removes a broker from the broker list, and if it's
767+
// not in the broker list, removes it from seedBrokers.
768768
func (client *client) deregisterBroker(broker *Broker) {
769769
client.lock.Lock()
770770
defer client.lock.Unlock()
771771

772+
_, ok := client.brokers[broker.ID()]
773+
if ok {
774+
Logger.Printf("client/brokers deregistered broker #%d at %s", broker.ID(), broker.Addr())
775+
delete(client.brokers, broker.ID())
776+
return
777+
}
772778
if len(client.seedBrokers) > 0 && broker == client.seedBrokers[0] {
773779
client.deadSeeds = append(client.deadSeeds, broker)
774780
client.seedBrokers = client.seedBrokers[1:]
775-
} else {
776-
// we do this so that our loop in `tryRefreshMetadata` doesn't go on forever,
777-
// but we really shouldn't have to; once that loop is made better this case can be
778-
// removed, and the function generally can be renamed from `deregisterBroker` to
779-
// `nextSeedBroker` or something
780-
DebugLogger.Printf("client/brokers deregistered broker #%d at %s", broker.ID(), broker.Addr())
781-
delete(client.brokers, broker.ID())
782781
}
783782
}
784783

@@ -791,33 +790,12 @@ func (client *client) resurrectDeadBrokers() {
791790
client.deadSeeds = nil
792791
}
793792

794-
func (client *client) anyBroker() *Broker {
795-
client.lock.RLock()
796-
defer client.lock.RUnlock()
797-
798-
if len(client.seedBrokers) > 0 {
799-
_ = client.seedBrokers[0].Open(client.conf)
800-
return client.seedBrokers[0]
801-
}
802-
803-
// not guaranteed to be random *or* deterministic
804-
for _, broker := range client.brokers {
805-
_ = broker.Open(client.conf)
806-
return broker
807-
}
808-
809-
return nil
810-
}
811-
793+
// LeastLoadedBroker returns the broker with the least pending requests.
794+
// Firstly, choose the broker from cached broker list. If the broker list is empty, choose from seed brokers.
812795
func (client *client) LeastLoadedBroker() *Broker {
813796
client.lock.RLock()
814797
defer client.lock.RUnlock()
815798

816-
if len(client.seedBrokers) > 0 {
817-
_ = client.seedBrokers[0].Open(client.conf)
818-
return client.seedBrokers[0]
819-
}
820-
821799
var leastLoadedBroker *Broker
822800
pendingRequests := math.MaxInt
823801
for _, broker := range client.brokers {
@@ -826,10 +804,16 @@ func (client *client) LeastLoadedBroker() *Broker {
826804
leastLoadedBroker = broker
827805
}
828806
}
829-
830807
if leastLoadedBroker != nil {
831808
_ = leastLoadedBroker.Open(client.conf)
809+
return leastLoadedBroker
832810
}
811+
812+
if len(client.seedBrokers) > 0 {
813+
_ = client.seedBrokers[0].Open(client.conf)
814+
return client.seedBrokers[0]
815+
}
816+
833817
return leastLoadedBroker
834818
}
835819

@@ -1032,9 +1016,9 @@ func (client *client) tryRefreshMetadata(topics []string, attemptsRemaining int,
10321016
return err
10331017
}
10341018

1035-
broker := client.anyBroker()
1019+
broker := client.LeastLoadedBroker()
10361020
brokerErrors := make([]error, 0)
1037-
for ; broker != nil && !pastDeadline(0); broker = client.anyBroker() {
1021+
for ; broker != nil && !pastDeadline(0); broker = client.LeastLoadedBroker() {
10381022
allowAutoTopicCreation := client.conf.Metadata.AllowAutoTopicCreation
10391023
if len(topics) > 0 {
10401024
DebugLogger.Printf("client/metadata fetching metadata for %v from broker %s\n", topics, broker.addr)
@@ -1212,7 +1196,7 @@ func (client *client) findCoordinator(coordinatorKey string, coordinatorType Coo
12121196
}
12131197

12141198
brokerErrors := make([]error, 0)
1215-
for broker := client.anyBroker(); broker != nil; broker = client.anyBroker() {
1199+
for broker := client.LeastLoadedBroker(); broker != nil; broker = client.LeastLoadedBroker() {
12161200
DebugLogger.Printf("client/coordinator requesting coordinator for %s from %s\n", coordinatorKey, broker.Addr())
12171201

12181202
request := new(FindCoordinatorRequest)

0 commit comments

Comments
 (0)