Skip to content

Commit 1d5d711

Browse files
committed
Support adding process tags in OTEL via env variable
Signed-off-by: Pavol Loffay <ploffay@redhat.com>
1 parent 0ac47f6 commit 1d5d711

File tree

12 files changed

+272
-34
lines changed

12 files changed

+272
-34
lines changed

cmd/opentelemetry-collector/app/defaults/default_config.go

+13-7
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"github.com/open-telemetry/opentelemetry-collector/config"
2222
"github.com/open-telemetry/opentelemetry-collector/config/configmodels"
2323
"github.com/open-telemetry/opentelemetry-collector/extension/healthcheckextension"
24+
"github.com/open-telemetry/opentelemetry-collector/processor/resourceprocessor"
2425
"github.com/open-telemetry/opentelemetry-collector/receiver"
2526
"github.com/open-telemetry/opentelemetry-collector/receiver/jaegerreceiver"
2627
"github.com/open-telemetry/opentelemetry-collector/receiver/zipkinreceiver"
@@ -36,7 +37,6 @@ const (
3637
httpThriftBinaryEndpoint = "localhost:14268"
3738
udpThriftCompactEndpoint = "localhost:6831"
3839
udpThriftBinaryEndpoint = "localhost:6832"
39-
httpSamplingEndpoint = "localhost:5778"
4040
)
4141

4242
// CollectorConfig creates default collector configuration.
@@ -56,17 +56,20 @@ func CollectorConfig(storageType string, zipkinHostPort string, factories config
5656
recTypes = append(recTypes, string(v.Type()))
5757
}
5858
hc := factories.Extensions["health_check"].CreateDefaultConfig()
59+
resProcessor := factories.Processors["resource"].CreateDefaultConfig()
5960
return &configmodels.Config{
6061
Receivers: receivers,
62+
Processors: configmodels.Processors{"resource": resProcessor},
6163
Exporters: exporters,
6264
Extensions: configmodels.Extensions{"health_check": hc},
6365
Service: configmodels.Service{
6466
Extensions: []string{"health_check"},
6567
Pipelines: configmodels.Pipelines{
6668
"traces": {
67-
InputType: configmodels.TracesDataType,
68-
Receivers: recTypes,
69-
Exporters: expTypes,
69+
InputType: configmodels.TracesDataType,
70+
Receivers: recTypes,
71+
Processors: []string{"resource"},
72+
Exporters: expTypes,
7073
},
7174
},
7275
},
@@ -125,17 +128,20 @@ func createExporters(storageTypes string, factories config.Factories) (configmod
125128
func AgentConfig(factories config.Factories) *configmodels.Config {
126129
jaegerExporter := factories.Exporters["jaeger"]
127130
hc := factories.Extensions["health_check"].CreateDefaultConfig().(*healthcheckextension.Config)
131+
resProcessor := factories.Processors["resource"].CreateDefaultConfig().(*resourceprocessor.Config)
128132
return &configmodels.Config{
129133
Receivers: createAgentReceivers(factories),
134+
Processors: configmodels.Processors{"resource": resProcessor},
130135
Exporters: configmodels.Exporters{"jaeger": jaegerExporter.CreateDefaultConfig()},
131136
Extensions: configmodels.Extensions{"health_check": hc},
132137
Service: configmodels.Service{
133138
Extensions: []string{"health_check"},
134139
Pipelines: map[string]*configmodels.Pipeline{
135140
"traces": {
136-
InputType: configmodels.TracesDataType,
137-
Receivers: []string{"jaeger"},
138-
Exporters: []string{"jaeger"},
141+
InputType: configmodels.TracesDataType,
142+
Receivers: []string{"jaeger"},
143+
Processors: []string{"resource"},
144+
Exporters: []string{"jaeger"},
139145
},
140146
},
141147
},

cmd/opentelemetry-collector/app/defaults/default_config_test.go

+28-19
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"github.com/open-telemetry/opentelemetry-collector/config"
2222
"github.com/open-telemetry/opentelemetry-collector/config/configmodels"
2323
"github.com/open-telemetry/opentelemetry-collector/exporter/jaegerexporter"
24+
"github.com/open-telemetry/opentelemetry-collector/processor/resourceprocessor"
2425
"github.com/open-telemetry/opentelemetry-collector/receiver/jaegerreceiver"
2526
"github.com/spf13/viper"
2627
"github.com/stretchr/testify/assert"
@@ -51,9 +52,10 @@ func TestDefaultCollectorConfig(t *testing.T) {
5152
exporterTypes: []string{elasticsearch.TypeStr},
5253
pipeline: configmodels.Pipelines{
5354
"traces": {
54-
InputType: configmodels.TracesDataType,
55-
Receivers: []string{"jaeger"},
56-
Exporters: []string{elasticsearch.TypeStr},
55+
InputType: configmodels.TracesDataType,
56+
Receivers: []string{"jaeger"},
57+
Processors: []string{"resource"},
58+
Exporters: []string{elasticsearch.TypeStr},
5759
},
5860
},
5961
},
@@ -63,9 +65,10 @@ func TestDefaultCollectorConfig(t *testing.T) {
6365
exporterTypes: []string{cassandra.TypeStr},
6466
pipeline: configmodels.Pipelines{
6567
"traces": {
66-
InputType: configmodels.TracesDataType,
67-
Receivers: []string{"jaeger"},
68-
Exporters: []string{cassandra.TypeStr},
68+
InputType: configmodels.TracesDataType,
69+
Receivers: []string{"jaeger"},
70+
Processors: []string{"resource"},
71+
Exporters: []string{cassandra.TypeStr},
6972
},
7073
},
7174
},
@@ -75,9 +78,10 @@ func TestDefaultCollectorConfig(t *testing.T) {
7578
exporterTypes: []string{kafka.TypeStr},
7679
pipeline: configmodels.Pipelines{
7780
"traces": {
78-
InputType: configmodels.TracesDataType,
79-
Receivers: []string{"jaeger"},
80-
Exporters: []string{kafka.TypeStr},
81+
InputType: configmodels.TracesDataType,
82+
Receivers: []string{"jaeger"},
83+
Processors: []string{"resource"},
84+
Exporters: []string{kafka.TypeStr},
8185
},
8286
},
8387
},
@@ -87,9 +91,10 @@ func TestDefaultCollectorConfig(t *testing.T) {
8791
exporterTypes: []string{cassandra.TypeStr, elasticsearch.TypeStr},
8892
pipeline: configmodels.Pipelines{
8993
"traces": {
90-
InputType: configmodels.TracesDataType,
91-
Receivers: []string{"jaeger"},
92-
Exporters: []string{cassandra.TypeStr, elasticsearch.TypeStr},
94+
InputType: configmodels.TracesDataType,
95+
Receivers: []string{"jaeger"},
96+
Processors: []string{"resource"},
97+
Exporters: []string{cassandra.TypeStr, elasticsearch.TypeStr},
9398
},
9499
},
95100
},
@@ -99,9 +104,10 @@ func TestDefaultCollectorConfig(t *testing.T) {
99104
exporterTypes: []string{cassandra.TypeStr},
100105
pipeline: configmodels.Pipelines{
101106
"traces": {
102-
InputType: configmodels.TracesDataType,
103-
Receivers: []string{"jaeger", "zipkin"},
104-
Exporters: []string{cassandra.TypeStr},
107+
InputType: configmodels.TracesDataType,
108+
Receivers: []string{"jaeger", "zipkin"},
109+
Processors: []string{"resource"},
110+
Exporters: []string{cassandra.TypeStr},
105111
},
106112
},
107113
},
@@ -128,6 +134,7 @@ func TestDefaultCollectorConfig(t *testing.T) {
128134
assert.Equal(t, len(test.pipeline["traces"].Receivers), len(cfg.Receivers))
129135
assert.Equal(t, "jaeger", cfg.Receivers["jaeger"].Name())
130136
assert.Equal(t, len(test.exporterTypes), len(cfg.Exporters))
137+
assert.IsType(t, &resourceprocessor.Config{}, cfg.Processors["resource"])
131138

132139
types := []string{}
133140
for _, v := range cfg.Exporters {
@@ -148,13 +155,15 @@ func TestDefaultAgentConfig(t *testing.T) {
148155
Extensions: []string{"health_check"},
149156
Pipelines: configmodels.Pipelines{
150157
"traces": &configmodels.Pipeline{
151-
InputType: configmodels.TracesDataType,
152-
Receivers: []string{"jaeger"},
153-
Exporters: []string{"jaeger"},
158+
InputType: configmodels.TracesDataType,
159+
Receivers: []string{"jaeger"},
160+
Processors: []string{"resource"},
161+
Exporters: []string{"jaeger"},
154162
},
155163
},
156164
}, cfg.Service)
157-
assert.Equal(t, 0, len(cfg.Processors))
165+
assert.Equal(t, 1, len(cfg.Processors))
166+
assert.IsType(t, &resourceprocessor.Config{}, cfg.Processors["resource"])
158167
assert.Equal(t, 1, len(cfg.Receivers))
159168
assert.IsType(t, &jaegerreceiver.Config{}, cfg.Receivers["jaeger"])
160169
assert.Equal(t, 1, len(cfg.Exporters))

cmd/opentelemetry-collector/app/defaults/defaults.go

+12-4
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,9 @@ import (
1818
"flag"
1919

2020
"github.com/open-telemetry/opentelemetry-collector/config"
21-
otelJaegerEexporter "github.com/open-telemetry/opentelemetry-collector/exporter/jaegerexporter"
22-
otelJaegerreceiver "github.com/open-telemetry/opentelemetry-collector/receiver/jaegerreceiver"
21+
otelJaegerExporter "github.com/open-telemetry/opentelemetry-collector/exporter/jaegerexporter"
22+
otelResourceProcessor "github.com/open-telemetry/opentelemetry-collector/processor/resourceprocessor"
23+
otelJaegerReceiver "github.com/open-telemetry/opentelemetry-collector/receiver/jaegerreceiver"
2324
"github.com/open-telemetry/opentelemetry-collector/service/defaultcomponents"
2425
"github.com/spf13/pflag"
2526
"github.com/spf13/viper"
@@ -28,6 +29,7 @@ import (
2829
"github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter/elasticsearch"
2930
"github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter/jaegerexporter"
3031
"github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter/kafka"
32+
"github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/processor/resourceprocessor"
3133
"github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/receiver/jaegerreceiver"
3234
storageCassandra "github.com/jaegertracing/jaeger/plugin/storage/cassandra"
3335
storageEs "github.com/jaegertracing/jaeger/plugin/storage/es"
@@ -61,16 +63,22 @@ func Components(v *viper.Viper) config.Factories {
6163
factories.Exporters[cassandraExp.Type()] = cassandraExp
6264
factories.Exporters[esExp.Type()] = esExp
6365

64-
jaegerRec := factories.Receivers["jaeger"].(*otelJaegerreceiver.Factory)
66+
jaegerRec := factories.Receivers["jaeger"].(*otelJaegerReceiver.Factory)
6567
factories.Receivers["jaeger"] = &jaegerreceiver.Factory{
6668
Wrapped: jaegerRec,
6769
Viper: v,
6870
}
69-
jaegerExp := factories.Exporters["jaeger"].(*otelJaegerEexporter.Factory)
71+
jaegerExp := factories.Exporters["jaeger"].(*otelJaegerExporter.Factory)
7072
factories.Exporters["jaeger"] = &jaegerexporter.Factory{
7173
Wrapped: jaegerExp,
7274
Viper: v,
7375
}
76+
77+
resourceProc := factories.Processors["resource"].(*otelResourceProcessor.Factory)
78+
factories.Processors["resource"] = &resourceprocessor.Factory{
79+
Wrapped: resourceProc,
80+
Viper: v,
81+
}
7482
return factories
7583
}
7684

cmd/opentelemetry-collector/app/defaults/defaults_test.go

+2
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter/elasticsearch"
2525
"github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter/jaegerexporter"
2626
"github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter/kafka"
27+
"github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/processor/resourceprocessor"
2728
"github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/receiver/jaegerreceiver"
2829
jConfig "github.com/jaegertracing/jaeger/pkg/config"
2930
)
@@ -51,4 +52,5 @@ func TestComponents(t *testing.T) {
5152
assert.Equal(t, []string{"http://127.0.0.1:9200"}, ec.GetPrimary().Servers)
5253
assert.IsType(t, &jaegerreceiver.Factory{}, factories.Receivers["jaeger"])
5354
assert.IsType(t, &jaegerexporter.Factory{}, factories.Exporters["jaeger"])
55+
assert.IsType(t, &resourceprocessor.Factory{}, factories.Processors["resource"])
5456
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
// Copyright (c) 2020 The Jaeger Authors.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package resourceprocessor
16+
17+
import (
18+
"flag"
19+
20+
"github.com/open-telemetry/opentelemetry-collector/component"
21+
"github.com/open-telemetry/opentelemetry-collector/config/configmodels"
22+
"github.com/open-telemetry/opentelemetry-collector/consumer"
23+
"github.com/open-telemetry/opentelemetry-collector/processor/resourceprocessor"
24+
"github.com/spf13/viper"
25+
"go.uber.org/zap"
26+
27+
"github.com/jaegertracing/jaeger/cmd/flags"
28+
)
29+
30+
const (
31+
resourceTags = "resource.tags"
32+
resourceTagsLegacy = "jaeger.tags"
33+
)
34+
35+
// Factory wraps resourceprocessor.Factory and makes the default config configurable via viper.
36+
// For instance this enables using flags as default values in the config object.
37+
type Factory struct {
38+
Wrapped *resourceprocessor.Factory
39+
Viper *viper.Viper
40+
}
41+
42+
var _ component.ProcessorFactoryOld = (*Factory)(nil)
43+
44+
// Type returns the type of the receiver.
45+
func (f Factory) Type() configmodels.Type {
46+
return f.Wrapped.Type()
47+
}
48+
49+
// CreateDefaultConfig returns default configuration of Factory.
50+
// This function implements OTEL component.ProcessorFactoryBase interface.
51+
func (f Factory) CreateDefaultConfig() configmodels.Processor {
52+
cfg := f.Wrapped.CreateDefaultConfig().(*resourceprocessor.Config)
53+
for k, v := range getTags(f.Viper) {
54+
cfg.Labels[k] = v
55+
}
56+
return cfg
57+
}
58+
59+
func getTags(v *viper.Viper) map[string]string {
60+
tagsLegacy := flags.ParseJaegerTags(v.GetString(resourceTagsLegacy))
61+
tags := flags.ParseJaegerTags(v.GetString(resourceTags))
62+
for k, v := range tagsLegacy {
63+
if _, ok := tags[k]; !ok {
64+
tags[k] = v
65+
}
66+
}
67+
return tags
68+
}
69+
70+
// CreateTraceProcessor creates resource processor.
71+
// This function implements OTEL component.ProcessorFactoryOld interface.
72+
func (f Factory) CreateTraceProcessor(
73+
logger *zap.Logger,
74+
nextConsumer consumer.TraceConsumerOld,
75+
cfg configmodels.Processor,
76+
) (component.TraceProcessorOld, error) {
77+
return f.Wrapped.CreateTraceProcessor(logger, nextConsumer, cfg)
78+
}
79+
80+
// CreateMetricsProcessor creates a resource processor.
81+
// This function implements component.ProcessorFactoryOld.
82+
func (f Factory) CreateMetricsProcessor(
83+
logger *zap.Logger,
84+
nextConsumer consumer.MetricsConsumerOld,
85+
cfg configmodels.Processor,
86+
) (component.MetricsProcessorOld, error) {
87+
return f.Wrapped.CreateMetricsProcessor(logger, nextConsumer, cfg)
88+
}
89+
90+
// AddFlags adds flags for Options.
91+
func AddFlags(flags *flag.FlagSet) {
92+
flags.String(resourceTagsLegacy, "", "(deprecated, use --resource.tags) One or more tags to be added to the Process tags of all spans passing through this agent. Ex: key1=value1,key2=${envVar:defaultValue}")
93+
flags.String(resourceTags, "", "One or more tags to be added to the Process tags of all spans passing through this agent. Ex: key1=value1,key2=${envVar:defaultValue}")
94+
}

0 commit comments

Comments
 (0)