Skip to content

Commit cfcc05c

Browse files
committed
Added timeout to the exporter path
Signed-off-by: Juraci Paixão Kröhling <juraci@kroehling.de>
1 parent 4eca960 commit cfcc05c

File tree

6 files changed

+49
-25
lines changed

6 files changed

+49
-25
lines changed

exporter/jaegerexporter/exporter.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ package jaegerexporter
1616

1717
import (
1818
"context"
19+
"fmt"
1920

2021
jaegerproto "github.com/jaegertracing/jaeger/proto-gen/api_v2"
2122
"google.golang.org/grpc"
@@ -71,7 +72,7 @@ func (s *protoGRPCSender) pushTraceData(
7172

7273
batches, err := jaegertranslator.InternalTracesToJaegerProto(td)
7374
if err != nil {
74-
return td.SpanCount(), consumererror.Permanent(err)
75+
return td.SpanCount(), consumererror.Permanent(fmt.Errorf("failed to push trace data via Jaeger exporter: %v", err))
7576
}
7677

7778
if s.metadata.Len() > 0 {
@@ -84,7 +85,7 @@ func (s *protoGRPCSender) pushTraceData(
8485
ctx,
8586
&jaegerproto.PostSpansRequest{Batch: *batch}, grpc.WaitForReady(s.waitForReady))
8687
if err != nil {
87-
return td.SpanCount() - sentSpans, err
88+
return td.SpanCount() - sentSpans, fmt.Errorf("failed to push trace data via Jaeger exporter: %v", err)
8889
}
8990
sentSpans += len(batch.Spans)
9091
}

exporter/opencensusexporter/opencensus.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,7 @@ func (oce *ocAgentExporter) PushTraceData(ctx context.Context, td consumerdata.T
156156
code: errAlreadyStopped,
157157
msg: "OpenCensus exporter was already stopped.",
158158
}
159-
return len(td.Spans), err
159+
return len(td.Spans), fmt.Errorf("failed to push trace data via OpenCensus exporter: %v", err)
160160
}
161161

162162
err := exporter.ExportTraceServiceRequest(
@@ -168,7 +168,7 @@ func (oce *ocAgentExporter) PushTraceData(ctx context.Context, td consumerdata.T
168168
)
169169
oce.exporters <- exporter
170170
if err != nil {
171-
return len(td.Spans), err
171+
return len(td.Spans), fmt.Errorf("failed to push trace data via OpenCensus exporter: %v", err)
172172
}
173173
return 0, nil
174174
}
@@ -181,7 +181,7 @@ func (oce *ocAgentExporter) PushMetricsData(ctx context.Context, md consumerdata
181181
code: errAlreadyStopped,
182182
msg: "OpenCensus exporter was already stopped.",
183183
}
184-
return exporterhelper.NumTimeSeries(md), err
184+
return exporterhelper.NumTimeSeries(md), fmt.Errorf("failed to push metrics data via OpenCensus exporter: %v", err)
185185
}
186186

187187
req := &agentmetricspb.ExportMetricsServiceRequest{
@@ -192,7 +192,7 @@ func (oce *ocAgentExporter) PushMetricsData(ctx context.Context, md consumerdata
192192
err := exporter.ExportMetricsServiceRequest(req)
193193
oce.exporters <- exporter
194194
if err != nil {
195-
return exporterhelper.NumTimeSeries(md), err
195+
return exporterhelper.NumTimeSeries(md), fmt.Errorf("failed to push metrics data via OpenCensus exporter: %v", err)
196196
}
197197
return 0, nil
198198
}

exporter/otlpexporter/otlp.go

+6-6
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,7 @@ func (oce *otlpExporter) pushTraceData(ctx context.Context, td pdata.Traces) (in
193193
code: errAlreadyStopped,
194194
msg: "OpenTelemetry exporter was already stopped.",
195195
}
196-
return td.SpanCount(), err
196+
return td.SpanCount(), fmt.Errorf("OTLP trace exporter failed: %v", err)
197197
}
198198

199199
// Perform the request.
@@ -205,7 +205,7 @@ func (oce *otlpExporter) pushTraceData(ctx context.Context, td pdata.Traces) (in
205205
// Return the exporter to the pool.
206206
oce.exporters <- exporter
207207
if err != nil {
208-
return td.SpanCount(), err
208+
return td.SpanCount(), fmt.Errorf("OTLP trace exporter failed: %v", err)
209209
}
210210
return 0, nil
211211
}
@@ -219,7 +219,7 @@ func (oce *otlpExporter) pushMetricsData(ctx context.Context, md pdata.Metrics)
219219
code: errAlreadyStopped,
220220
msg: "OpenTelemetry exporter was already stopped.",
221221
}
222-
return imd.MetricCount(), err
222+
return imd.MetricCount(), fmt.Errorf("OTLP metrics exporter failed: %v", err)
223223
}
224224

225225
// Perform the request.
@@ -231,7 +231,7 @@ func (oce *otlpExporter) pushMetricsData(ctx context.Context, md pdata.Metrics)
231231
// Return the exporter to the pool.
232232
oce.exporters <- exporter
233233
if err != nil {
234-
return imd.MetricCount(), err
234+
return imd.MetricCount(), fmt.Errorf("OTLP metrics exporter failed: %v", err)
235235
}
236236
return 0, nil
237237
}
@@ -244,7 +244,7 @@ func (oce *otlpExporter) pushLogData(ctx context.Context, logs data.Logs) (int,
244244
code: errAlreadyStopped,
245245
msg: "OpenTelemetry exporter was already stopped.",
246246
}
247-
return logs.LogRecordCount(), err
247+
return logs.LogRecordCount(), fmt.Errorf("OTLP log exporter failed: %v", err)
248248
}
249249

250250
request := &otlplogs.ExportLogServiceRequest{
@@ -255,7 +255,7 @@ func (oce *otlpExporter) pushLogData(ctx context.Context, logs data.Logs) (int,
255255
// Return the exporter to the pool.
256256
oce.exporters <- exporter
257257
if err != nil {
258-
return logs.LogRecordCount(), err
258+
return logs.LogRecordCount(), fmt.Errorf("OTLP log exporter failed: %v", err)
259259
}
260260
return 0, nil
261261
}

exporter/zipkinexporter/zipkin.go

+9-6
Original file line numberDiff line numberDiff line change
@@ -96,33 +96,36 @@ func createZipkinExporter(config configmodels.Exporter) (*zipkinExporter, error)
9696
return ze, nil
9797
}
9898

99-
func (ze *zipkinExporter) PushTraceData(_ context.Context, td consumerdata.TraceData) (int, error) {
99+
func (ze *zipkinExporter) PushTraceData(ctx context.Context, td consumerdata.TraceData) (int, error) {
100100
tbatch := make([]*zipkinmodel.SpanModel, 0, len(td.Spans))
101+
wrapError := func(err error) error {
102+
return fmt.Errorf("failed to push Zipkin trace trace: %v", err)
103+
}
101104

102105
var resource *resourcepb.Resource = td.Resource
103106

104107
for _, span := range td.Spans {
105108
zs, err := zipkin.OCSpanProtoToZipkin(td.Node, resource, span, ze.defaultServiceName)
106109
if err != nil {
107-
return len(td.Spans), consumererror.Permanent(err)
110+
return len(td.Spans), consumererror.Permanent(wrapError(err))
108111
}
109112
tbatch = append(tbatch, zs)
110113
}
111114

112115
body, err := ze.serializer.Serialize(tbatch)
113116
if err != nil {
114-
return len(td.Spans), consumererror.Permanent(err)
117+
return len(td.Spans), consumererror.Permanent(wrapError(err))
115118
}
116119

117-
req, err := http.NewRequest("POST", ze.url, bytes.NewReader(body))
120+
req, err := http.NewRequestWithContext(ctx, "POST", ze.url, bytes.NewReader(body))
118121
if err != nil {
119-
return len(td.Spans), err
122+
return len(td.Spans), wrapError(err)
120123
}
121124
req.Header.Set("Content-Type", ze.serializer.ContentType())
122125

123126
resp, err := ze.client.Do(req)
124127
if err != nil {
125-
return len(td.Spans), err
128+
return len(td.Spans), wrapError(err)
126129
}
127130
_ = resp.Body.Close()
128131
if resp.StatusCode < 200 || resp.StatusCode > 299 {

processor/batchprocessor/batch_processor.go

+4-2
Original file line numberDiff line numberDiff line change
@@ -171,9 +171,11 @@ func (bp *batchTraceProcessor) resetTimer() {
171171
func (bp *batchTraceProcessor) sendItems(measure *stats.Int64Measure) {
172172
// Add that it came form the trace pipeline?
173173
statsTags := []tag.Mutator{tag.Insert(processor.TagProcessorNameKey, bp.name)}
174-
_ = stats.RecordWithTags(context.Background(), statsTags, measure.M(1))
174+
stats.RecordWithTags(context.Background(), statsTags, measure.M(1))
175175

176-
_ = bp.traceConsumer.ConsumeTraces(context.Background(), bp.batchTraces.getTraceData())
176+
if err := bp.traceConsumer.ConsumeTraces(context.Background(), bp.batchTraces.getTraceData()); err != nil {
177+
bp.logger.Warn("Sender failed", zap.Error(err))
178+
}
177179
bp.batchTraces.reset()
178180
}
179181

processor/fanoutconnector.go

+23-5
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ package processor
1616

1717
import (
1818
"context"
19+
"time"
1920

2021
"go.opentelemetry.io/collector/component/componenterror"
2122
"go.opentelemetry.io/collector/consumer"
@@ -25,6 +26,8 @@ import (
2526
"go.opentelemetry.io/collector/internal/data"
2627
)
2728

29+
const timeout = 5 * time.Second
30+
2831
// This file contains implementations of Trace/Metrics connectors
2932
// that fan out the data to multiple other consumers.
3033

@@ -63,9 +66,12 @@ var _ consumer.MetricsConsumerOld = (*metricsFanOutConnectorOld)(nil)
6366

6467
// ConsumeMetricsData exports the MetricsData to all consumers wrapped by the current one.
6568
func (mfc metricsFanOutConnectorOld) ConsumeMetricsData(ctx context.Context, md consumerdata.MetricsData) error {
69+
ctxWithTimeout, cancel := context.WithTimeout(ctx, timeout)
70+
defer cancel()
71+
6672
var errs []error
6773
for _, mc := range mfc {
68-
if err := mc.ConsumeMetricsData(ctx, md); err != nil {
74+
if err := mc.ConsumeMetricsData(ctxWithTimeout, md); err != nil {
6975
errs = append(errs, err)
7076
}
7177
}
@@ -83,9 +89,12 @@ var _ consumer.MetricsConsumer = (*metricsFanOutConnector)(nil)
8389

8490
// ConsumeMetricsData exports the MetricsData to all consumers wrapped by the current one.
8591
func (mfc metricsFanOutConnector) ConsumeMetrics(ctx context.Context, md pdata.Metrics) error {
92+
ctxWithTimeout, cancel := context.WithTimeout(ctx, timeout)
93+
defer cancel()
94+
8695
var errs []error
8796
for _, mc := range mfc {
88-
if err := mc.ConsumeMetrics(ctx, md); err != nil {
97+
if err := mc.ConsumeMetrics(ctxWithTimeout, md); err != nil {
8998
errs = append(errs, err)
9099
}
91100
}
@@ -127,9 +136,12 @@ var _ consumer.TraceConsumerOld = (*traceFanOutConnectorOld)(nil)
127136

128137
// ConsumeTraceData exports the span data to all trace consumers wrapped by the current one.
129138
func (tfc traceFanOutConnectorOld) ConsumeTraceData(ctx context.Context, td consumerdata.TraceData) error {
139+
ctxWithTimeout, cancel := context.WithTimeout(ctx, timeout)
140+
defer cancel()
141+
130142
var errs []error
131143
for _, tc := range tfc {
132-
if err := tc.ConsumeTraceData(ctx, td); err != nil {
144+
if err := tc.ConsumeTraceData(ctxWithTimeout, td); err != nil {
133145
errs = append(errs, err)
134146
}
135147
}
@@ -147,9 +159,12 @@ var _ consumer.TraceConsumer = (*traceFanOutConnector)(nil)
147159

148160
// ConsumeTraces exports the span data to all trace consumers wrapped by the current one.
149161
func (tfc traceFanOutConnector) ConsumeTraces(ctx context.Context, td pdata.Traces) error {
162+
ctxWithTimeout, cancel := context.WithTimeout(ctx, timeout)
163+
defer cancel()
164+
150165
var errs []error
151166
for _, tc := range tfc {
152-
if err := tc.ConsumeTraces(ctx, td); err != nil {
167+
if err := tc.ConsumeTraces(ctxWithTimeout, td); err != nil {
153168
errs = append(errs, err)
154169
}
155170
}
@@ -167,9 +182,12 @@ var _ consumer.LogConsumer = (*LogFanOutConnector)(nil)
167182

168183
// Consume exports the span data to all consumers wrapped by the current one.
169184
func (fc LogFanOutConnector) ConsumeLogs(ctx context.Context, ld data.Logs) error {
185+
ctxWithTimeout, cancel := context.WithTimeout(ctx, timeout)
186+
defer cancel()
187+
170188
var errs []error
171189
for _, tc := range fc {
172-
if err := tc.ConsumeLogs(ctx, ld); err != nil {
190+
if err := tc.ConsumeLogs(ctxWithTimeout, ld); err != nil {
173191
errs = append(errs, err)
174192
}
175193
}

0 commit comments

Comments
 (0)