Skip to content

Commit 26c7e7d

Browse files
bobrikyurishkuro
authored andcommitted
Switch from counter to a gauge for partitions held (#1485)
* Switch from counter to a gauge for partitions held Counters cannot be decremented in Prometheus: ``` panic: counter cannot decrease in value goroutine 895 [running]: github.com/jaegertracing/jaeger/vendor/github.com/prometheus/client_golang/prometheus.(*counter).Add(0xc000790600, 0xbff0000000000000) /home/travis/gopath/src/github.com/jaegertracing/jaeger/vendor/github.com/prometheus/client_golang/prometheus/counter.go:71 +0xa3 github.com/jaegertracing/jaeger/vendor/github.com/uber/jaeger-lib/metrics/prometheus.(*counter).Inc(0xc0006b42a0, 0xffffffffffffffff) /home/travis/gopath/src/github.com/jaegertracing/jaeger/vendor/github.com/uber/jaeger-lib/metrics/prometheus/factory.go:183 +0x46 github.com/jaegertracing/jaeger/cmd/ingester/app/consumer.(*Consumer).handleMessages(0xc0004c4300, 0xf08c60, 0xc00054e630) /home/travis/gopath/src/github.com/jaegertracing/jaeger/cmd/ingester/app/consumer/consumer.go:124 +0x893 created by github.com/jaegertracing/jaeger/cmd/ingester/app/consumer.(*Consumer).Start.func1 /home/travis/gopath/src/github.com/jaegertracing/jaeger/cmd/ingester/app/consumer/consumer.go:87 +0xbd ``` Gauges can, even though we have to keep an extra variable around to keep count. In Prometheus Go library itself that is not necessary as Gauge type provides `Inc` and `Dec`, but Jaeger's wrapper does not have those exposed. Fixes #1200. Signed-off-by: Ivan Babrou <ibobrik@gmail.com> * Protect partitionsHeld in consumer by lock Signed-off-by: Ivan Babrou <ibobrik@gmail.com>
1 parent 5b8c1f4 commit 26c7e7d

File tree

3 files changed

+30
-23
lines changed

3 files changed

+30
-23
lines changed

cmd/ingester/app/consumer/consumer.go

+21-14
Original file line numberDiff line numberDiff line change
@@ -46,9 +46,10 @@ type Consumer struct {
4646

4747
deadlockDetector deadlockDetector
4848

49-
partitionIDToState map[int32]*consumerState
50-
partitionMapLock sync.Mutex
51-
partitionsHeld metrics.Counter
49+
partitionIDToState map[int32]*consumerState
50+
partitionMapLock sync.Mutex
51+
partitionsHeld int64
52+
partitionsHeldGauge metrics.Gauge
5253
}
5354

5455
type consumerState struct {
@@ -60,13 +61,13 @@ type consumerState struct {
6061
func New(params Params) (*Consumer, error) {
6162
deadlockDetector := newDeadlockDetector(params.MetricsFactory, params.Logger, params.DeadlockCheckInterval)
6263
return &Consumer{
63-
metricsFactory: params.MetricsFactory,
64-
logger: params.Logger,
65-
internalConsumer: params.InternalConsumer,
66-
processorFactory: params.ProcessorFactory,
67-
deadlockDetector: deadlockDetector,
68-
partitionIDToState: make(map[int32]*consumerState),
69-
partitionsHeld: partitionsHeld(params.MetricsFactory),
64+
metricsFactory: params.MetricsFactory,
65+
logger: params.Logger,
66+
internalConsumer: params.InternalConsumer,
67+
processorFactory: params.ProcessorFactory,
68+
deadlockDetector: deadlockDetector,
69+
partitionIDToState: make(map[int32]*consumerState),
70+
partitionsHeldGauge: partitionsHeldGauge(params.MetricsFactory),
7071
}, nil
7172
}
7273

@@ -109,13 +110,19 @@ func (c *Consumer) Close() error {
109110

110111
func (c *Consumer) handleMessages(pc sc.PartitionConsumer) {
111112
c.logger.Info("Starting message handler", zap.Int32("partition", pc.Partition()))
112-
c.partitionsHeld.Inc(1)
113-
defer c.partitionsHeld.Inc(-1)
114113
c.partitionMapLock.Lock()
114+
c.partitionsHeld++
115+
c.partitionsHeldGauge.Update(c.partitionsHeld)
115116
wg := &c.partitionIDToState[pc.Partition()].wg
116117
c.partitionMapLock.Unlock()
117-
defer wg.Done()
118-
defer c.closePartition(pc)
118+
defer func() {
119+
c.closePartition(pc)
120+
wg.Done()
121+
c.partitionMapLock.Lock()
122+
c.partitionsHeld--
123+
c.partitionsHeldGauge.Update(c.partitionsHeld)
124+
c.partitionMapLock.Unlock()
125+
}()
119126

120127
msgMetrics := c.newMsgMetrics(pc.Partition())
121128

cmd/ingester/app/consumer/consumer_metrics.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,6 @@ func (c *Consumer) partitionMetrics(partition int32) partitionMetrics {
6161
startCounter: f.Counter(metrics.Options{Name: "partition-start", Tags: nil})}
6262
}
6363

64-
func partitionsHeld(metricsFactory metrics.Factory) metrics.Counter {
65-
return metricsFactory.Namespace(metrics.NSOptions{Name: consumerNamespace, Tags: nil}).Counter(metrics.Options{Name: "partitions-held", Tags: nil})
64+
func partitionsHeldGauge(metricsFactory metrics.Factory) metrics.Gauge {
65+
return metricsFactory.Namespace(metrics.NSOptions{Name: consumerNamespace, Tags: nil}).Gauge(metrics.Options{Name: "partitions-held", Tags: nil})
6666
}

cmd/ingester/app/consumer/consumer_test.go

+7-7
Original file line numberDiff line numberDiff line change
@@ -90,12 +90,12 @@ func newConsumer(
9090

9191
logger, _ := zap.NewDevelopment()
9292
return &Consumer{
93-
metricsFactory: metricsFactory,
94-
logger: logger,
95-
internalConsumer: consumer,
96-
partitionIDToState: make(map[int32]*consumerState),
97-
partitionsHeld: partitionsHeld(metricsFactory),
98-
deadlockDetector: newDeadlockDetector(metricsFactory, logger, time.Second),
93+
metricsFactory: metricsFactory,
94+
logger: logger,
95+
internalConsumer: consumer,
96+
partitionIDToState: make(map[int32]*consumerState),
97+
partitionsHeldGauge: partitionsHeldGauge(metricsFactory),
98+
deadlockDetector: newDeadlockDetector(metricsFactory, logger, time.Second),
9999

100100
processorFactory: ProcessorFactory{
101101
topic: topic,
@@ -153,7 +153,7 @@ func TestSaramaConsumerWrapper_start_Messages(t *testing.T) {
153153
mc.YieldMessage(msg)
154154
isProcessed.Wait()
155155

156-
localFactory.AssertCounterMetrics(t, metricstest.ExpectedMetric{
156+
localFactory.AssertGaugeMetrics(t, metricstest.ExpectedMetric{
157157
Name: "sarama-consumer.partitions-held",
158158
Value: 1,
159159
})

0 commit comments

Comments
 (0)