Skip to content

Commit d3c9ccd

Browse files
committed
Protect partitionsHeld in consumer by lock
Signed-off-by: Ivan Babrou <ibobrik@gmail.com>
1 parent fa84803 commit d3c9ccd

File tree

1 file changed

+3
-1
lines changed

1 file changed

+3
-1
lines changed

cmd/ingester/app/consumer/consumer.go

+3-1
Original file line numberDiff line numberDiff line change
@@ -110,13 +110,15 @@ func (c *Consumer) Close() error {
110110

111111
func (c *Consumer) handleMessages(pc sc.PartitionConsumer) {
112112
c.logger.Info("Starting message handler", zap.Int32("partition", pc.Partition()))
113+
c.partitionMapLock.Lock()
113114
c.partitionsHeld++
114115
c.partitionsHeldGauge.Update(c.partitionsHeld)
115116
defer func() {
117+
c.partitionMapLock.Lock()
116118
c.partitionsHeld--
117119
c.partitionsHeldGauge.Update(c.partitionsHeld)
120+
c.partitionMapLock.Unlock()
118121
}()
119-
c.partitionMapLock.Lock()
120122
wg := &c.partitionIDToState[pc.Partition()].wg
121123
c.partitionMapLock.Unlock()
122124
defer wg.Done()

0 commit comments

Comments
 (0)