Commit 2a0d8cc

Fix sarama consumer deadlock (#2587)
* Fix sarama consumer deadlock
  Signed-off-by: albertteoh <albert.teoh@logz.io>
* Simplify
  Signed-off-by: albertteoh <albert.teoh@logz.io>

Co-authored-by: Yuri Shkuro <yurishkuro@users.noreply.github.com>
1 parent 81761cd commit 2a0d8cc

2 files changed: +35 -37 lines changed

cmd/ingester/app/consumer/consumer.go

+18 -23
@@ -50,10 +50,11 @@ type Consumer struct {
     partitionMapLock    sync.Mutex
     partitionsHeld      int64
     partitionsHeldGauge metrics.Gauge
+
+    doneWg sync.WaitGroup
 }
 
 type consumerState struct {
-    wg                sync.WaitGroup
     partitionConsumer sc.PartitionConsumer
 }
 
@@ -78,17 +79,11 @@ func (c *Consumer) Start() {
     c.logger.Info("Starting main loop")
     for pc := range c.internalConsumer.Partitions() {
         c.partitionMapLock.Lock()
-        if p, ok := c.partitionIDToState[pc.Partition()]; ok {
-            // This is a guard against simultaneously draining messages
-            // from the last time the partition was assigned and
-            // processing new messages for the same partition, which may lead
-            // to the cleanup process not completing
-            p.wg.Wait()
-        }
         c.partitionIDToState[pc.Partition()] = &consumerState{partitionConsumer: pc}
-        c.partitionIDToState[pc.Partition()].wg.Add(2)
         c.partitionMapLock.Unlock()
         c.partitionMetrics(pc.Partition()).startCounter.Inc(1)
+
+        c.doneWg.Add(2)
         go c.handleMessages(pc)
         go c.handleErrors(pc.Partition(), pc.Errors())
     }
@@ -97,31 +92,33 @@ func (c *Consumer) Start() {
 
 // Close closes the Consumer and underlying sarama consumer
 func (c *Consumer) Close() error {
-    c.partitionMapLock.Lock()
-    for _, p := range c.partitionIDToState {
-        c.closePartition(p.partitionConsumer)
-        p.wg.Wait()
-    }
-    c.partitionMapLock.Unlock()
-    c.deadlockDetector.close()
+    // Close the internal consumer, which will close each partition consumers' message and error channels.
     c.logger.Info("Closing parent consumer")
-    return c.internalConsumer.Close()
+    err := c.internalConsumer.Close()
+
+    c.logger.Debug("Closing deadlock detector")
+    c.deadlockDetector.close()
+
+    c.logger.Debug("Waiting for messages and errors to be handled")
+    c.doneWg.Wait()
+
+    return err
 }
 
+// handleMessages handles incoming Kafka messages on a channel
 func (c *Consumer) handleMessages(pc sc.PartitionConsumer) {
     c.logger.Info("Starting message handler", zap.Int32("partition", pc.Partition()))
     c.partitionMapLock.Lock()
     c.partitionsHeld++
     c.partitionsHeldGauge.Update(c.partitionsHeld)
-    wg := &c.partitionIDToState[pc.Partition()].wg
     c.partitionMapLock.Unlock()
     defer func() {
         c.closePartition(pc)
-        wg.Done()
         c.partitionMapLock.Lock()
         c.partitionsHeld--
         c.partitionsHeldGauge.Update(c.partitionsHeld)
         c.partitionMapLock.Unlock()
+        c.doneWg.Done()
     }()
 
     msgMetrics := c.newMsgMetrics(pc.Partition())
@@ -165,12 +162,10 @@ func (c *Consumer) closePartition(partitionConsumer sc.PartitionConsumer) {
     c.logger.Info("Closed partition consumer", zap.Int32("partition", partitionConsumer.Partition()))
 }
 
+// handleErrors handles incoming Kafka consumer errors on a channel
 func (c *Consumer) handleErrors(partition int32, errChan <-chan *sarama.ConsumerError) {
     c.logger.Info("Starting error handler", zap.Int32("partition", partition))
-    c.partitionMapLock.Lock()
-    wg := &c.partitionIDToState[partition].wg
-    c.partitionMapLock.Unlock()
-    defer wg.Done()
+    defer c.doneWg.Done()
 
     errMetrics := c.newErrMetrics(partition)
     for err := range errChan {
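
A note on the consumer.go change above: the per-partition consumerState.wg wait groups are replaced by a single Consumer.doneWg, and the shutdown order in Close() is reworked. The old Close() held partitionMapLock while it closed each partition consumer and waited on that partition's wait group, and only closed the parent consumer at the very end; judging from the comment added in the new Close(), it is closing the parent consumer that closes each partition's message and error channels, so the handleMessages and handleErrors loops could be left blocked with nothing to release them, and Close() would never return. The new Close() closes the internal consumer first, then shuts down the deadlock detector, and only then waits on doneWg. Below is a minimal sketch of that close-then-wait ordering; the consumer type and its channels here are placeholders, not the actual Jaeger or sarama types.

// Minimal sketch of the close-then-wait shutdown pattern used above.
// The types and channels are placeholders, not the real Jaeger/sarama ones.
package main

import (
    "fmt"
    "sync"
)

type consumer struct {
    messages chan string
    errors   chan error
    doneWg   sync.WaitGroup // one wait group owned by the consumer, like Consumer.doneWg
}

func (c *consumer) start() {
    c.doneWg.Add(2) // one slot per handler goroutine, added before the goroutines start
    go func() {
        defer c.doneWg.Done()
        for msg := range c.messages { // exits only when messages is closed
            fmt.Println("handled:", msg)
        }
    }()
    go func() {
        defer c.doneWg.Done()
        for err := range c.errors { // exits only when errors is closed
            fmt.Println("error:", err)
        }
    }()
}

// close mirrors the fixed ordering: close the channel owner first so the
// range loops can terminate, then wait. Waiting before closing would block forever.
func (c *consumer) close() {
    close(c.messages)
    close(c.errors)
    c.doneWg.Wait()
}

func main() {
    c := &consumer{messages: make(chan string, 1), errors: make(chan error, 1)}
    c.start()
    c.messages <- "span"
    c.close()
}

Moving the wait group onto the Consumer itself also removes the per-partition wait-group lookups that handleMessages and handleErrors previously had to do under partitionMapLock.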

cmd/ingester/app/consumer/consumer_test.go

+17 -14
@@ -68,7 +68,7 @@ func (s partitionConsumerWrapper) Topic() string {
     return s.topic
 }
 
-func newSaramaClusterConsumer(saramaPartitionConsumer sarama.PartitionConsumer) *kmocks.Consumer {
+func newSaramaClusterConsumer(saramaPartitionConsumer sarama.PartitionConsumer, mc *smocks.PartitionConsumer) *kmocks.Consumer {
     pcha := make(chan cluster.PartitionConsumer, 1)
     pcha <- &partitionConsumerWrapper{
         topic:     topic,
@@ -77,27 +77,26 @@ func newSaramaClusterConsumer(saramaPartitionConsumer sarama.PartitionConsumer)
     }
     saramaClusterConsumer := &kmocks.Consumer{}
     saramaClusterConsumer.On("Partitions").Return((<-chan cluster.PartitionConsumer)(pcha))
-    saramaClusterConsumer.On("Close").Return(nil)
+    saramaClusterConsumer.On("Close").Return(nil).Run(func(args mock.Arguments) {
+        mc.Close()
+    })
     saramaClusterConsumer.On("MarkPartitionOffset", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
     return saramaClusterConsumer
 }
 
 func newConsumer(
+    t *testing.T,
     metricsFactory metrics.Factory,
     topic string,
     processor processor.SpanProcessor,
     consumer consumer.Consumer) *Consumer {
 
     logger, _ := zap.NewDevelopment()
-    return &Consumer{
-        metricsFactory:      metricsFactory,
-        logger:              logger,
-        internalConsumer:    consumer,
-        partitionIDToState:  make(map[int32]*consumerState),
-        partitionsHeldGauge: partitionsHeldGauge(metricsFactory),
-        deadlockDetector:    newDeadlockDetector(metricsFactory, logger, time.Second),
-
-        processorFactory: ProcessorFactory{
+    consumerParams := Params{
+        MetricsFactory:   metricsFactory,
+        Logger:           logger,
+        InternalConsumer: consumer,
+        ProcessorFactory: ProcessorFactory{
             topic:          topic,
             consumer:       consumer,
             metricsFactory: metricsFactory,
@@ -106,6 +105,10 @@ func newConsumer(
             parallelism:    1,
         },
     }
+
+    c, err := New(consumerParams)
+    require.NoError(t, err)
+    return c
 }
 
 func TestSaramaConsumerWrapper_MarkPartitionOffset(t *testing.T) {
@@ -136,7 +139,7 @@ func TestSaramaConsumerWrapper_start_Messages(t *testing.T) {
     saramaPartitionConsumer, e := saramaConsumer.ConsumePartition(topic, partition, msgOffset)
     require.NoError(t, e)
 
-    undertest := newConsumer(localFactory, topic, mp, newSaramaClusterConsumer(saramaPartitionConsumer))
+    undertest := newConsumer(t, localFactory, topic, mp, newSaramaClusterConsumer(saramaPartitionConsumer, mc))
 
     undertest.partitionIDToState = map[int32]*consumerState{
         partition: {
@@ -202,7 +205,7 @@ func TestSaramaConsumerWrapper_start_Errors(t *testing.T) {
     saramaPartitionConsumer, e := saramaConsumer.ConsumePartition(topic, partition, msgOffset)
     require.NoError(t, e)
 
-    undertest := newConsumer(localFactory, topic, &pmocks.SpanProcessor{}, newSaramaClusterConsumer(saramaPartitionConsumer))
+    undertest := newConsumer(t, localFactory, topic, &pmocks.SpanProcessor{}, newSaramaClusterConsumer(saramaPartitionConsumer, mc))
 
     undertest.Start()
     mc.YieldError(errors.New("Daisy, Daisy"))
@@ -238,7 +241,7 @@ func TestHandleClosePartition(t *testing.T) {
     saramaPartitionConsumer, e := saramaConsumer.ConsumePartition(topic, partition, msgOffset)
     require.NoError(t, e)
 
-    undertest := newConsumer(metricsFactory, topic, mp, newSaramaClusterConsumer(saramaPartitionConsumer))
+    undertest := newConsumer(t, metricsFactory, topic, mp, newSaramaClusterConsumer(saramaPartitionConsumer, mc))
     undertest.deadlockDetector = newDeadlockDetector(metricsFactory, undertest.logger, 200*time.Millisecond)
     undertest.Start()
     defer undertest.Close()
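
A note on the test changes above: newSaramaClusterConsumer now also receives the mock sarama partition consumer (mc), and the Close expectation on the kmocks.Consumer gets a testify Run hook that calls mc.Close(), so closing the mocked parent consumer actually closes the partition's message and error channels, which is the behaviour the reworked Consumer.Close relies on; without it, doneWg.Wait() would presumably have nothing to unblock it. newConsumer is also switched to construct the consumer through New(consumerParams), which is why it now takes *testing.T for the require.NoError check. Below is a minimal, self-contained sketch of the Run-hook pattern; the parentConsumer mock, the bare string channel, and the test name are hypothetical stand-ins, not the kmocks/smocks types used in the real test.

// Minimal sketch of wiring a testify Close expectation, via Run, to close the
// channels a handler goroutine is ranging over. All names are hypothetical.
package consumer_test

import (
    "testing"

    "github.com/stretchr/testify/mock"
    "github.com/stretchr/testify/require"
)

// parentConsumer is a hand-rolled testify mock standing in for the parent consumer mock.
type parentConsumer struct {
    mock.Mock
}

func (p *parentConsumer) Close() error {
    args := p.Called()
    return args.Error(0)
}

func TestCloseAlsoClosesPartitionChannels(t *testing.T) {
    messages := make(chan string) // stands in for the mock partition consumer's channel

    p := &parentConsumer{}
    // Run attaches a side effect to the expectation: when Close() is called on the
    // parent mock, also close the partition channel, so range loops can finish.
    p.On("Close").Return(nil).Run(func(args mock.Arguments) {
        close(messages)
    })

    done := make(chan struct{})
    go func() {
        defer close(done)
        for range messages { // exits only once messages is closed
        }
    }()

    require.NoError(t, p.Close())
    <-done // the handler goroutine finishes, so close-then-wait does not hang
    p.AssertExpectations(t)
}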
