Skip to content

Commit eb11553

Browse files
committed
Add support for queued retry in the exporter helper.
Changed only the OTLP exporter for the moment to use the new settings. Timeout is enabled for all the exporters. Fixes #1193 There are some missing features that will be added in a followup PR: 1. Enforcing errors. For the moment added the Throttle error as a hack to keep backwards compatibility with OTLP. 2. Enable queued and retry for all exporters. 3. Fix observability metrics for the case when requests are dropped because the queue is full.
1 parent 3274841 commit eb11553

16 files changed

+1121
-192
lines changed

exporter/exporterhelper/common.go

+148-15
Original file line numberDiff line numberDiff line change
@@ -16,16 +16,65 @@ package exporterhelper
1616

1717
import (
1818
"context"
19+
"sync"
20+
"time"
1921

2022
"go.opencensus.io/trace"
2123

2224
"go.opentelemetry.io/collector/component"
25+
"go.opentelemetry.io/collector/component/componenterror"
26+
"go.opentelemetry.io/collector/config/configmodels"
27+
"go.opentelemetry.io/collector/consumer/consumererror"
2328
)
2429

2530
var (
2631
okStatus = trace.Status{Code: trace.StatusCodeOK}
2732
)
2833

34+
type TimeoutSettings struct {
35+
// Timeout is the timeout for each operation.
36+
Timeout time.Duration `mapstructure:"timeout"`
37+
}
38+
39+
func CreateDefaultTimeoutSettings() TimeoutSettings {
40+
return TimeoutSettings{
41+
Timeout: 5 * time.Second,
42+
}
43+
}
44+
45+
type settings struct {
46+
configmodels.Exporter
47+
TimeoutSettings
48+
QueuedSettings
49+
RetrySettings
50+
}
51+
52+
type request interface {
53+
context() context.Context
54+
setContext(context.Context)
55+
export(ctx context.Context) (int, error)
56+
// Returns a new queue request that contains the items left to be exported.
57+
onPartialError(consumererror.PartialError) request
58+
// Returns the cnt of spans/metric points or log records.
59+
count() int
60+
}
61+
62+
type requestSender interface {
63+
send(req request) (int, error)
64+
}
65+
66+
type baseRequest struct {
67+
ctx context.Context
68+
}
69+
70+
func (req *baseRequest) context() context.Context {
71+
return req.ctx
72+
}
73+
74+
func (req *baseRequest) setContext(ctx context.Context) {
75+
req.ctx = ctx
76+
}
77+
2978
// Start specifies the function invoked when the exporter is being started.
3079
type Start func(context.Context, component.Host) error
3180

@@ -51,37 +100,121 @@ func WithStart(start Start) ExporterOption {
51100
}
52101
}
53102

103+
// WithShutdown overrides the default TimeoutSettings for an exporter.
104+
// The default TimeoutSettings is 5 seconds.
105+
func WithTimeout(timeout TimeoutSettings) ExporterOption {
106+
return func(o *baseExporter) {
107+
o.cfg.TimeoutSettings = timeout
108+
}
109+
}
110+
111+
// WithRetry overrides the default RetrySettings for an exporter.
112+
// The default RetrySettings is to disable retries.
113+
func WithRetry(retry RetrySettings) ExporterOption {
114+
return func(o *baseExporter) {
115+
o.cfg.RetrySettings = retry
116+
}
117+
}
118+
119+
// WithQueued overrides the default QueuedSettings for an exporter.
120+
// The default QueuedSettings is to disable queueing.
121+
func WithQueued(queued QueuedSettings) ExporterOption {
122+
return func(o *baseExporter) {
123+
o.cfg.QueuedSettings = queued
124+
}
125+
}
126+
54127
// internalOptions contains internalOptions concerning how an Exporter is configured.
55128
type baseExporter struct {
56-
exporterFullName string
57-
start Start
58-
shutdown Shutdown
129+
cfg *settings
130+
sender requestSender
131+
rSender *retrySender
132+
qSender *queuedSender
133+
start Start
134+
shutdown Shutdown
135+
startOnce sync.Once
136+
shutdownOnce sync.Once
59137
}
60138

61139
// Construct the internalOptions from multiple ExporterOption.
62-
func newBaseExporter(exporterFullName string, options ...ExporterOption) baseExporter {
63-
be := baseExporter{
64-
exporterFullName: exporterFullName,
140+
func newBaseExporter(cfg configmodels.Exporter, options ...ExporterOption) *baseExporter {
141+
be := &baseExporter{
142+
cfg: &settings{
143+
Exporter: cfg,
144+
TimeoutSettings: CreateDefaultTimeoutSettings(),
145+
// TODO: Enable queuing by default (call CreateDefaultQueuedSettings
146+
QueuedSettings: QueuedSettings{Disabled: true},
147+
// TODO: Enable retry by default (call CreateDefaultRetrySettings)
148+
RetrySettings: RetrySettings{Disabled: true},
149+
},
65150
}
66151

67152
for _, op := range options {
68-
op(&be)
153+
op(be)
154+
}
155+
156+
if be.start == nil {
157+
be.start = func(ctx context.Context, host component.Host) error { return nil }
69158
}
70159

160+
if be.shutdown == nil {
161+
be.shutdown = func(ctx context.Context) error { return nil }
162+
}
163+
164+
be.sender = &timeoutSender{cfg: &be.cfg.TimeoutSettings}
165+
166+
be.rSender = newRetrySender(&be.cfg.RetrySettings, be.sender)
167+
be.sender = be.rSender
168+
169+
be.qSender = newQueuedSender(&be.cfg.QueuedSettings, be.sender)
170+
be.sender = be.qSender
171+
71172
return be
72173
}
73174

74175
func (be *baseExporter) Start(ctx context.Context, host component.Host) error {
75-
if be.start != nil {
76-
return be.start(ctx, host)
77-
}
78-
return nil
176+
err := componenterror.ErrAlreadyStarted
177+
be.startOnce.Do(func() {
178+
// First start the nextSender
179+
err = be.start(ctx, host)
180+
if err != nil {
181+
return
182+
}
183+
184+
// If no error then start the queuedSender
185+
be.qSender.start()
186+
})
187+
return err
79188
}
80189

81-
// Shutdown stops the exporter and is invoked during shutdown.
190+
// Shutdown stops the nextSender and is invoked during shutdown.
82191
func (be *baseExporter) Shutdown(ctx context.Context) error {
83-
if be.shutdown != nil {
84-
return be.shutdown(ctx)
192+
err := componenterror.ErrAlreadyStopped
193+
be.shutdownOnce.Do(func() {
194+
// First stop the retry goroutines
195+
be.rSender.shutdown()
196+
197+
// All operations will try to export once but will not retry because retrying was disabled when be.rSender stopped.
198+
be.qSender.shutdown()
199+
200+
// Last shutdown the nextSender itself.
201+
err = be.shutdown(ctx)
202+
})
203+
return err
204+
}
205+
206+
type timeoutSender struct {
207+
cfg *TimeoutSettings
208+
}
209+
210+
func (te *timeoutSender) send(req request) (int, error) {
211+
// Intentionally don't overwrite the context inside the request, because in case of retries deadline will not be
212+
// updated because this deadline most likely is before the next one.
213+
ctx := req.context()
214+
if te.cfg.Timeout > 0 {
215+
var cancelFunc func()
216+
ctx, cancelFunc = context.WithTimeout(req.context(), te.cfg.Timeout)
217+
defer cancelFunc()
85218
}
86-
return nil
219+
return req.export(ctx)
87220
}

exporter/exporterhelper/common_test.go

+8-2
Original file line numberDiff line numberDiff line change
@@ -23,22 +23,28 @@ import (
2323

2424
"go.opentelemetry.io/collector/component"
2525
"go.opentelemetry.io/collector/component/componenttest"
26+
"go.opentelemetry.io/collector/config/configmodels"
2627
)
2728

29+
var defaultExporterCfg = &configmodels.ExporterSettings{
30+
TypeVal: "test",
31+
NameVal: "test",
32+
}
33+
2834
func TestErrorToStatus(t *testing.T) {
2935
require.Equal(t, okStatus, errToStatus(nil))
3036
require.Equal(t, trace.Status{Code: trace.StatusCodeUnknown, Message: "my_error"}, errToStatus(errors.New("my_error")))
3137
}
3238

3339
func TestBaseExporter(t *testing.T) {
34-
be := newBaseExporter("test")
40+
be := newBaseExporter(defaultExporterCfg)
3541
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
3642
require.NoError(t, be.Shutdown(context.Background()))
3743
}
3844

3945
func TestBaseExporterWithOptions(t *testing.T) {
4046
be := newBaseExporter(
41-
"test",
47+
defaultExporterCfg,
4248
WithStart(func(ctx context.Context, host component.Host) error { return errors.New("my error") }),
4349
WithShutdown(func(ctx context.Context) error { return errors.New("my error") }))
4450
require.Error(t, be.Start(context.Background(), componenttest.NewNopHost()))

exporter/exporterhelper/logshelper.go

+51-18
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919

2020
"go.opentelemetry.io/collector/component"
2121
"go.opentelemetry.io/collector/config/configmodels"
22+
"go.opentelemetry.io/collector/consumer/consumererror"
2223
"go.opentelemetry.io/collector/internal/data"
2324
"go.opentelemetry.io/collector/obsreport"
2425
)
@@ -27,44 +28,76 @@ import (
2728
// the number of dropped logs.
2829
type PushLogsData func(ctx context.Context, md data.Logs) (droppedTimeSeries int, err error)
2930

31+
type logsRequest struct {
32+
baseRequest
33+
ld data.Logs
34+
pusher PushLogsData
35+
}
36+
37+
func newLogsRequest(ctx context.Context, ld data.Logs, pusher PushLogsData) request {
38+
return &logsRequest{
39+
baseRequest: baseRequest{ctx: ctx},
40+
ld: ld,
41+
pusher: pusher,
42+
}
43+
}
44+
45+
func (req *logsRequest) onPartialError(partialErr consumererror.PartialError) request {
46+
// TODO: Implement this
47+
return req
48+
}
49+
50+
func (req *logsRequest) export(ctx context.Context) (int, error) {
51+
return req.pusher(ctx, req.ld)
52+
}
53+
54+
func (req *logsRequest) count() int {
55+
return req.ld.LogRecordCount()
56+
}
57+
3058
type logsExporter struct {
31-
baseExporter
59+
*baseExporter
3260
pushLogsData PushLogsData
3361
}
3462

35-
func (me *logsExporter) ConsumeLogs(ctx context.Context, md data.Logs) error {
36-
exporterCtx := obsreport.ExporterContext(ctx, me.exporterFullName)
37-
_, err := me.pushLogsData(exporterCtx, md)
63+
func (lexp *logsExporter) ConsumeLogs(ctx context.Context, ld data.Logs) error {
64+
exporterCtx := obsreport.ExporterContext(ctx, lexp.cfg.Name())
65+
_, err := lexp.sender.send(newLogsRequest(exporterCtx, ld, lexp.pushLogsData))
3866
return err
3967
}
4068

4169
// NewLogsExporter creates an LogsExporter that can record logs and can wrap every request with a Span.
42-
// TODO: Add support for retries.
43-
func NewLogsExporter(config configmodels.Exporter, pushLogsData PushLogsData, options ...ExporterOption) (component.LogExporter, error) {
44-
if config == nil {
70+
func NewLogsExporter(cfg configmodels.Exporter, pushLogsData PushLogsData, options ...ExporterOption) (component.LogExporter, error) {
71+
if cfg == nil {
4572
return nil, errNilConfig
4673
}
4774

4875
if pushLogsData == nil {
4976
return nil, errNilPushLogsData
5077
}
5178

52-
pushLogsData = pushLogsWithObservability(pushLogsData, config.Name())
79+
be := newBaseExporter(cfg, options...)
80+
81+
// Record metrics on the consumer.
82+
be.qSender.nextSender = &logsExporterWithObservability{
83+
exporterName: cfg.Name(),
84+
sender: be.qSender.nextSender,
85+
}
5386

5487
return &logsExporter{
55-
baseExporter: newBaseExporter(config.Name(), options...),
88+
baseExporter: be,
5689
pushLogsData: pushLogsData,
5790
}, nil
5891
}
5992

60-
func pushLogsWithObservability(next PushLogsData, exporterName string) PushLogsData {
61-
return func(ctx context.Context, ld data.Logs) (int, error) {
62-
ctx = obsreport.StartLogsExportOp(ctx, exporterName)
63-
numDroppedLogs, err := next(ctx, ld)
64-
65-
numLogs := ld.LogRecordCount()
93+
type logsExporterWithObservability struct {
94+
exporterName string
95+
sender requestSender
96+
}
6697

67-
obsreport.EndLogsExportOp(ctx, numLogs, numDroppedLogs, err)
68-
return numLogs, err
69-
}
98+
func (lewo *logsExporterWithObservability) send(req request) (int, error) {
99+
req.setContext(obsreport.StartLogsExportOp(req.context(), lewo.exporterName))
100+
numDroppedLogs, err := lewo.sender.send(req)
101+
obsreport.EndLogsExportOp(req.context(), req.count(), numDroppedLogs, err)
102+
return numDroppedLogs, err
70103
}

exporter/exporterhelper/logshelper_test.go

+15-7
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424

2525
"go.opentelemetry.io/collector/component"
2626
"go.opentelemetry.io/collector/config/configmodels"
27+
"go.opentelemetry.io/collector/consumer/consumererror"
2728
"go.opentelemetry.io/collector/internal/data"
2829
"go.opentelemetry.io/collector/internal/data/testdata"
2930
"go.opentelemetry.io/collector/obsreport"
@@ -43,6 +44,13 @@ var (
4344
}
4445
)
4546

47+
func TestLogsRequest(t *testing.T) {
48+
mr := newLogsRequest(context.Background(), testdata.GenerateLogDataEmpty(), nil)
49+
50+
partialErr := consumererror.PartialTracesError(errors.New("some error"), testdata.GenerateTraceDataOneSpan())
51+
assert.Same(t, mr, mr.onPartialError(partialErr.(consumererror.PartialError)))
52+
}
53+
4654
func TestLogsExporter_InvalidName(t *testing.T) {
4755
me, err := NewLogsExporter(nil, newPushLogsData(0, nil))
4856
require.Nil(t, me)
@@ -178,7 +186,7 @@ func generateLogsTraffic(t *testing.T, me component.LogExporter, numRequests int
178186
}
179187
}
180188

181-
func checkWrapSpanForLogsExporter(t *testing.T, me component.LogExporter, wantError error, numMetricPoints int64) {
189+
func checkWrapSpanForLogsExporter(t *testing.T, me component.LogExporter, wantError error, numLogRecords int64) {
182190
ocSpansSaver := new(testOCTraceExporter)
183191
trace.RegisterExporter(ocSpansSaver)
184192
defer trace.UnregisterExporter(ocSpansSaver)
@@ -201,13 +209,13 @@ func checkWrapSpanForLogsExporter(t *testing.T, me component.LogExporter, wantEr
201209
require.Equalf(t, parentSpan.SpanContext.SpanID, sd.ParentSpanID, "Exporter span not a child\nSpanData %v", sd)
202210
require.Equalf(t, errToStatus(wantError), sd.Status, "SpanData %v", sd)
203211

204-
sentMetricPoints := numMetricPoints
205-
var failedToSendMetricPoints int64
212+
sentLogRecords := numLogRecords
213+
var failedToSendLogRecords int64
206214
if wantError != nil {
207-
sentMetricPoints = 0
208-
failedToSendMetricPoints = numMetricPoints
215+
sentLogRecords = 0
216+
failedToSendLogRecords = numLogRecords
209217
}
210-
require.Equalf(t, sentMetricPoints, sd.Attributes[obsreport.SentLogRecordsKey], "SpanData %v", sd)
211-
require.Equalf(t, failedToSendMetricPoints, sd.Attributes[obsreport.FailedToSendLogRecordsKey], "SpanData %v", sd)
218+
require.Equalf(t, sentLogRecords, sd.Attributes[obsreport.SentLogRecordsKey], "SpanData %v", sd)
219+
require.Equalf(t, failedToSendLogRecords, sd.Attributes[obsreport.FailedToSendLogRecordsKey], "SpanData %v", sd)
212220
}
213221
}

0 commit comments

Comments
 (0)