Skip to content

Commit 2582040

Browse files
committed
Protect partitionsHeld in consumer by lock
Signed-off-by: Ivan Babrou <ibobrik@gmail.com>
1 parent fa84803 commit 2582040

File tree

1 file changed

+7
-5
lines changed

1 file changed

+7
-5
lines changed

cmd/ingester/app/consumer/consumer.go

+7-5
Original file line numberDiff line numberDiff line change
@@ -110,17 +110,19 @@ func (c *Consumer) Close() error {
110110

111111
func (c *Consumer) handleMessages(pc sc.PartitionConsumer) {
112112
c.logger.Info("Starting message handler", zap.Int32("partition", pc.Partition()))
113+
c.partitionMapLock.Lock()
113114
c.partitionsHeld++
114115
c.partitionsHeldGauge.Update(c.partitionsHeld)
116+
wg := &c.partitionIDToState[pc.Partition()].wg
117+
c.partitionMapLock.Unlock()
115118
defer func() {
119+
c.closePartition(pc)
120+
wg.Done()
121+
c.partitionMapLock.Lock()
116122
c.partitionsHeld--
117123
c.partitionsHeldGauge.Update(c.partitionsHeld)
124+
c.partitionMapLock.Unlock()
118125
}()
119-
c.partitionMapLock.Lock()
120-
wg := &c.partitionIDToState[pc.Partition()].wg
121-
c.partitionMapLock.Unlock()
122-
defer wg.Done()
123-
defer c.closePartition(pc)
124126

125127
msgMetrics := c.newMsgMetrics(pc.Partition())
126128

0 commit comments

Comments
 (0)