Skip to content

Commit 2bc48d2

Browse files
authored
Upgrade otel to 0.34 (#3850)
* Upgrade otel to 0.34
1 parent de0b049 commit 2bc48d2

9 files changed

+233
-257
lines changed

common/metrics/metricstest/metricstest.go

+16-27
Original file line numberDiff line numberDiff line change
@@ -26,17 +26,17 @@ package metricstest
2626

2727
import (
2828
"fmt"
29+
"net/http"
2930
"net/http/httptest"
3031
"strings"
3132

33+
"github.com/prometheus/client_golang/prometheus"
34+
"github.com/prometheus/client_golang/prometheus/promhttp"
3235
dto "github.com/prometheus/client_model/go"
3336
"github.com/prometheus/common/expfmt"
34-
"go.opentelemetry.io/otel/exporters/prometheus"
37+
exporters "go.opentelemetry.io/otel/exporters/prometheus"
3538
"go.opentelemetry.io/otel/metric"
36-
controller "go.opentelemetry.io/otel/sdk/metric/controller/basic"
37-
"go.opentelemetry.io/otel/sdk/metric/export/aggregation"
38-
processor "go.opentelemetry.io/otel/sdk/metric/processor/basic"
39-
"go.opentelemetry.io/otel/sdk/resource"
39+
sdkmetrics "go.opentelemetry.io/otel/sdk/metric"
4040
"golang.org/x/exp/maps"
4141

4242
"go.temporal.io/server/common/log"
@@ -46,7 +46,7 @@ import (
4646
type (
4747
Handler struct {
4848
metrics.Handler
49-
exporter *prometheus.Exporter
49+
reg *prometheus.Registry
5050
}
5151

5252
sample struct {
@@ -69,32 +69,19 @@ func MustNewHandler(logger log.Logger) *Handler {
6969
}
7070

7171
func NewHandler(logger log.Logger) (*Handler, error) {
72-
ctrl := controller.New(
73-
processor.NewFactory(
74-
metrics.NewOtelAggregatorSelector(nil),
75-
aggregation.CumulativeTemporalitySelector(),
76-
processor.WithMemory(true),
77-
),
78-
controller.WithResource(resource.Empty()),
79-
// Set collect period to 0 otherwise Snapshot() will potentially
80-
// return an old view of metrics.
81-
controller.WithCollectPeriod(0),
82-
)
83-
84-
exporter, err := prometheus.New(prometheus.Config{}, ctrl)
72+
registry := prometheus.NewRegistry()
73+
exporter, err := exporters.New(exporters.WithRegisterer(registry))
8574
if err != nil {
8675
return nil, err
8776
}
8877

89-
provider := &otelProvider{
90-
meter: ctrl.Meter("temporal"),
91-
}
78+
provider := sdkmetrics.NewMeterProvider(sdkmetrics.WithReader(exporter))
79+
meter := provider.Meter("temporal")
9280
clientConfig := metrics.ClientConfig{}
93-
otelHandler := metrics.NewOtelMetricsHandler(logger, provider, clientConfig)
94-
81+
otelHandler := metrics.NewOtelMetricsHandler(logger, &otelProvider{meter: meter}, clientConfig)
9582
metricsHandler := &Handler{
96-
Handler: otelHandler,
97-
exporter: exporter,
83+
Handler: otelHandler,
84+
reg: registry,
9885
}
9986

10087
return metricsHandler, nil
@@ -105,7 +92,9 @@ func (*Handler) Stop(log.Logger) {}
10592
func (h *Handler) Snapshot() (Snapshot, error) {
10693
rec := httptest.NewRecorder()
10794
req := httptest.NewRequest("GET", "/metrics", nil)
108-
h.exporter.ServeHTTP(rec, req)
95+
handler := http.NewServeMux()
96+
handler.HandleFunc("/metrics", promhttp.HandlerFor(h.reg, promhttp.HandlerOpts{Registry: h.reg}).ServeHTTP)
97+
handler.ServeHTTP(rec, req)
10998

11099
var tp expfmt.TextParser
111100
families, err := tp.TextToMetricFamilies(rec.Body)

common/metrics/metricstest/metricstest_test.go

+9-3
Original file line numberDiff line numberDiff line change
@@ -42,23 +42,29 @@ func TestBasic(t *testing.T) {
4242
metrics.StringTag("l2", "v2"),
4343
metrics.StringTag("l1", "v1"),
4444
}
45+
expectedSystemTags := []metrics.Tag{
46+
metrics.StringTag("otel_scope_name", "temporal"),
47+
metrics.StringTag("otel_scope_version", ""),
48+
}
49+
expectedCounterTags := append(expectedSystemTags, counterTags...)
4550
counter := handler.WithTags(counterTags...).Counter(counterName)
4651
counter.Record(1)
4752
counter.Record(1)
4853

4954
s1 := handler.MustSnapshot()
50-
require.Equal(t, float64(2), s1.MustCounter(counterName, counterTags...))
55+
require.Equal(t, float64(2), s1.MustCounter(counterName+"_total", expectedCounterTags...))
5156

5257
gaugeName := "gauge1"
5358
gaugeTags := []metrics.Tag{
5459
metrics.StringTag("l3", "v3"),
5560
metrics.StringTag("l4", "v4"),
5661
}
62+
expectedGaugeTags := append(expectedSystemTags, gaugeTags...)
5763
gauge := handler.WithTags(gaugeTags...).Gauge(gaugeName)
5864
gauge.Record(-2)
5965
gauge.Record(10)
6066

6167
s2 := handler.MustSnapshot()
62-
require.Equal(t, float64(2), s2.MustCounter(counterName, counterTags...))
63-
require.Equal(t, float64(10), s2.MustGauge(gaugeName, gaugeTags...))
68+
require.Equal(t, float64(2), s2.MustCounter(counterName+"_total", expectedCounterTags...))
69+
require.Equal(t, float64(10), s2.MustGauge(gaugeName, expectedGaugeTags...))
6470
}

common/metrics/opentelemetry_aggregator_selector.go

-88
This file was deleted.

common/metrics/opentelemetry_provider.go

+31-24
Original file line numberDiff line numberDiff line change
@@ -29,12 +29,13 @@ import (
2929
"net/http"
3030
"time"
3131

32-
"go.opentelemetry.io/otel/exporters/prometheus"
32+
"github.com/prometheus/client_golang/prometheus"
33+
"github.com/prometheus/client_golang/prometheus/promhttp"
34+
exporters "go.opentelemetry.io/otel/exporters/prometheus"
3335
"go.opentelemetry.io/otel/metric"
34-
controller "go.opentelemetry.io/otel/sdk/metric/controller/basic"
35-
"go.opentelemetry.io/otel/sdk/metric/export/aggregation"
36-
processor "go.opentelemetry.io/otel/sdk/metric/processor/basic"
37-
"go.opentelemetry.io/otel/sdk/resource"
36+
"go.opentelemetry.io/otel/metric/unit"
37+
sdkmetrics "go.opentelemetry.io/otel/sdk/metric"
38+
"go.opentelemetry.io/otel/sdk/metric/aggregation"
3839

3940
"go.temporal.io/server/common/log"
4041
"go.temporal.io/server/common/log/tag"
@@ -49,7 +50,7 @@ type (
4950
}
5051

5152
openTelemetryProviderImpl struct {
52-
exporter *prometheus.Exporter
53+
exporter *exporters.Exporter
5354
meter metric.Meter
5455
config *PrometheusConfig
5556
server *http.Server
@@ -61,27 +62,33 @@ func NewOpenTelemetryProvider(
6162
prometheusConfig *PrometheusConfig,
6263
clientConfig *ClientConfig,
6364
) (*openTelemetryProviderImpl, error) {
64-
65-
c := controller.New(
66-
processor.NewFactory(
67-
NewOtelAggregatorSelector(
68-
clientConfig.PerUnitHistogramBoundaries,
69-
),
70-
aggregation.CumulativeTemporalitySelector(),
71-
processor.WithMemory(true),
72-
),
73-
controller.WithResource(resource.Empty()),
74-
)
75-
exporter, err := prometheus.New(prometheus.Config{}, c)
76-
65+
reg := prometheus.NewRegistry()
66+
exporter, err := exporters.New(exporters.WithRegisterer(reg))
7767
if err != nil {
7868
logger.Error("Failed to initialize prometheus exporter.", tag.Error(err))
7969
return nil, err
8070
}
8171

82-
metricServer := initPrometheusListener(prometheusConfig, logger, exporter)
83-
84-
meter := c.Meter("temporal")
72+
var views []sdkmetrics.View
73+
for _, u := range []string{Dimensionless, Bytes, Milliseconds} {
74+
views = append(views, sdkmetrics.NewView(
75+
sdkmetrics.Instrument{
76+
Kind: sdkmetrics.InstrumentKindSyncHistogram,
77+
Unit: unit.Unit(u),
78+
},
79+
sdkmetrics.Stream{
80+
Aggregation: aggregation.ExplicitBucketHistogram{
81+
Boundaries: clientConfig.PerUnitHistogramBoundaries[u],
82+
},
83+
},
84+
))
85+
}
86+
provider := sdkmetrics.NewMeterProvider(
87+
sdkmetrics.WithReader(exporter),
88+
sdkmetrics.WithView(views...),
89+
)
90+
metricServer := initPrometheusListener(prometheusConfig, reg, logger)
91+
meter := provider.Meter("temporal")
8592
reporter := &openTelemetryProviderImpl{
8693
exporter: exporter,
8794
meter: meter,
@@ -92,14 +99,14 @@ func NewOpenTelemetryProvider(
9299
return reporter, nil
93100
}
94101

95-
func initPrometheusListener(config *PrometheusConfig, logger log.Logger, exporter *prometheus.Exporter) *http.Server {
102+
func initPrometheusListener(config *PrometheusConfig, reg *prometheus.Registry, logger log.Logger) *http.Server {
96103
handlerPath := config.HandlerPath
97104
if handlerPath == "" {
98105
handlerPath = "/metrics"
99106
}
100107

101108
handler := http.NewServeMux()
102-
handler.HandleFunc(handlerPath, exporter.ServeHTTP)
109+
handler.HandleFunc(handlerPath, promhttp.HandlerFor(reg, promhttp.HandlerOpts{Registry: reg}).ServeHTTP)
103110

104111
if config.ListenAddress == "" {
105112
logger.Fatal("Listen address must be specified.", tag.Address(config.ListenAddress))

common/metrics/otel_metrics_handler.go

+6-1
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,12 @@ func (omp *otelMetricsHandler) Gauge(gauge string) GaugeIface {
8585
}
8686

8787
return GaugeFunc(func(i float64, t ...Tag) {
88-
c.Observe(context.Background(), i, tagsToAttributes(omp.tags, t, omp.excludeTags)...)
88+
err = omp.provider.GetMeter().RegisterCallback([]instrument.Asynchronous{c}, func(ctx context.Context) {
89+
c.Observe(ctx, i, tagsToAttributes(omp.tags, t, omp.excludeTags)...)
90+
})
91+
if err != nil {
92+
omp.l.Fatal("error setting callback metric update", tag.NewStringTag("MetricName", gauge), tag.Error(err))
93+
}
8994
})
9095
}
9196

0 commit comments

Comments
 (0)