Skip to content

Commit 6cdaea8

Browse files
authored
Support adding process tags in OTEL via env variable (#2220)
* Support adding process tags in OTEL via env variable Signed-off-by: Pavol Loffay <ploffay@redhat.com> * Install processor only when flag is used Signed-off-by: Pavol Loffay <ploffay@redhat.com> * Increase coverage Signed-off-by: Pavol Loffay <ploffay@redhat.com> * Fix name Signed-off-by: Pavol Loffay <ploffay@redhat.com> * Fix review Signed-off-by: Pavol Loffay <ploffay@redhat.com>
1 parent 1b4bf08 commit 6cdaea8

File tree

13 files changed

+361
-59
lines changed

13 files changed

+361
-59
lines changed

cmd/agent/app/reporter/flags.go

+6-6
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,8 @@ import (
2828
const (
2929
// Whether to use grpc or tchannel reporter.
3030
reporterType = "reporter.type"
31-
// Agent tags
32-
agentTagsDeprecated = "jaeger.tags"
31+
// AgentTagsDeprecated is a configuration property name for adding process tags to incoming spans.
32+
AgentTagsDeprecated = "jaeger.tags"
3333
agentTags = "agent.tags"
3434
// GRPC is name of gRPC reporter.
3535
GRPC Type = "grpc"
@@ -48,7 +48,7 @@ type Options struct {
4848
func AddFlags(flags *flag.FlagSet) {
4949
flags.String(reporterType, string(GRPC), fmt.Sprintf("Reporter type to use e.g. %s", string(GRPC)))
5050
if !setupcontext.IsAllInOne() {
51-
flags.String(agentTagsDeprecated, "", "(deprecated) see --"+agentTags)
51+
flags.String(AgentTagsDeprecated, "", "(deprecated) see --"+agentTags)
5252
flags.String(agentTags, "", "One or more tags to be added to the Process tags of all spans passing through this agent. Ex: key1=value1,key2=${envVar:defaultValue}")
5353
}
5454
}
@@ -57,9 +57,9 @@ func AddFlags(flags *flag.FlagSet) {
5757
func (b *Options) InitFromViper(v *viper.Viper, logger *zap.Logger) *Options {
5858
b.ReporterType = Type(v.GetString(reporterType))
5959
if !setupcontext.IsAllInOne() {
60-
if len(v.GetString(agentTagsDeprecated)) > 0 {
61-
logger.Warn("Using deprecated configuration", zap.String("option", agentTagsDeprecated))
62-
b.AgentTags = flags.ParseJaegerTags(v.GetString(agentTagsDeprecated))
60+
if len(v.GetString(AgentTagsDeprecated)) > 0 {
61+
logger.Warn("Using deprecated configuration", zap.String("option", AgentTagsDeprecated))
62+
b.AgentTags = flags.ParseJaegerTags(v.GetString(AgentTagsDeprecated))
6363
}
6464
if len(v.GetString(agentTags)) > 0 {
6565
b.AgentTags = flags.ParseJaegerTags(v.GetString(agentTags))

cmd/opentelemetry-collector/app/defaults/default_config.go

+49-17
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"github.com/open-telemetry/opentelemetry-collector/config"
2222
"github.com/open-telemetry/opentelemetry-collector/config/configmodels"
2323
"github.com/open-telemetry/opentelemetry-collector/extension/healthcheckextension"
24+
"github.com/open-telemetry/opentelemetry-collector/processor/resourceprocessor"
2425
"github.com/open-telemetry/opentelemetry-collector/receiver"
2526
"github.com/open-telemetry/opentelemetry-collector/receiver/jaegerreceiver"
2627
"github.com/open-telemetry/opentelemetry-collector/receiver/zipkinreceiver"
@@ -36,7 +37,6 @@ const (
3637
httpThriftBinaryEndpoint = "localhost:14268"
3738
udpThriftCompactEndpoint = "localhost:6831"
3839
udpThriftBinaryEndpoint = "localhost:6832"
39-
httpSamplingEndpoint = "localhost:5778"
4040
)
4141

4242
// CollectorConfig creates default collector configuration.
@@ -46,27 +46,26 @@ func CollectorConfig(storageType string, zipkinHostPort string, factories config
4646
if err != nil {
4747
return nil, err
4848
}
49-
expTypes := []string{}
50-
for _, v := range exporters {
51-
expTypes = append(expTypes, string(v.Type()))
52-
}
5349
receivers := createCollectorReceivers(zipkinHostPort, factories)
54-
recTypes := []string{}
55-
for _, v := range receivers {
56-
recTypes = append(recTypes, string(v.Type()))
57-
}
5850
hc := factories.Extensions["health_check"].CreateDefaultConfig()
51+
processors := configmodels.Processors{}
52+
resProcessor := factories.Processors["resource"].CreateDefaultConfig().(*resourceprocessor.Config)
53+
if len(resProcessor.Labels) > 0 {
54+
processors[resProcessor.Name()] = resProcessor
55+
}
5956
return &configmodels.Config{
6057
Receivers: receivers,
58+
Processors: processors,
6159
Exporters: exporters,
6260
Extensions: configmodels.Extensions{"health_check": hc},
6361
Service: configmodels.Service{
6462
Extensions: []string{"health_check"},
6563
Pipelines: configmodels.Pipelines{
6664
"traces": {
67-
InputType: configmodels.TracesDataType,
68-
Receivers: recTypes,
69-
Exporters: expTypes,
65+
InputType: configmodels.TracesDataType,
66+
Receivers: receiverNames(receivers),
67+
Processors: processorNames(processors),
68+
Exporters: exporterNames(exporters),
7069
},
7170
},
7271
},
@@ -124,18 +123,27 @@ func createExporters(storageTypes string, factories config.Factories) (configmod
124123
// It enables Jaeger receiver with UDP endpoints and Jaeger exporter.
125124
func AgentConfig(factories config.Factories) *configmodels.Config {
126125
jaegerExporter := factories.Exporters["jaeger"]
126+
exporters := configmodels.Exporters{"jaeger": jaegerExporter.CreateDefaultConfig()}
127127
hc := factories.Extensions["health_check"].CreateDefaultConfig().(*healthcheckextension.Config)
128+
processors := configmodels.Processors{}
129+
resProcessor := factories.Processors["resource"].CreateDefaultConfig().(*resourceprocessor.Config)
130+
if len(resProcessor.Labels) > 0 {
131+
processors[resProcessor.Name()] = resProcessor
132+
}
133+
receivers := createAgentReceivers(factories)
128134
return &configmodels.Config{
129-
Receivers: createAgentReceivers(factories),
130-
Exporters: configmodels.Exporters{"jaeger": jaegerExporter.CreateDefaultConfig()},
135+
Receivers: receivers,
136+
Processors: processors,
137+
Exporters: exporters,
131138
Extensions: configmodels.Extensions{"health_check": hc},
132139
Service: configmodels.Service{
133140
Extensions: []string{"health_check"},
134141
Pipelines: map[string]*configmodels.Pipeline{
135142
"traces": {
136-
InputType: configmodels.TracesDataType,
137-
Receivers: []string{"jaeger"},
138-
Exporters: []string{"jaeger"},
143+
InputType: configmodels.TracesDataType,
144+
Receivers: receiverNames(receivers),
145+
Processors: processorNames(processors),
146+
Exporters: exporterNames(exporters),
139147
},
140148
},
141149
},
@@ -161,3 +169,27 @@ func createAgentReceivers(factories config.Factories) configmodels.Receivers {
161169
}
162170
return recvs
163171
}
172+
173+
func receiverNames(receivers configmodels.Receivers) []string {
174+
var names []string
175+
for _, v := range receivers {
176+
names = append(names, v.Name())
177+
}
178+
return names
179+
}
180+
181+
func processorNames(processors configmodels.Processors) []string {
182+
var names []string
183+
for _, v := range processors {
184+
names = append(names, v.Name())
185+
}
186+
return names
187+
}
188+
189+
func exporterNames(exporters configmodels.Exporters) []string {
190+
var names []string
191+
for _, v := range exporters {
192+
names = append(names, v.Name())
193+
}
194+
return names
195+
}

cmd/opentelemetry-collector/app/defaults/default_config_test.go

+72-21
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,8 @@ import (
2121
"github.com/open-telemetry/opentelemetry-collector/config"
2222
"github.com/open-telemetry/opentelemetry-collector/config/configmodels"
2323
"github.com/open-telemetry/opentelemetry-collector/exporter/jaegerexporter"
24+
"github.com/open-telemetry/opentelemetry-collector/processor/resourceprocessor"
2425
"github.com/open-telemetry/opentelemetry-collector/receiver/jaegerreceiver"
25-
"github.com/spf13/viper"
2626
"github.com/stretchr/testify/assert"
2727
"github.com/stretchr/testify/require"
2828
"go.uber.org/zap"
@@ -36,14 +36,14 @@ import (
3636
)
3737

3838
func TestDefaultCollectorConfig(t *testing.T) {
39-
factories := Components(viper.New())
4039
disabledHostPort := ports.PortToHostPort(0)
4140
tests := []struct {
4241
storageType string
4342
zipkinHostPort string
4443
exporterTypes []string
4544
pipeline configmodels.Pipelines
4645
err string
46+
config map[string]interface{}
4747
}{
4848
{
4949
storageType: "elasticsearch",
@@ -97,11 +97,13 @@ func TestDefaultCollectorConfig(t *testing.T) {
9797
storageType: "cassandra",
9898
zipkinHostPort: ":9411",
9999
exporterTypes: []string{cassandra.TypeStr},
100+
config: map[string]interface{}{"resource.labels": "foo=bar"},
100101
pipeline: configmodels.Pipelines{
101102
"traces": {
102-
InputType: configmodels.TracesDataType,
103-
Receivers: []string{"jaeger", "zipkin"},
104-
Exporters: []string{cassandra.TypeStr},
103+
InputType: configmodels.TracesDataType,
104+
Receivers: []string{"jaeger", "zipkin"},
105+
Processors: []string{"resource"},
106+
Exporters: []string{cassandra.TypeStr},
105107
},
106108
},
107109
},
@@ -112,6 +114,11 @@ func TestDefaultCollectorConfig(t *testing.T) {
112114
}
113115
for _, test := range tests {
114116
t.Run(test.storageType, func(t *testing.T) {
117+
v, _ := jConfig.Viperize(grpc.AddFlags)
118+
factories := Components(v)
119+
for key, val := range test.config {
120+
v.Set(key, val)
121+
}
115122
cfg, err := CollectorConfig(test.storageType, test.zipkinHostPort, factories)
116123
if test.err != "" {
117124
require.Nil(t, cfg)
@@ -129,6 +136,15 @@ func TestDefaultCollectorConfig(t *testing.T) {
129136
assert.Equal(t, "jaeger", cfg.Receivers["jaeger"].Name())
130137
assert.Equal(t, len(test.exporterTypes), len(cfg.Exporters))
131138

139+
processorMap := map[string]bool{}
140+
for _, p := range test.pipeline["traces"].Processors {
141+
processorMap[p] = true
142+
}
143+
if processorMap["resource"] {
144+
assert.Equal(t, len(processorMap), len(cfg.Processors))
145+
assert.IsType(t, &resourceprocessor.Config{}, cfg.Processors["resource"])
146+
}
147+
132148
types := []string{}
133149
for _, v := range cfg.Exporters {
134150
types = append(types, string(v.Type()))
@@ -141,22 +157,57 @@ func TestDefaultCollectorConfig(t *testing.T) {
141157
}
142158

143159
func TestDefaultAgentConfig(t *testing.T) {
144-
v, _ := jConfig.Viperize(grpc.AddFlags)
145-
factories := Components(v)
146-
cfg := AgentConfig(factories)
147-
assert.Equal(t, configmodels.Service{
148-
Extensions: []string{"health_check"},
149-
Pipelines: configmodels.Pipelines{
150-
"traces": &configmodels.Pipeline{
151-
InputType: configmodels.TracesDataType,
152-
Receivers: []string{"jaeger"},
153-
Exporters: []string{"jaeger"},
160+
tests := []struct {
161+
config map[string]interface{}
162+
service configmodels.Service
163+
}{
164+
{
165+
config: map[string]interface{}{"resource.labels": "foo=bar"},
166+
service: configmodels.Service{
167+
Extensions: []string{"health_check"},
168+
Pipelines: configmodels.Pipelines{
169+
"traces": &configmodels.Pipeline{
170+
InputType: configmodels.TracesDataType,
171+
Receivers: []string{"jaeger"},
172+
Processors: []string{"resource"},
173+
Exporters: []string{"jaeger"},
174+
},
175+
},
176+
},
177+
},
178+
{
179+
service: configmodels.Service{
180+
Extensions: []string{"health_check"},
181+
Pipelines: configmodels.Pipelines{
182+
"traces": &configmodels.Pipeline{
183+
InputType: configmodels.TracesDataType,
184+
Receivers: []string{"jaeger"},
185+
Exporters: []string{"jaeger"},
186+
},
187+
},
154188
},
155189
},
156-
}, cfg.Service)
157-
assert.Equal(t, 0, len(cfg.Processors))
158-
assert.Equal(t, 1, len(cfg.Receivers))
159-
assert.IsType(t, &jaegerreceiver.Config{}, cfg.Receivers["jaeger"])
160-
assert.Equal(t, 1, len(cfg.Exporters))
161-
assert.IsType(t, &jaegerexporter.Config{}, cfg.Exporters["jaeger"])
190+
}
191+
for _, test := range tests {
192+
v, _ := jConfig.Viperize(grpc.AddFlags)
193+
for key, val := range test.config {
194+
v.Set(key, val)
195+
}
196+
factories := Components(v)
197+
cfg := AgentConfig(factories)
198+
199+
assert.Equal(t, test.service, cfg.Service)
200+
assert.Equal(t, 1, len(cfg.Receivers))
201+
assert.IsType(t, &jaegerreceiver.Config{}, cfg.Receivers["jaeger"])
202+
assert.Equal(t, 1, len(cfg.Exporters))
203+
assert.IsType(t, &jaegerexporter.Config{}, cfg.Exporters["jaeger"])
204+
processorMap := map[string]bool{}
205+
for _, p := range test.service.Pipelines["traces"].Processors {
206+
processorMap[p] = true
207+
}
208+
if processorMap["resource"] {
209+
assert.Equal(t, len(processorMap), len(cfg.Processors))
210+
assert.IsType(t, &resourceprocessor.Config{}, cfg.Processors["resource"])
211+
}
212+
}
162213
}

cmd/opentelemetry-collector/app/defaults/defaults.go

+12-4
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,9 @@ import (
1818
"flag"
1919

2020
"github.com/open-telemetry/opentelemetry-collector/config"
21-
otelJaegerEexporter "github.com/open-telemetry/opentelemetry-collector/exporter/jaegerexporter"
22-
otelJaegerreceiver "github.com/open-telemetry/opentelemetry-collector/receiver/jaegerreceiver"
21+
otelJaegerExporter "github.com/open-telemetry/opentelemetry-collector/exporter/jaegerexporter"
22+
otelResourceProcessor "github.com/open-telemetry/opentelemetry-collector/processor/resourceprocessor"
23+
otelJaegerReceiver "github.com/open-telemetry/opentelemetry-collector/receiver/jaegerreceiver"
2324
"github.com/open-telemetry/opentelemetry-collector/service/defaultcomponents"
2425
"github.com/spf13/pflag"
2526
"github.com/spf13/viper"
@@ -28,6 +29,7 @@ import (
2829
"github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter/elasticsearch"
2930
"github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter/jaegerexporter"
3031
"github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter/kafka"
32+
"github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/processor/resourceprocessor"
3133
"github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/receiver/jaegerreceiver"
3234
storageCassandra "github.com/jaegertracing/jaeger/plugin/storage/cassandra"
3335
storageEs "github.com/jaegertracing/jaeger/plugin/storage/es"
@@ -61,16 +63,22 @@ func Components(v *viper.Viper) config.Factories {
6163
factories.Exporters[cassandraExp.Type()] = cassandraExp
6264
factories.Exporters[esExp.Type()] = esExp
6365

64-
jaegerRec := factories.Receivers["jaeger"].(*otelJaegerreceiver.Factory)
66+
jaegerRec := factories.Receivers["jaeger"].(*otelJaegerReceiver.Factory)
6567
factories.Receivers["jaeger"] = &jaegerreceiver.Factory{
6668
Wrapped: jaegerRec,
6769
Viper: v,
6870
}
69-
jaegerExp := factories.Exporters["jaeger"].(*otelJaegerEexporter.Factory)
71+
jaegerExp := factories.Exporters["jaeger"].(*otelJaegerExporter.Factory)
7072
factories.Exporters["jaeger"] = &jaegerexporter.Factory{
7173
Wrapped: jaegerExp,
7274
Viper: v,
7375
}
76+
77+
resourceProc := factories.Processors["resource"].(*otelResourceProcessor.Factory)
78+
factories.Processors["resource"] = &resourceprocessor.Factory{
79+
Wrapped: resourceProc,
80+
Viper: v,
81+
}
7482
return factories
7583
}
7684

cmd/opentelemetry-collector/app/defaults/defaults_test.go

+2
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter/elasticsearch"
2525
"github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter/jaegerexporter"
2626
"github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/exporter/kafka"
27+
"github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/processor/resourceprocessor"
2728
"github.com/jaegertracing/jaeger/cmd/opentelemetry-collector/app/receiver/jaegerreceiver"
2829
jConfig "github.com/jaegertracing/jaeger/pkg/config"
2930
)
@@ -51,4 +52,5 @@ func TestComponents(t *testing.T) {
5152
assert.Equal(t, []string{"http://127.0.0.1:9200"}, ec.GetPrimary().Servers)
5253
assert.IsType(t, &jaegerreceiver.Factory{}, factories.Receivers["jaeger"])
5354
assert.IsType(t, &jaegerexporter.Factory{}, factories.Exporters["jaeger"])
55+
assert.IsType(t, &resourceprocessor.Factory{}, factories.Processors["resource"])
5456
}

0 commit comments

Comments
 (0)