Skip to content

Commit f7508a2

Browse files
committed
First round of comments addressed
1 parent eb11553 commit f7508a2

File tree

14 files changed

+337
-271
lines changed

14 files changed

+337
-271
lines changed

exporter/exporterhelper/common.go

+86-74
Original file line numberDiff line numberDiff line change
@@ -31,38 +31,38 @@ var (
3131
okStatus = trace.Status{Code: trace.StatusCodeOK}
3232
)
3333

34+
// Settings for timeout. The timeout applies to individual attempts to send data to the backend.
3435
type TimeoutSettings struct {
35-
// Timeout is the timeout for each operation.
36+
// Timeout is the timeout for every attempt to send data to the backend.
3637
Timeout time.Duration `mapstructure:"timeout"`
3738
}
3839

40+
// CreateDefaultTimeoutSettings returns the default settings for TimeoutSettings.
3941
func CreateDefaultTimeoutSettings() TimeoutSettings {
4042
return TimeoutSettings{
4143
Timeout: 5 * time.Second,
4244
}
4345
}
4446

45-
type settings struct {
46-
configmodels.Exporter
47-
TimeoutSettings
48-
QueuedSettings
49-
RetrySettings
50-
}
51-
47+
// request is an abstraction of an individual request (batch of data) independent of the type of the data (traces, metrics, logs).
5248
type request interface {
49+
// context returns the Context of the requests.
5350
context() context.Context
51+
// setContext updates the Context of the requests.
5452
setContext(context.Context)
5553
export(ctx context.Context) (int, error)
56-
// Returns a new queue request that contains the items left to be exported.
54+
// Returns a new request that contains the items left to be sent.
5755
onPartialError(consumererror.PartialError) request
58-
// Returns the cnt of spans/metric points or log records.
56+
// Returns the count of spans/metric points or log records.
5957
count() int
6058
}
6159

60+
// requestSender is an abstraction of a sender for a request independent of the type of the data (traces, metrics, logs).
6261
type requestSender interface {
6362
send(req request) (int, error)
6463
}
6564

65+
// baseRequest is a base implementation for the request.
6666
type baseRequest struct {
6767
ctx context.Context
6868
}
@@ -81,139 +81,151 @@ type Start func(context.Context, component.Host) error
8181
// Shutdown specifies the function invoked when the exporter is being shutdown.
8282
type Shutdown func(context.Context) error
8383

84+
// internalOptions represents all the options that users can configure.
85+
type internalOptions struct {
86+
TimeoutSettings
87+
QueueSettings
88+
RetrySettings
89+
Start
90+
Shutdown
91+
}
92+
93+
// fromConfiguredOptions returns the internal options starting from the default and applying all configured options.
94+
func fromConfiguredOptions(options ...ExporterOption) *internalOptions {
95+
// Start from the default options:
96+
opts := &internalOptions{
97+
TimeoutSettings: CreateDefaultTimeoutSettings(),
98+
// TODO: Enable queuing by default (call CreateDefaultQueueSettings)
99+
QueueSettings: QueueSettings{Disabled: true},
100+
// TODO: Enable retry by default (call CreateDefaultRetrySettings)
101+
RetrySettings: RetrySettings{Disabled: true},
102+
Start: func(ctx context.Context, host component.Host) error { return nil },
103+
Shutdown: func(ctx context.Context) error { return nil },
104+
}
105+
106+
for _, op := range options {
107+
op(opts)
108+
}
109+
110+
return opts
111+
}
112+
84113
// ExporterOption apply changes to internalOptions.
85-
type ExporterOption func(*baseExporter)
114+
type ExporterOption func(*internalOptions)
86115

87116
// WithShutdown overrides the default Shutdown function for an exporter.
88117
// The default shutdown function does nothing and always returns nil.
89118
func WithShutdown(shutdown Shutdown) ExporterOption {
90-
return func(o *baseExporter) {
91-
o.shutdown = shutdown
119+
return func(o *internalOptions) {
120+
o.Shutdown = shutdown
92121
}
93122
}
94123

95124
// WithStart overrides the default Start function for an exporter.
96125
// The default shutdown function does nothing and always returns nil.
97126
func WithStart(start Start) ExporterOption {
98-
return func(o *baseExporter) {
99-
o.start = start
127+
return func(o *internalOptions) {
128+
o.Start = start
100129
}
101130
}
102131

103-
// WithShutdown overrides the default TimeoutSettings for an exporter.
132+
// WithTimeout overrides the default TimeoutSettings for an exporter.
104133
// The default TimeoutSettings is 5 seconds.
105-
func WithTimeout(timeout TimeoutSettings) ExporterOption {
106-
return func(o *baseExporter) {
107-
o.cfg.TimeoutSettings = timeout
134+
func WithTimeout(timeoutSettings TimeoutSettings) ExporterOption {
135+
return func(o *internalOptions) {
136+
o.TimeoutSettings = timeoutSettings
108137
}
109138
}
110139

111140
// WithRetry overrides the default RetrySettings for an exporter.
112141
// The default RetrySettings is to disable retries.
113-
func WithRetry(retry RetrySettings) ExporterOption {
114-
return func(o *baseExporter) {
115-
o.cfg.RetrySettings = retry
142+
func WithRetry(retrySettings RetrySettings) ExporterOption {
143+
return func(o *internalOptions) {
144+
o.RetrySettings = retrySettings
116145
}
117146
}
118147

119-
// WithQueued overrides the default QueuedSettings for an exporter.
120-
// The default QueuedSettings is to disable queueing.
121-
func WithQueued(queued QueuedSettings) ExporterOption {
122-
return func(o *baseExporter) {
123-
o.cfg.QueuedSettings = queued
148+
// WithQueue overrides the default QueueSettings for an exporter.
149+
// The default QueueSettings is to disable queueing.
150+
func WithQueue(queueSettings QueueSettings) ExporterOption {
151+
return func(o *internalOptions) {
152+
o.QueueSettings = queueSettings
124153
}
125154
}
126155

127-
// internalOptions contains internalOptions concerning how an Exporter is configured.
156+
// baseExporter contains common fields between different exporter types.
128157
type baseExporter struct {
129-
cfg *settings
158+
cfg configmodels.Exporter
130159
sender requestSender
131-
rSender *retrySender
132-
qSender *queuedSender
160+
qrSender *queuedRetrySender
133161
start Start
134162
shutdown Shutdown
135163
startOnce sync.Once
136164
shutdownOnce sync.Once
137165
}
138166

139-
// Construct the internalOptions from multiple ExporterOption.
140167
func newBaseExporter(cfg configmodels.Exporter, options ...ExporterOption) *baseExporter {
168+
opts := fromConfiguredOptions(options...)
141169
be := &baseExporter{
142-
cfg: &settings{
143-
Exporter: cfg,
144-
TimeoutSettings: CreateDefaultTimeoutSettings(),
145-
// TODO: Enable queuing by default (call CreateDefaultQueuedSettings
146-
QueuedSettings: QueuedSettings{Disabled: true},
147-
// TODO: Enable retry by default (call CreateDefaultRetrySettings)
148-
RetrySettings: RetrySettings{Disabled: true},
149-
},
170+
cfg: cfg,
171+
start: opts.Start,
172+
shutdown: opts.Shutdown,
150173
}
151174

152-
for _, op := range options {
153-
op(be)
154-
}
155-
156-
if be.start == nil {
157-
be.start = func(ctx context.Context, host component.Host) error { return nil }
158-
}
159-
160-
if be.shutdown == nil {
161-
be.shutdown = func(ctx context.Context) error { return nil }
162-
}
163-
164-
be.sender = &timeoutSender{cfg: &be.cfg.TimeoutSettings}
165-
166-
be.rSender = newRetrySender(&be.cfg.RetrySettings, be.sender)
167-
be.sender = be.rSender
168-
169-
be.qSender = newQueuedSender(&be.cfg.QueuedSettings, be.sender)
170-
be.sender = be.qSender
175+
be.qrSender = newQueuedRetrySender(opts.QueueSettings, opts.RetrySettings, &timeoutSender{cfg: opts.TimeoutSettings})
176+
be.sender = be.qrSender
171177

172178
return be
173179
}
174180

181+
// wrapConsumerSender wraps the consumer sender (the sender that uses retries and timeout) with the given wrapper.
182+
// This can be used to wrap with observability (create spans, record metrics) the consumer sender.
183+
func (be *baseExporter) wrapConsumerSender(f func(consumer requestSender) requestSender) {
184+
be.qrSender.consumerSender = f(be.qrSender.consumerSender)
185+
}
186+
187+
// Start all senders and exporter and is invoked during service start.
175188
func (be *baseExporter) Start(ctx context.Context, host component.Host) error {
176189
err := componenterror.ErrAlreadyStarted
177190
be.startOnce.Do(func() {
178-
// First start the nextSender
191+
// First start the wrapped exporter.
179192
err = be.start(ctx, host)
180193
if err != nil {
194+
// TODO: Log errors, or check if it is recorded by the caller.
181195
return
182196
}
183197

184-
// If no error then start the queuedSender
185-
be.qSender.start()
198+
// If no error then start the queuedRetrySender.
199+
be.qrSender.start()
186200
})
187201
return err
188202
}
189203

190-
// Shutdown stops the nextSender and is invoked during shutdown.
204+
// Shutdown all senders and exporter and is invoked during service shutdown.
191205
func (be *baseExporter) Shutdown(ctx context.Context) error {
192206
err := componenterror.ErrAlreadyStopped
193207
be.shutdownOnce.Do(func() {
194-
// First stop the retry goroutines
195-
be.rSender.shutdown()
196-
197-
// All operations will try to export once but will not retry because retrying was disabled when be.rSender stopped.
198-
be.qSender.shutdown()
199-
200-
// Last shutdown the nextSender itself.
208+
// First shutdown the queued retry sender
209+
be.qrSender.shutdown()
210+
// Last shutdown the wrapped exporter itself.
201211
err = be.shutdown(ctx)
202212
})
203213
return err
204214
}
205215

216+
// timeoutSender is a request sender that adds a `timeout` to every request that passes this sender.
206217
type timeoutSender struct {
207-
cfg *TimeoutSettings
218+
cfg TimeoutSettings
208219
}
209220

210-
func (te *timeoutSender) send(req request) (int, error) {
221+
// send implements the requestSender interface
222+
func (ts *timeoutSender) send(req request) (int, error) {
211223
// Intentionally don't overwrite the context inside the request, because in case of retries deadline will not be
212224
// updated because this deadline most likely is before the next one.
213225
ctx := req.context()
214-
if te.cfg.Timeout > 0 {
226+
if ts.cfg.Timeout > 0 {
215227
var cancelFunc func()
216-
ctx, cancelFunc = context.WithTimeout(req.context(), te.cfg.Timeout)
228+
ctx, cancelFunc = context.WithTimeout(req.context(), ts.cfg.Timeout)
217229
defer cancelFunc()
218230
}
219231
return req.export(ctx)

exporter/exporterhelper/logshelper.go

+9-9
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ func (lexp *logsExporter) ConsumeLogs(ctx context.Context, ld data.Logs) error {
6666
return err
6767
}
6868

69-
// NewLogsExporter creates an LogsExporter that can record logs and can wrap every request with a Span.
69+
// NewLogsExporter creates an LogsExporter that records observability metrics and wraps every request with a Span.
7070
func NewLogsExporter(cfg configmodels.Exporter, pushLogsData PushLogsData, options ...ExporterOption) (component.LogExporter, error) {
7171
if cfg == nil {
7272
return nil, errNilConfig
@@ -77,12 +77,12 @@ func NewLogsExporter(cfg configmodels.Exporter, pushLogsData PushLogsData, optio
7777
}
7878

7979
be := newBaseExporter(cfg, options...)
80-
81-
// Record metrics on the consumer.
82-
be.qSender.nextSender = &logsExporterWithObservability{
83-
exporterName: cfg.Name(),
84-
sender: be.qSender.nextSender,
85-
}
80+
be.wrapConsumerSender(func(nextSender requestSender) requestSender {
81+
return &logsExporterWithObservability{
82+
exporterName: cfg.Name(),
83+
nextSender: nextSender,
84+
}
85+
})
8686

8787
return &logsExporter{
8888
baseExporter: be,
@@ -92,12 +92,12 @@ func NewLogsExporter(cfg configmodels.Exporter, pushLogsData PushLogsData, optio
9292

9393
type logsExporterWithObservability struct {
9494
exporterName string
95-
sender requestSender
95+
nextSender requestSender
9696
}
9797

9898
func (lewo *logsExporterWithObservability) send(req request) (int, error) {
9999
req.setContext(obsreport.StartLogsExportOp(req.context(), lewo.exporterName))
100-
numDroppedLogs, err := lewo.sender.send(req)
100+
numDroppedLogs, err := lewo.nextSender.send(req)
101101
obsreport.EndLogsExportOp(req.context(), req.count(), numDroppedLogs, err)
102102
return numDroppedLogs, err
103103
}

exporter/exporterhelper/metricshelper.go

+12-14
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,7 @@ func (mexp *metricsExporterOld) ConsumeMetricsData(ctx context.Context, md consu
4141
return err
4242
}
4343

44-
// NewMetricsExporterOld creates an MetricsExporter that can record metrics and can wrap every request with a Span.
45-
// If no internalOptions are passed it just adds the nextSender format as a tag in the Context.
44+
// NewMetricsExporterOld creates an MetricsExporter that records observability metrics and wraps every request with a Span.
4645
// TODO: Add support for retries.
4746
func NewMetricsExporterOld(cfg configmodels.Exporter, pushMetricsData PushMetricsDataOld, options ...ExporterOption) (component.MetricsExporterOld, error) {
4847
if cfg == nil {
@@ -129,8 +128,7 @@ func (mexp *metricsExporter) ConsumeMetrics(ctx context.Context, md pdata.Metric
129128
return err
130129
}
131130

132-
// NewMetricsExporter creates an MetricsExporter that can record metrics and can wrap every request with a Span.
133-
// If no internalOptions are passed it just adds the nextSender format as a tag in the Context.
131+
// NewMetricsExporter creates an MetricsExporter that records observability metrics and wraps every request with a Span.
134132
func NewMetricsExporter(cfg configmodels.Exporter, pushMetricsData PushMetricsData, options ...ExporterOption) (component.MetricsExporter, error) {
135133
if cfg == nil {
136134
return nil, errNilConfig
@@ -141,12 +139,12 @@ func NewMetricsExporter(cfg configmodels.Exporter, pushMetricsData PushMetricsDa
141139
}
142140

143141
be := newBaseExporter(cfg, options...)
144-
145-
// Record metrics on the consumer.
146-
be.qSender.nextSender = &metricsSenderWithObservability{
147-
exporterName: cfg.Name(),
148-
sender: be.qSender.nextSender,
149-
}
142+
be.wrapConsumerSender(func(nextSender requestSender) requestSender {
143+
return &metricsSenderWithObservability{
144+
exporterName: cfg.Name(),
145+
nextSender: nextSender,
146+
}
147+
})
150148

151149
return &metricsExporter{
152150
baseExporter: be,
@@ -156,15 +154,15 @@ func NewMetricsExporter(cfg configmodels.Exporter, pushMetricsData PushMetricsDa
156154

157155
type metricsSenderWithObservability struct {
158156
exporterName string
159-
sender requestSender
157+
nextSender requestSender
160158
}
161159

162160
func (mewo *metricsSenderWithObservability) send(req request) (int, error) {
163161
req.setContext(obsreport.StartMetricsExportOp(req.context(), mewo.exporterName))
164-
numDroppedMetrics, err := mewo.sender.send(req)
162+
numDroppedMetrics, err := mewo.nextSender.send(req)
165163

166-
// TODO: this is not ideal: req should come from the next function itself.
167-
// temporarily loading req from internal format. Once full switch is done
164+
// TODO: this is not ideal: it should come from the next function itself.
165+
// temporarily loading it from internal format. Once full switch is done
168166
// to new metrics will remove this.
169167
mReq := req.(*metricsRequest)
170168
numReceivedMetrics, numPoints := pdatautil.MetricAndDataPointCount(mReq.md)

0 commit comments

Comments
 (0)