From 4ecd1e57c5f089cd33ea2db225b6661286cf3e24 Mon Sep 17 00:00:00 2001 From: Varun <vakumar@mediamath.com> Date: Thu, 28 Feb 2019 15:52:34 -0600 Subject: [PATCH 1/3] added a metric to expose consumer batch size --- consumer.go | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/consumer.go b/consumer.go index 258d1a570..a2fd3c63a 100644 --- a/consumer.go +++ b/consumer.go @@ -6,6 +6,8 @@ import ( "sync" "sync/atomic" "time" + + "github.com/rcrowley/go-metrics" ) // ConsumerMessage encapsulates a Kafka message returned by the consumer. @@ -518,6 +520,7 @@ func (child *partitionConsumer) parseMessages(msgSet *MessageSet) ([]*ConsumerMe func (child *partitionConsumer) parseRecords(batch *RecordBatch) ([]*ConsumerMessage, error) { var messages []*ConsumerMessage + for _, rec := range batch.Records { offset := batch.FirstOffset + rec.OffsetDelta if offset < child.offset { @@ -545,6 +548,15 @@ func (child *partitionConsumer) parseRecords(batch *RecordBatch) ([]*ConsumerMes } func (child *partitionConsumer) parseResponse(response *FetchResponse) ([]*ConsumerMessage, error) { + var ( + metricRegistry = child.conf.MetricRegistry + consumerBatchSizeMetric metrics.Histogram + ) + + if metricRegistry != nil { + consumerBatchSizeMetric = getOrRegisterHistogram("consumer-batch-size", metricRegistry) + } + block := response.GetBlock(child.topic, child.partition) if block == nil { return nil, ErrIncompleteResponse @@ -558,6 +570,9 @@ func (child *partitionConsumer) parseResponse(response *FetchResponse) ([]*Consu if err != nil { return nil, err } + + consumerBatchSizeMetric.Update(int64(nRecs)) + if nRecs == 0 { partialTrailingMessage, err := block.isPartial() if err != nil { From eb519d64f4df925cb3ceae0341f06ea23afba5b7 Mon Sep 17 00:00:00 2001 From: Varun <vakumar@mediamath.com> Date: Wed, 13 Mar 2019 11:07:10 -0500 Subject: [PATCH 2/3] updated the documentation --- sarama.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/sarama.go b/sarama.go index f9c756703..5f5bcdbd4 100644 --- a/sarama.go +++ b/sarama.go @@ -58,6 +58,14 @@ Producer related metrics: | compression-ratio-for-topic-<topic> | histogram | Distribution of the compression ratio times 100 of record batches for a given topic | +-------------------------------------------+------------+--------------------------------------------------------------------------------------+ +Consumer related metrics: + + +-------------------------------------------+------------+--------------------------------------------------------------------------------------+ + | Name | Type | Description | + +-------------------------------------------+------------+--------------------------------------------------------------------------------------+ + | consumer-batch-size | histogram | Distribution of the number of messages in a batch | + +-------------------------------------------+------------+--------------------------------------------------------------------------------------+ + */ package sarama From d1c48e31e38323b5dff91116fcd878846b701947 Mon Sep 17 00:00:00 2001 From: Vlad Gorodetsky <17348+bai@users.noreply.github.com> Date: Wed, 13 Mar 2019 18:11:25 +0200 Subject: [PATCH 3/3] Small docs indentation fix --- sarama.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sarama.go b/sarama.go index 5f5bcdbd4..98d596928 100644 --- a/sarama.go +++ b/sarama.go @@ -63,7 +63,7 @@ Consumer related metrics: +-------------------------------------------+------------+--------------------------------------------------------------------------------------+ | Name | Type | Description | +-------------------------------------------+------------+--------------------------------------------------------------------------------------+ - | consumer-batch-size | histogram | Distribution of the number of messages in a batch | + | consumer-batch-size | histogram | Distribution of the number of messages in a batch | +-------------------------------------------+------------+--------------------------------------------------------------------------------------+ */