Skip to content

Commit 1c582f5

Browse files
author
Naireen
committed
reduce bloat in metrics api
1 parent 8620b36 commit 1c582f5

File tree

3 files changed

+50
-21
lines changed

3 files changed

+50
-21
lines changed

sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaMetrics.java

+24-19
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import org.apache.beam.sdk.metrics.Gauge;
2727
import org.apache.beam.sdk.metrics.Histogram;
2828
import org.apache.beam.sdk.metrics.MetricName;
29-
import org.apache.beam.sdk.metrics.Metrics;
3029
import org.apache.beam.sdk.util.Preconditions;
3130
import org.slf4j.Logger;
3231
import org.slf4j.LoggerFactory;
@@ -44,7 +43,7 @@ public interface KafkaMetrics {
4443
void updateKafkaMetrics();
4544

4645
/*Used to update backlog metrics, which is later used to update metrics container in another thread*/
47-
void recordBacklogBytes(String topic, int partitionId, long backlog);
46+
// void recordBacklogBytes(String topic, int partitionId, long backlog);
4847

4948
/** No-op implementation of {@code KafkaResults}. */
5049
class NoOpKafkaMetrics implements KafkaMetrics {
@@ -54,13 +53,13 @@ private NoOpKafkaMetrics() {}
5453
public void updateSuccessfulRpcMetrics(String topic, Duration elapsedTime) {}
5554

5655
@Override
57-
public void updateBacklogBytes(String topic, int partitionId, long elapsedTime) {}
56+
public void updateBacklogBytes(String topic, int partitionId, long backlog) {}
5857

5958
@Override
6059
public void updateKafkaMetrics() {}
6160

62-
@Override
63-
public void recordBacklogBytes(String topic, int partitionId, long backlog) {};
61+
// @Override
62+
// public void recordBacklogBytes(String topic, int partitionId, long backlog) {};
6463

6564
private static NoOpKafkaMetrics singleton = new NoOpKafkaMetrics();
6665

@@ -89,14 +88,14 @@ abstract class KafkaMetricsImpl implements KafkaMetrics {
8988

9089
abstract ConcurrentHashMap<String, ConcurrentLinkedQueue<Duration>> perTopicRpcLatencies();;
9190

92-
abstract ConcurrentHashMap<String, Long> perTopicPartitionBacklogs();
91+
abstract ConcurrentHashMap<MetricName, Long> perTopicPartitionBacklogs();
9392

9493
abstract AtomicBoolean isWritable();
9594

9695
public static KafkaMetricsImpl create() {
9796
return new AutoValue_KafkaMetrics_KafkaMetricsImpl(
9897
new ConcurrentHashMap<String, ConcurrentLinkedQueue<Duration>>(),
99-
new ConcurrentHashMap<String, Long>(),
98+
new ConcurrentHashMap<MetricName, Long>(),
10099
new AtomicBoolean(true));
101100
}
102101

@@ -133,8 +132,9 @@ public void updateSuccessfulRpcMetrics(String topic, Duration elapsedTime) {
133132
@Override
134133
public void updateBacklogBytes(String topicName, int partitionId, long backlog) {
135134
if (isWritable().get()) {
136-
String name = KafkaSinkMetrics.getMetricGaugeName(topicName, partitionId).getName();
137-
perTopicPartitionBacklogs().put(name, backlog);
135+
// Key by MetricName so info isn't lost
136+
MetricName metricName = KafkaSinkMetrics.getMetricGaugeName(topicName, partitionId);
137+
perTopicPartitionBacklogs().put(metricName, backlog);
138138
}
139139
}
140140

@@ -161,10 +161,15 @@ private void recordRpcLatencyMetrics() {
161161

162162
/** This is for creating gauges from backlog bytes recorded previously. */
163163
private void recordBacklogBytesInternal() {
164-
for (Map.Entry<String, Long> backlogs : perTopicPartitionBacklogs().entrySet()) {
165-
Gauge gauge =
166-
KafkaSinkMetrics.createBacklogGauge(MetricName.named("KafkaSink", backlogs.getKey()));
167-
gauge.set(backlogs.getValue());
164+
for (Map.Entry<MetricName, Long> backlog : perTopicPartitionBacklogs().entrySet()) {
165+
// MetricName perPartionBacklogName = KafkaSinkMetrics.getMetricGaugeName(topicName,
166+
// partitionId);
167+
// Gauge perPartition =
168+
// Metrics.gauge(KafkaSinkMetrics.getMetricGaugeName(topicName, partitionId));
169+
// Use lambda for more readability?
170+
// map.forEach((key, value) -> System.out.println(key + ": " + value));
171+
Gauge gauge = KafkaSinkMetrics.createBacklogGauge(backlog.getKey());
172+
gauge.set(backlog.getValue());
168173
}
169174
}
170175

@@ -177,12 +182,12 @@ private void recordBacklogBytesInternal() {
177182
* @param backlogBytes backlog for the topic Only included in the metric key if
178183
* 'supportsMetricsDeletion' is enabled.
179184
*/
180-
@Override
181-
public void recordBacklogBytes(String topicName, int partitionId, long backlogBytes) {
182-
Gauge perPartition =
183-
Metrics.gauge(KafkaSinkMetrics.getMetricGaugeName(topicName, partitionId));
184-
perPartition.set(backlogBytes);
185-
}
185+
// @Override
186+
// public void recordBacklogBytes(String topicName, int partitionId, long backlogBytes) {
187+
// Gauge perPartition =
188+
// Metrics.gauge(KafkaSinkMetrics.getMetricGaugeName(topicName, partitionId));
189+
// perPartition.set(backlogBytes);
190+
// }
186191

187192
/**
188193
* Export all metrics recorded in this instance to the underlying {@code perWorkerMetrics}

sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSinkMetrics.java

+19-1
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,25 @@ public static Gauge createBacklogGauge(String topic, int partitionId) {
9898
* @return Counter.
9999
*/
100100
public static Gauge createBacklogGauge(MetricName name) {
101-
return new DelegatingGauge(name, false, true);
101+
// use label to differenciate between the type of gauge metric is created
102+
// TODO(bug to clean this to make more consistent between the two runners)
103+
// test if set to false, this doesn't occur
104+
// && name.getLabels().get(MonitoringInfoConstants.Labels.PER_WORKER_METRIC)
105+
if (name.getLabels().containsKey(MonitoringInfoConstants.Labels.PER_WORKER_METRIC)
106+
&& name.getLabels().get(MonitoringInfoConstants.Labels.PER_WORKER_METRIC).equals("true")) {
107+
// return Metrics.gauge(name); // can this be a delgating gauge of false type?
108+
// metric name always exists, so need a way to handle them differently
109+
// second bollean should be false for u2 path, how to not add label just yet?
110+
// for legacy
111+
// return new DelegatingGauge(name, false, true);
112+
113+
// for runner v2
114+
// investigate why it gest ton UW container, but not to DFE
115+
// return Metrics.gauge(name);
116+
return new DelegatingGauge(name, false, false);
117+
} else {
118+
return new DelegatingGauge(name, false, true);
119+
}
102120
}
103121

104122
/**

sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java

+7-1
Original file line numberDiff line numberDiff line change
@@ -559,7 +559,9 @@ public ProcessContinuation processElement(
559559
.doubleValue()
560560
* avgRecordSize.estimateRecordByteSizeToOffsetCountRatio()));
561561
KafkaMetrics kafkaResults = KafkaSinkMetrics.kafkaMetrics();
562-
kafkaResults.recordBacklogBytes(
562+
// public void updateBacklogBytes(String topicName, int partitionId, long backlog) {
563+
564+
kafkaResults.updateBacklogBytes(
563565
kafkaSourceDescriptor.getTopic(),
564566
kafkaSourceDescriptor.getPartition(),
565567
(long)
@@ -569,6 +571,10 @@ public ProcessContinuation processElement(
569571
.subtract(BigDecimal.valueOf(expectedOffset), MathContext.DECIMAL128)
570572
.doubleValue()
571573
* avgRecordSize.estimateRecordByteSizeToOffsetCountRatio()));
574+
// flush imediately afterwards, can it be moved to finish bundle?
575+
// create the same kind of metric? This creates a per Worker metric, we dont want that to be
576+
// reusable between the two ios
577+
kafkaResults.updateKafkaMetrics();
572578
}
573579
}
574580
}

0 commit comments

Comments
 (0)