@@ -23,6 +23,7 @@ import (
23
23
24
24
"go.opentelemetry.io/collector/component"
25
25
"go.opentelemetry.io/collector/config/configmodels"
26
+ "go.opentelemetry.io/collector/config/configprotocol"
26
27
"go.opentelemetry.io/collector/receiver/jaegerreceiver"
27
28
"go.opentelemetry.io/collector/receiver/opencensusreceiver"
28
29
"go.opentelemetry.io/collector/receiver/otlpreceiver"
@@ -36,7 +37,7 @@ import (
36
37
// an exporter.
37
38
type DataReceiver interface {
38
39
Start (tc * MockTraceConsumer , mc * MockMetricConsumer ) error
39
- Stop ()
40
+ Stop () error
40
41
41
42
// Generate a config string to place in exporter part of collector config
42
43
// so that it can send data to this receiver.
@@ -59,7 +60,7 @@ func (mb *DataReceiverBase) ReportFatalError(err error) {
59
60
}
60
61
61
62
// GetFactory of the specified kind. Returns the factory for a component type.
62
- func (mb * DataReceiverBase ) GetFactory (kind component.Kind , componentType configmodels.Type ) component.Factory {
63
+ func (mb * DataReceiverBase ) GetFactory (_ component.Kind , _ configmodels.Type ) component.Factory {
63
64
return nil
64
65
}
65
66
@@ -75,7 +76,7 @@ func (mb *DataReceiverBase) GetExporters() map[configmodels.DataType]map[configm
75
76
// OCDataReceiver implements OpenCensus format receiver.
76
77
type OCDataReceiver struct {
77
78
DataReceiverBase
78
- receiver * opencensusreceiver .Receiver
79
+ receiver component .Receiver
79
80
}
80
81
81
82
// Ensure OCDataReceiver implements DataReceiver.
@@ -89,19 +90,22 @@ func NewOCDataReceiver(port int) *OCDataReceiver {
89
90
return & OCDataReceiver {DataReceiverBase : DataReceiverBase {Port : port }}
90
91
}
91
92
92
- func (or * OCDataReceiver ) Start (tc * MockTraceConsumer , mc * MockMetricConsumer ) error {
93
- addr := fmt .Sprintf ("localhost:%d" , or .Port )
93
+ func (or * OCDataReceiver ) Start (tc * MockTraceConsumer , _ * MockMetricConsumer ) error {
94
+ factory := opencensusreceiver.Factory {}
95
+ cfg := factory .CreateDefaultConfig ().(* opencensusreceiver.Config )
96
+ cfg .SetName ("opencensus" )
97
+ cfg .Endpoint = fmt .Sprintf ("localhost:%d" , or .Port )
94
98
var err error
95
- or .receiver , err = opencensusreceiver . New ( "opencensus" , "tcp" , addr , tc , mc )
99
+ or .receiver , err = factory . CreateTraceReceiver ( context . Background (), zap . NewNop (), cfg , tc )
96
100
if err != nil {
97
101
return err
98
102
}
99
103
100
104
return or .receiver .Start (context .Background (), or )
101
105
}
102
106
103
- func (or * OCDataReceiver ) Stop () {
104
- or .receiver .Shutdown (context .Background ())
107
+ func (or * OCDataReceiver ) Stop () error {
108
+ return or .receiver .Shutdown (context .Background ())
105
109
}
106
110
107
111
func (or * OCDataReceiver ) GenConfigYAMLStr () string {
@@ -128,26 +132,25 @@ func NewJaegerDataReceiver(port int) *JaegerDataReceiver {
128
132
return & JaegerDataReceiver {DataReceiverBase : DataReceiverBase {Port : port }}
129
133
}
130
134
131
- func (jr * JaegerDataReceiver ) Start (tc * MockTraceConsumer , mc * MockMetricConsumer ) error {
132
- jaegerCfg := jaegerreceiver.Configuration {
133
- CollectorGRPCPort : jr .Port ,
135
+ func (jr * JaegerDataReceiver ) Start (tc * MockTraceConsumer , _ * MockMetricConsumer ) error {
136
+ factory := jaegerreceiver.Factory {}
137
+ cfg := factory .CreateDefaultConfig ().(* jaegerreceiver.Config )
138
+ cfg .SetName ("jaeger" )
139
+ cfg .Protocols ["grpc" ] = & configprotocol.ProtocolServerSettings {
140
+ Endpoint : fmt .Sprintf ("localhost:%d" , jr .Port ),
134
141
}
135
142
var err error
136
143
params := component.ReceiverCreateParams {Logger : zap .NewNop ()}
137
- jr .receiver , err = jaegerreceiver . New ( "jaeger" , & jaegerCfg , tc , params )
144
+ jr .receiver , err = factory . CreateTraceReceiver ( context . Background (), params , cfg , tc )
138
145
if err != nil {
139
146
return err
140
147
}
141
148
142
149
return jr .receiver .Start (context .Background (), jr )
143
150
}
144
151
145
- func (jr * JaegerDataReceiver ) Stop () {
146
- if jr .receiver != nil {
147
- if err := jr .receiver .Shutdown (context .Background ()); err != nil {
148
- log .Printf ("Cannot stop Jaeger receiver: %s" , err .Error ())
149
- }
150
- }
152
+ func (jr * JaegerDataReceiver ) Stop () error {
153
+ return jr .receiver .Shutdown (context .Background ())
151
154
}
152
155
153
156
func (jr * JaegerDataReceiver ) GenConfigYAMLStr () string {
@@ -165,7 +168,7 @@ func (jr *JaegerDataReceiver) ProtocolName() string {
165
168
// OTLPDataReceiver implements OTLP format receiver.
166
169
type OTLPDataReceiver struct {
167
170
DataReceiverBase
168
- receiver * otlpreceiver .Receiver
171
+ receiver component .Receiver
169
172
}
170
173
171
174
// Ensure OTLPDataReceiver implements DataReceiver.
@@ -179,19 +182,23 @@ func NewOTLPDataReceiver(port int) *OTLPDataReceiver {
179
182
return & OTLPDataReceiver {DataReceiverBase : DataReceiverBase {Port : port }}
180
183
}
181
184
182
- func (or * OTLPDataReceiver ) Start (tc * MockTraceConsumer , mc * MockMetricConsumer ) error {
183
- addr := fmt .Sprintf ("localhost:%d" , or .Port )
185
+ func (or * OTLPDataReceiver ) Start (tc * MockTraceConsumer , _ * MockMetricConsumer ) error {
186
+ factory := otlpreceiver.Factory {}
187
+ cfg := factory .CreateDefaultConfig ().(* otlpreceiver.Config )
188
+ cfg .SetName ("otlp" )
189
+ cfg .Endpoint = fmt .Sprintf ("localhost:%d" , or .Port )
184
190
var err error
185
- or .receiver , err = otlpreceiver .New ("otlp" , "tcp" , addr , tc , mc )
191
+ params := component.ReceiverCreateParams {Logger : zap .NewNop ()}
192
+ or .receiver , err = factory .CreateTraceReceiver (context .Background (), params , cfg , tc )
186
193
if err != nil {
187
194
return err
188
195
}
189
196
190
197
return or .receiver .Start (context .Background (), or )
191
198
}
192
199
193
- func (or * OTLPDataReceiver ) Stop () {
194
- or .receiver .Shutdown (context .Background ())
200
+ func (or * OTLPDataReceiver ) Stop () error {
201
+ return or .receiver .Shutdown (context .Background ())
195
202
}
196
203
197
204
func (or * OTLPDataReceiver ) GenConfigYAMLStr () string {
@@ -209,7 +216,7 @@ func (or *OTLPDataReceiver) ProtocolName() string {
209
216
// ZipkinDataReceiver implements Zipkin format receiver.
210
217
type ZipkinDataReceiver struct {
211
218
DataReceiverBase
212
- receiver * zipkinreceiver. ZipkinReceiver
219
+ receiver component. TraceReceiver
213
220
}
214
221
215
222
const DefaultZipkinAddressPort = 9411
@@ -218,13 +225,13 @@ func NewZipkinDataReceiver(port int) *ZipkinDataReceiver {
218
225
return & ZipkinDataReceiver {DataReceiverBase : DataReceiverBase {Port : port }}
219
226
}
220
227
221
- func (zr * ZipkinDataReceiver ) Start (tc * MockTraceConsumer , mc * MockMetricConsumer ) error {
222
- var err error
228
+ func (zr * ZipkinDataReceiver ) Start (tc * MockTraceConsumer , _ * MockMetricConsumer ) error {
223
229
factory := zipkinreceiver.Factory {}
224
230
cfg := factory .CreateDefaultConfig ().(* zipkinreceiver.Config )
225
- cfg .NameVal = "zipkin"
231
+ cfg .SetName ( "zipkin" )
226
232
cfg .Endpoint = fmt .Sprintf ("localhost:%d" , zr .Port )
227
- zr .receiver , err = zipkinreceiver .New (cfg , tc )
233
+ var err error
234
+ zr .receiver , err = factory .CreateTraceReceiver (context .Background (), zap .NewNop (), cfg , tc )
228
235
229
236
if err != nil {
230
237
return err
@@ -233,12 +240,8 @@ func (zr *ZipkinDataReceiver) Start(tc *MockTraceConsumer, mc *MockMetricConsume
233
240
return zr .receiver .Start (context .Background (), zr )
234
241
}
235
242
236
- func (zr * ZipkinDataReceiver ) Stop () {
237
- if zr .receiver != nil {
238
- if err := zr .receiver .Shutdown (context .Background ()); err != nil {
239
- log .Printf ("Cannot stop Zipkin receiver: %s" , err .Error ())
240
- }
241
- }
243
+ func (zr * ZipkinDataReceiver ) Stop () error {
244
+ return zr .receiver .Shutdown (context .Background ())
242
245
}
243
246
244
247
func (zr * ZipkinDataReceiver ) GenConfigYAMLStr () string {
0 commit comments