Skip to content

Commit 9d59924

Browse files
committed
First round of comments addressed
1 parent eb11553 commit 9d59924

11 files changed

+303
-234
lines changed

exporter/exporterhelper/common.go

+84-74
Original file line numberDiff line numberDiff line change
@@ -31,38 +31,36 @@ var (
3131
okStatus = trace.Status{Code: trace.StatusCodeOK}
3232
)
3333

34+
// Settings for timeout. The timeout applies to individual attempts to send data to the backend.
3435
type TimeoutSettings struct {
35-
// Timeout is the timeout for each operation.
36+
// Timeout is the timeout for every attempt to send data to the backend.
3637
Timeout time.Duration `mapstructure:"timeout"`
3738
}
3839

40+
// CreateDefaultTimeoutSettings returns the default settings for TimeoutSettings.
3941
func CreateDefaultTimeoutSettings() TimeoutSettings {
4042
return TimeoutSettings{
4143
Timeout: 5 * time.Second,
4244
}
4345
}
4446

45-
type settings struct {
46-
configmodels.Exporter
47-
TimeoutSettings
48-
QueuedSettings
49-
RetrySettings
50-
}
51-
47+
// request is an abstraction of an individual request (batch of data) independent of the type of the data (traces, metrics, logs).
5248
type request interface {
5349
context() context.Context
5450
setContext(context.Context)
5551
export(ctx context.Context) (int, error)
56-
// Returns a new queue request that contains the items left to be exported.
52+
// Returns a new request that contains the items left to be sent.
5753
onPartialError(consumererror.PartialError) request
58-
// Returns the cnt of spans/metric points or log records.
54+
// Returns the count of spans/metric points or log records.
5955
count() int
6056
}
6157

58+
// requestSender is an abstraction of a sender for a request independent of the type of the data (traces, metrics, logs).
6259
type requestSender interface {
6360
send(req request) (int, error)
6461
}
6562

63+
// baseRequest is a base implementation for the request.
6664
type baseRequest struct {
6765
ctx context.Context
6866
}
@@ -81,139 +79,151 @@ type Start func(context.Context, component.Host) error
8179
// Shutdown specifies the function invoked when the exporter is being shutdown.
8280
type Shutdown func(context.Context) error
8381

82+
// internalOptions represents all the options that users can configured.
83+
type internalOptions struct {
84+
TimeoutSettings
85+
QueueSettings
86+
RetrySettings
87+
Start
88+
Shutdown
89+
}
90+
91+
// fromConfiguredOptions returns the internal options starting from the default and applying all configured options.
92+
func fromConfiguredOptions(options ...ExporterOption) *internalOptions {
93+
// Start from the default options:
94+
opts := &internalOptions{
95+
TimeoutSettings: CreateDefaultTimeoutSettings(),
96+
// TODO: Enable queuing by default (call CreateDefaultQueueSettings)
97+
QueueSettings: QueueSettings{Disabled: true},
98+
// TODO: Enable retry by default (call CreateDefaultRetrySettings)
99+
RetrySettings: RetrySettings{Disabled: true},
100+
Start: func(ctx context.Context, host component.Host) error { return nil },
101+
Shutdown: func(ctx context.Context) error { return nil },
102+
}
103+
104+
for _, op := range options {
105+
op(opts)
106+
}
107+
108+
return opts
109+
}
110+
84111
// ExporterOption apply changes to internalOptions.
85-
type ExporterOption func(*baseExporter)
112+
type ExporterOption func(*internalOptions)
86113

87114
// WithShutdown overrides the default Shutdown function for an exporter.
88115
// The default shutdown function does nothing and always returns nil.
89116
func WithShutdown(shutdown Shutdown) ExporterOption {
90-
return func(o *baseExporter) {
91-
o.shutdown = shutdown
117+
return func(o *internalOptions) {
118+
o.Shutdown = shutdown
92119
}
93120
}
94121

95122
// WithStart overrides the default Start function for an exporter.
96123
// The default shutdown function does nothing and always returns nil.
97124
func WithStart(start Start) ExporterOption {
98-
return func(o *baseExporter) {
99-
o.start = start
125+
return func(o *internalOptions) {
126+
o.Start = start
100127
}
101128
}
102129

103-
// WithShutdown overrides the default TimeoutSettings for an exporter.
130+
// WithTimeout overrides the default TimeoutSettings for an exporter.
104131
// The default TimeoutSettings is 5 seconds.
105-
func WithTimeout(timeout TimeoutSettings) ExporterOption {
106-
return func(o *baseExporter) {
107-
o.cfg.TimeoutSettings = timeout
132+
func WithTimeout(timeoutSettings TimeoutSettings) ExporterOption {
133+
return func(o *internalOptions) {
134+
o.TimeoutSettings = timeoutSettings
108135
}
109136
}
110137

111138
// WithRetry overrides the default RetrySettings for an exporter.
112139
// The default RetrySettings is to disable retries.
113-
func WithRetry(retry RetrySettings) ExporterOption {
114-
return func(o *baseExporter) {
115-
o.cfg.RetrySettings = retry
140+
func WithRetry(retrySettings RetrySettings) ExporterOption {
141+
return func(o *internalOptions) {
142+
o.RetrySettings = retrySettings
116143
}
117144
}
118145

119-
// WithQueued overrides the default QueuedSettings for an exporter.
120-
// The default QueuedSettings is to disable queueing.
121-
func WithQueued(queued QueuedSettings) ExporterOption {
122-
return func(o *baseExporter) {
123-
o.cfg.QueuedSettings = queued
146+
// WithQueue overrides the default QueueSettings for an exporter.
147+
// The default QueueSettings is to disable queueing.
148+
func WithQueue(queueSettings QueueSettings) ExporterOption {
149+
return func(o *internalOptions) {
150+
o.QueueSettings = queueSettings
124151
}
125152
}
126153

127-
// internalOptions contains internalOptions concerning how an Exporter is configured.
154+
// baseExporter contains common fields between different exporter types.
128155
type baseExporter struct {
129-
cfg *settings
156+
cfg configmodels.Exporter
130157
sender requestSender
131-
rSender *retrySender
132-
qSender *queuedSender
158+
qrSender *queuedRetrySender
133159
start Start
134160
shutdown Shutdown
135161
startOnce sync.Once
136162
shutdownOnce sync.Once
137163
}
138164

139-
// Construct the internalOptions from multiple ExporterOption.
140165
func newBaseExporter(cfg configmodels.Exporter, options ...ExporterOption) *baseExporter {
166+
opts := fromConfiguredOptions(options...)
141167
be := &baseExporter{
142-
cfg: &settings{
143-
Exporter: cfg,
144-
TimeoutSettings: CreateDefaultTimeoutSettings(),
145-
// TODO: Enable queuing by default (call CreateDefaultQueuedSettings
146-
QueuedSettings: QueuedSettings{Disabled: true},
147-
// TODO: Enable retry by default (call CreateDefaultRetrySettings)
148-
RetrySettings: RetrySettings{Disabled: true},
149-
},
168+
cfg: cfg,
169+
start: opts.Start,
170+
shutdown: opts.Shutdown,
150171
}
151172

152-
for _, op := range options {
153-
op(be)
154-
}
155-
156-
if be.start == nil {
157-
be.start = func(ctx context.Context, host component.Host) error { return nil }
158-
}
159-
160-
if be.shutdown == nil {
161-
be.shutdown = func(ctx context.Context) error { return nil }
162-
}
163-
164-
be.sender = &timeoutSender{cfg: &be.cfg.TimeoutSettings}
165-
166-
be.rSender = newRetrySender(&be.cfg.RetrySettings, be.sender)
167-
be.sender = be.rSender
168-
169-
be.qSender = newQueuedSender(&be.cfg.QueuedSettings, be.sender)
170-
be.sender = be.qSender
173+
be.qrSender = newQueuedRetrySender(opts.QueueSettings, opts.RetrySettings, &timeoutSender{cfg: opts.TimeoutSettings})
174+
be.sender = be.qrSender
171175

172176
return be
173177
}
174178

179+
// wrapConsumerSender wraps the consumer sender (the sender that uses retries and timeout) with the given wrapper.
180+
// This can be used to wrap with observability (create spans, record metrics) the consumer sender.
181+
func (be *baseExporter) wrapConsumerSender(f func(consumer requestSender) requestSender) {
182+
be.qrSender.consumerSender = f(be.qrSender.consumerSender)
183+
}
184+
185+
// Start all senders and exporter and is invoked during service start.
175186
func (be *baseExporter) Start(ctx context.Context, host component.Host) error {
176187
err := componenterror.ErrAlreadyStarted
177188
be.startOnce.Do(func() {
178-
// First start the nextSender
189+
// First start the wrapped exporter.
179190
err = be.start(ctx, host)
180191
if err != nil {
192+
// TODO: Log errors, or check if it is recorded by the caller.
181193
return
182194
}
183195

184-
// If no error then start the queuedSender
185-
be.qSender.start()
196+
// If no error then start the queuedRetrySender.
197+
be.qrSender.start()
186198
})
187199
return err
188200
}
189201

190-
// Shutdown stops the nextSender and is invoked during shutdown.
202+
// Shutdown all senders and exporter and is invoked during service shutdown.
191203
func (be *baseExporter) Shutdown(ctx context.Context) error {
192204
err := componenterror.ErrAlreadyStopped
193205
be.shutdownOnce.Do(func() {
194-
// First stop the retry goroutines
195-
be.rSender.shutdown()
196-
197-
// All operations will try to export once but will not retry because retrying was disabled when be.rSender stopped.
198-
be.qSender.shutdown()
199-
200-
// Last shutdown the nextSender itself.
206+
// First shutdown the queued retry sender
207+
be.qrSender.shutdown()
208+
// Last shutdown the wrapped exporter itself.
201209
err = be.shutdown(ctx)
202210
})
203211
return err
204212
}
205213

214+
// timeoutSender is a request sender that adds a `timeout` to every request that passes this sender.
206215
type timeoutSender struct {
207-
cfg *TimeoutSettings
216+
cfg TimeoutSettings
208217
}
209218

210-
func (te *timeoutSender) send(req request) (int, error) {
219+
// send implements the requestSender interface
220+
func (ts *timeoutSender) send(req request) (int, error) {
211221
// Intentionally don't overwrite the context inside the request, because in case of retries deadline will not be
212222
// updated because this deadline most likely is before the next one.
213223
ctx := req.context()
214-
if te.cfg.Timeout > 0 {
224+
if ts.cfg.Timeout > 0 {
215225
var cancelFunc func()
216-
ctx, cancelFunc = context.WithTimeout(req.context(), te.cfg.Timeout)
226+
ctx, cancelFunc = context.WithTimeout(req.context(), ts.cfg.Timeout)
217227
defer cancelFunc()
218228
}
219229
return req.export(ctx)

exporter/exporterhelper/logshelper.go

+9-9
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ func (lexp *logsExporter) ConsumeLogs(ctx context.Context, ld data.Logs) error {
6666
return err
6767
}
6868

69-
// NewLogsExporter creates an LogsExporter that can record logs and can wrap every request with a Span.
69+
// NewLogsExporter creates an LogsExporter that records observability metrics and wraps every request with a Span.
7070
func NewLogsExporter(cfg configmodels.Exporter, pushLogsData PushLogsData, options ...ExporterOption) (component.LogExporter, error) {
7171
if cfg == nil {
7272
return nil, errNilConfig
@@ -77,12 +77,12 @@ func NewLogsExporter(cfg configmodels.Exporter, pushLogsData PushLogsData, optio
7777
}
7878

7979
be := newBaseExporter(cfg, options...)
80-
81-
// Record metrics on the consumer.
82-
be.qSender.nextSender = &logsExporterWithObservability{
83-
exporterName: cfg.Name(),
84-
sender: be.qSender.nextSender,
85-
}
80+
be.wrapConsumerSender(func(nextSender requestSender) requestSender {
81+
return &logsExporterWithObservability{
82+
exporterName: cfg.Name(),
83+
nextSender: nextSender,
84+
}
85+
})
8686

8787
return &logsExporter{
8888
baseExporter: be,
@@ -92,12 +92,12 @@ func NewLogsExporter(cfg configmodels.Exporter, pushLogsData PushLogsData, optio
9292

9393
type logsExporterWithObservability struct {
9494
exporterName string
95-
sender requestSender
95+
nextSender requestSender
9696
}
9797

9898
func (lewo *logsExporterWithObservability) send(req request) (int, error) {
9999
req.setContext(obsreport.StartLogsExportOp(req.context(), lewo.exporterName))
100-
numDroppedLogs, err := lewo.sender.send(req)
100+
numDroppedLogs, err := lewo.nextSender.send(req)
101101
obsreport.EndLogsExportOp(req.context(), req.count(), numDroppedLogs, err)
102102
return numDroppedLogs, err
103103
}

exporter/exporterhelper/metricshelper.go

+12-14
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,7 @@ func (mexp *metricsExporterOld) ConsumeMetricsData(ctx context.Context, md consu
4141
return err
4242
}
4343

44-
// NewMetricsExporterOld creates an MetricsExporter that can record metrics and can wrap every request with a Span.
45-
// If no internalOptions are passed it just adds the nextSender format as a tag in the Context.
44+
// NewMetricsExporterOld creates an MetricsExporter that records observability metrics and wraps every request with a Span.
4645
// TODO: Add support for retries.
4746
func NewMetricsExporterOld(cfg configmodels.Exporter, pushMetricsData PushMetricsDataOld, options ...ExporterOption) (component.MetricsExporterOld, error) {
4847
if cfg == nil {
@@ -129,8 +128,7 @@ func (mexp *metricsExporter) ConsumeMetrics(ctx context.Context, md pdata.Metric
129128
return err
130129
}
131130

132-
// NewMetricsExporter creates an MetricsExporter that can record metrics and can wrap every request with a Span.
133-
// If no internalOptions are passed it just adds the nextSender format as a tag in the Context.
131+
// NewMetricsExporter creates an MetricsExporter that records observability metrics and wraps every request with a Span.
134132
func NewMetricsExporter(cfg configmodels.Exporter, pushMetricsData PushMetricsData, options ...ExporterOption) (component.MetricsExporter, error) {
135133
if cfg == nil {
136134
return nil, errNilConfig
@@ -141,12 +139,12 @@ func NewMetricsExporter(cfg configmodels.Exporter, pushMetricsData PushMetricsDa
141139
}
142140

143141
be := newBaseExporter(cfg, options...)
144-
145-
// Record metrics on the consumer.
146-
be.qSender.nextSender = &metricsSenderWithObservability{
147-
exporterName: cfg.Name(),
148-
sender: be.qSender.nextSender,
149-
}
142+
be.wrapConsumerSender(func(nextSender requestSender) requestSender {
143+
return &metricsSenderWithObservability{
144+
exporterName: cfg.Name(),
145+
nextSender: nextSender,
146+
}
147+
})
150148

151149
return &metricsExporter{
152150
baseExporter: be,
@@ -156,15 +154,15 @@ func NewMetricsExporter(cfg configmodels.Exporter, pushMetricsData PushMetricsDa
156154

157155
type metricsSenderWithObservability struct {
158156
exporterName string
159-
sender requestSender
157+
nextSender requestSender
160158
}
161159

162160
func (mewo *metricsSenderWithObservability) send(req request) (int, error) {
163161
req.setContext(obsreport.StartMetricsExportOp(req.context(), mewo.exporterName))
164-
numDroppedMetrics, err := mewo.sender.send(req)
162+
numDroppedMetrics, err := mewo.nextSender.send(req)
165163

166-
// TODO: this is not ideal: req should come from the next function itself.
167-
// temporarily loading req from internal format. Once full switch is done
164+
// TODO: this is not ideal: it should come from the next function itself.
165+
// temporarily loading it from internal format. Once full switch is done
168166
// to new metrics will remove this.
169167
mReq := req.(*metricsRequest)
170168
numReceivedMetrics, numPoints := pdatautil.MetricAndDataPointCount(mReq.md)

0 commit comments

Comments
 (0)