Skip to content

Commit 7f455c8

Browse files
committed
Merge pull request #369 from Shopify/fix-consumer-references
consumer: bugfix for broker workers getting stuck
2 parents 963015e + d988e6d commit 7f455c8

File tree

3 files changed

+117
-17
lines changed

3 files changed

+117
-17
lines changed

CHANGELOG.md

+2
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55
Bug Fixes:
66
- Fix the producer's internal reference counting in certain unusual scenarios
77
([#367](https://github.com/Shopify/sarama/pull/367)).
8+
- Fix the consumer's internal reference counting in certain unusual scenarios
9+
([#369](https://github.com/Shopify/sarama/pull/369)).
810
- Fix a condition where the producer's internal control messages could have
911
gotten stuck ([#368](https://github.com/Shopify/sarama/pull/368)).
1012

consumer.go

+26-17
Original file line numberDiff line numberDiff line change
@@ -115,10 +115,10 @@ func (c *consumer) ConsumePartition(topic string, partition int32, offset int64)
115115
return nil, err
116116
}
117117

118-
if leader, err := c.client.Leader(child.topic, child.partition); err != nil {
118+
var leader *Broker
119+
var err error
120+
if leader, err = c.client.Leader(child.topic, child.partition); err != nil {
119121
return nil, err
120-
} else {
121-
child.broker = leader
122122
}
123123

124124
if err := c.addChild(child); err != nil {
@@ -127,8 +127,8 @@ func (c *consumer) ConsumePartition(topic string, partition int32, offset int64)
127127

128128
go withRecover(child.dispatcher)
129129

130-
brokerWorker := c.refBrokerConsumer(child.broker)
131-
brokerWorker.input <- child
130+
child.broker = c.refBrokerConsumer(leader)
131+
child.broker.input <- child
132132

133133
return child, nil
134134
}
@@ -171,31 +171,39 @@ func (c *consumer) refBrokerConsumer(broker *Broker) *brokerConsumer {
171171
newSubscriptions: make(chan []*partitionConsumer),
172172
wait: make(chan none),
173173
subscriptions: make(map[*partitionConsumer]none),
174-
refs: 1,
174+
refs: 0,
175175
}
176176
go withRecover(brokerWorker.subscriptionManager)
177177
go withRecover(brokerWorker.subscriptionConsumer)
178178
c.brokerConsumers[broker] = brokerWorker
179-
} else {
180-
brokerWorker.refs++
181179
}
182180

181+
brokerWorker.refs++
182+
183183
return brokerWorker
184184
}
185185

186-
func (c *consumer) unrefBrokerConsumer(broker *Broker) {
186+
func (c *consumer) unrefBrokerConsumer(brokerWorker *brokerConsumer) {
187187
c.lock.Lock()
188188
defer c.lock.Unlock()
189189

190-
brokerWorker := c.brokerConsumers[broker]
191190
brokerWorker.refs--
192191

193192
if brokerWorker.refs == 0 {
194193
close(brokerWorker.input)
195-
delete(c.brokerConsumers, broker)
194+
if c.brokerConsumers[brokerWorker.broker] == brokerWorker {
195+
delete(c.brokerConsumers, brokerWorker.broker)
196+
}
196197
}
197198
}
198199

200+
func (c *consumer) abandonBrokerConsumer(brokerWorker *brokerConsumer) {
201+
c.lock.Lock()
202+
defer c.lock.Unlock()
203+
204+
delete(c.brokerConsumers, brokerWorker.broker)
205+
}
206+
199207
// PartitionConsumer
200208

201209
// PartitionConsumer processes Kafka messages from a given topic and partition. You MUST call Close()
@@ -237,7 +245,7 @@ type partitionConsumer struct {
237245
topic string
238246
partition int32
239247

240-
broker *Broker
248+
broker *brokerConsumer
241249
messages chan *ConsumerMessage
242250
errors chan *ConsumerError
243251
trigger, dying chan none
@@ -291,15 +299,15 @@ func (child *partitionConsumer) dispatch() error {
291299
return err
292300
}
293301

294-
if leader, err := child.consumer.client.Leader(child.topic, child.partition); err != nil {
302+
var leader *Broker
303+
var err error
304+
if leader, err = child.consumer.client.Leader(child.topic, child.partition); err != nil {
295305
return err
296-
} else {
297-
child.broker = leader
298306
}
299307

300-
brokerWorker := child.consumer.refBrokerConsumer(child.broker)
308+
child.broker = child.consumer.refBrokerConsumer(leader)
301309

302-
brokerWorker.input <- child
310+
child.broker.input <- child
303311

304312
return nil
305313
}
@@ -463,6 +471,7 @@ func (w *brokerConsumer) updateSubscriptionCache(newSubscriptions []*partitionCo
463471
}
464472

465473
func (w *brokerConsumer) abort(err error) {
474+
w.consumer.abandonBrokerConsumer(w)
466475
_ = w.broker.Close() // we don't care about the error this might return, we already have one
467476

468477
for child := range w.subscriptions {

consumer_test.go

+89
Original file line numberDiff line numberDiff line change
@@ -297,6 +297,95 @@ func TestConsumerInterleavedClose(t *testing.T) {
297297
seedBroker.Close()
298298
}
299299

300+
func TestConsumerBounceWithReferenceOpen(t *testing.T) {
301+
seedBroker := newMockBroker(t, 1)
302+
leader := newMockBroker(t, 2)
303+
leaderAddr := leader.Addr()
304+
305+
metadataResponse := new(MetadataResponse)
306+
metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
307+
metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
308+
metadataResponse.AddTopicPartition("my_topic", 1, leader.BrokerID(), nil, nil, ErrNoError)
309+
seedBroker.Returns(metadataResponse)
310+
311+
config := NewConfig()
312+
config.Consumer.Return.Errors = true
313+
config.Consumer.Retry.Backoff = 0
314+
config.ChannelBufferSize = 0
315+
master, err := NewConsumer([]string{seedBroker.Addr()}, config)
316+
if err != nil {
317+
t.Fatal(err)
318+
}
319+
320+
c0, err := master.ConsumePartition("my_topic", 0, 0)
321+
if err != nil {
322+
t.Fatal(err)
323+
}
324+
325+
c1, err := master.ConsumePartition("my_topic", 1, 0)
326+
if err != nil {
327+
t.Fatal(err)
328+
}
329+
330+
fetchResponse := new(FetchResponse)
331+
fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(0))
332+
fetchResponse.AddError("my_topic", 1, ErrNoError)
333+
leader.Returns(fetchResponse)
334+
<-c0.Messages()
335+
336+
fetchResponse = new(FetchResponse)
337+
fetchResponse.AddError("my_topic", 0, ErrNoError)
338+
fetchResponse.AddMessage("my_topic", 1, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(0))
339+
leader.Returns(fetchResponse)
340+
<-c1.Messages()
341+
342+
leader.Close()
343+
leader = newMockBrokerAddr(t, 2, leaderAddr)
344+
345+
// unblock one of the two (it doesn't matter which)
346+
select {
347+
case <-c0.Errors():
348+
case <-c1.Errors():
349+
}
350+
// send it back to the same broker
351+
seedBroker.Returns(metadataResponse)
352+
353+
fetchResponse = new(FetchResponse)
354+
fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(1))
355+
fetchResponse.AddMessage("my_topic", 1, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(1))
356+
leader.Returns(fetchResponse)
357+
358+
time.Sleep(5 * time.Millisecond)
359+
360+
// unblock the other one
361+
select {
362+
case <-c0.Errors():
363+
case <-c1.Errors():
364+
}
365+
// send it back to the same broker
366+
seedBroker.Returns(metadataResponse)
367+
368+
select {
369+
case <-c0.Messages():
370+
case <-c1.Messages():
371+
}
372+
373+
leader.Close()
374+
seedBroker.Close()
375+
wg := sync.WaitGroup{}
376+
wg.Add(2)
377+
go func() {
378+
_ = c0.Close()
379+
wg.Done()
380+
}()
381+
go func() {
382+
_ = c1.Close()
383+
wg.Done()
384+
}()
385+
wg.Wait()
386+
safeClose(t, master)
387+
}
388+
300389
// This example has the simplest use case of the consumer. It simply
301390
// iterates over the messages channel using a for/range loop. Because
302391
// a producer never stopsunless requested, a signal handler is registered

0 commit comments

Comments
 (0)