Skip to content

Commit 6988081

Browse files
committed
First round of comments addressed
1 parent eb11553 commit 6988081

11 files changed

+191
-190
lines changed

exporter/exporterhelper/common.go

+76-73
Original file line numberDiff line numberDiff line change
@@ -31,38 +31,36 @@ var (
3131
okStatus = trace.Status{Code: trace.StatusCodeOK}
3232
)
3333

34+
// Settings for timeout. The timeout applies to individual attempts to send data to the backend.
3435
type TimeoutSettings struct {
35-
// Timeout is the timeout for each operation.
36+
// Timeout is the timeout for every attempt to send data to the backend.
3637
Timeout time.Duration `mapstructure:"timeout"`
3738
}
3839

40+
// CreateDefaultTimeoutSettings returns the default settings for TimeoutSettings.
3941
func CreateDefaultTimeoutSettings() TimeoutSettings {
4042
return TimeoutSettings{
4143
Timeout: 5 * time.Second,
4244
}
4345
}
4446

45-
type settings struct {
46-
configmodels.Exporter
47-
TimeoutSettings
48-
QueuedSettings
49-
RetrySettings
50-
}
51-
47+
// request is an abstraction of an individual request (batch of data) independent of the type of the data (traces, metrics, logs).
5248
type request interface {
5349
context() context.Context
5450
setContext(context.Context)
5551
export(ctx context.Context) (int, error)
56-
// Returns a new queue request that contains the items left to be exported.
52+
// Returns a new request that contains the items left to be sent.
5753
onPartialError(consumererror.PartialError) request
58-
// Returns the cnt of spans/metric points or log records.
54+
// Returns the count of spans/metric points or log records.
5955
count() int
6056
}
6157

58+
// requestSender is an abstraction of a sender for a request independent of the type of the data (traces, metrics, logs).
6259
type requestSender interface {
6360
send(req request) (int, error)
6461
}
6562

63+
// baseRequest is a base implementation for the request.
6664
type baseRequest struct {
6765
ctx context.Context
6866
}
@@ -81,139 +79,144 @@ type Start func(context.Context, component.Host) error
8179
// Shutdown specifies the function invoked when the exporter is being shutdown.
8280
type Shutdown func(context.Context) error
8381

82+
// internalOptions represents all the options that users can configured.
83+
type internalOptions struct {
84+
TimeoutSettings
85+
QueueSettings
86+
RetrySettings
87+
Start
88+
Shutdown
89+
}
90+
91+
// fromConfiguredOptions returns the internal options starting from the default and applying all configured options.
92+
func fromConfiguredOptions(options ...ExporterOption) *internalOptions {
93+
// Start from the default options:
94+
opts := &internalOptions{
95+
TimeoutSettings: CreateDefaultTimeoutSettings(),
96+
// TODO: Enable queuing by default (call CreateDefaultQueueSettings)
97+
QueueSettings: QueueSettings{Disabled: true},
98+
// TODO: Enable retry by default (call CreateDefaultRetrySettings)
99+
RetrySettings: RetrySettings{Disabled: true},
100+
Start: func(ctx context.Context, host component.Host) error { return nil },
101+
Shutdown: func(ctx context.Context) error { return nil },
102+
}
103+
104+
for _, op := range options {
105+
op(opts)
106+
}
107+
108+
return opts
109+
}
110+
84111
// ExporterOption apply changes to internalOptions.
85-
type ExporterOption func(*baseExporter)
112+
type ExporterOption func(*internalOptions)
86113

87114
// WithShutdown overrides the default Shutdown function for an exporter.
88115
// The default shutdown function does nothing and always returns nil.
89116
func WithShutdown(shutdown Shutdown) ExporterOption {
90-
return func(o *baseExporter) {
91-
o.shutdown = shutdown
117+
return func(o *internalOptions) {
118+
o.Shutdown = shutdown
92119
}
93120
}
94121

95122
// WithStart overrides the default Start function for an exporter.
96123
// The default shutdown function does nothing and always returns nil.
97124
func WithStart(start Start) ExporterOption {
98-
return func(o *baseExporter) {
99-
o.start = start
125+
return func(o *internalOptions) {
126+
o.Start = start
100127
}
101128
}
102129

103-
// WithShutdown overrides the default TimeoutSettings for an exporter.
130+
// WithTimeout overrides the default TimeoutSettings for an exporter.
104131
// The default TimeoutSettings is 5 seconds.
105-
func WithTimeout(timeout TimeoutSettings) ExporterOption {
106-
return func(o *baseExporter) {
107-
o.cfg.TimeoutSettings = timeout
132+
func WithTimeout(timeoutSettings TimeoutSettings) ExporterOption {
133+
return func(o *internalOptions) {
134+
o.TimeoutSettings = timeoutSettings
108135
}
109136
}
110137

111138
// WithRetry overrides the default RetrySettings for an exporter.
112139
// The default RetrySettings is to disable retries.
113-
func WithRetry(retry RetrySettings) ExporterOption {
114-
return func(o *baseExporter) {
115-
o.cfg.RetrySettings = retry
140+
func WithRetry(retrySettings RetrySettings) ExporterOption {
141+
return func(o *internalOptions) {
142+
o.RetrySettings = retrySettings
116143
}
117144
}
118145

119-
// WithQueued overrides the default QueuedSettings for an exporter.
120-
// The default QueuedSettings is to disable queueing.
121-
func WithQueued(queued QueuedSettings) ExporterOption {
122-
return func(o *baseExporter) {
123-
o.cfg.QueuedSettings = queued
146+
// WithQueue overrides the default QueueSettings for an exporter.
147+
// The default QueueSettings is to disable queueing.
148+
func WithQueue(queueSettings QueueSettings) ExporterOption {
149+
return func(o *internalOptions) {
150+
o.QueueSettings = queueSettings
124151
}
125152
}
126153

127-
// internalOptions contains internalOptions concerning how an Exporter is configured.
154+
// baseExporter contains common fields between different exporter types.
128155
type baseExporter struct {
129-
cfg *settings
156+
cfg configmodels.Exporter
130157
sender requestSender
131158
rSender *retrySender
132-
qSender *queuedSender
159+
qrSender *queuedRetrySender
133160
start Start
134161
shutdown Shutdown
135162
startOnce sync.Once
136163
shutdownOnce sync.Once
137164
}
138165

139-
// Construct the internalOptions from multiple ExporterOption.
140166
func newBaseExporter(cfg configmodels.Exporter, options ...ExporterOption) *baseExporter {
167+
opts := fromConfiguredOptions(options...)
141168
be := &baseExporter{
142-
cfg: &settings{
143-
Exporter: cfg,
144-
TimeoutSettings: CreateDefaultTimeoutSettings(),
145-
// TODO: Enable queuing by default (call CreateDefaultQueuedSettings
146-
QueuedSettings: QueuedSettings{Disabled: true},
147-
// TODO: Enable retry by default (call CreateDefaultRetrySettings)
148-
RetrySettings: RetrySettings{Disabled: true},
149-
},
150-
}
151-
152-
for _, op := range options {
153-
op(be)
154-
}
155-
156-
if be.start == nil {
157-
be.start = func(ctx context.Context, host component.Host) error { return nil }
169+
cfg: cfg,
170+
start: opts.Start,
171+
shutdown: opts.Shutdown,
158172
}
159173

160-
if be.shutdown == nil {
161-
be.shutdown = func(ctx context.Context) error { return nil }
162-
}
163-
164-
be.sender = &timeoutSender{cfg: &be.cfg.TimeoutSettings}
165-
166-
be.rSender = newRetrySender(&be.cfg.RetrySettings, be.sender)
167-
be.sender = be.rSender
168-
169-
be.qSender = newQueuedSender(&be.cfg.QueuedSettings, be.sender)
170-
be.sender = be.qSender
174+
be.qrSender = newQueuedRetrySender(opts.QueueSettings, opts.RetrySettings, &timeoutSender{cfg: opts.TimeoutSettings})
175+
be.sender = be.qrSender
171176

172177
return be
173178
}
174179

180+
// Start all senders and exporter and is invoked during service start.
175181
func (be *baseExporter) Start(ctx context.Context, host component.Host) error {
176182
err := componenterror.ErrAlreadyStarted
177183
be.startOnce.Do(func() {
178-
// First start the nextSender
184+
// First start the wrapped exporter.
179185
err = be.start(ctx, host)
180186
if err != nil {
187+
// TODO: Log errors, or check if it is recorded by the caller.
181188
return
182189
}
183190

184-
// If no error then start the queuedSender
185-
be.qSender.start()
191+
// If no error then start the queuedRetrySender.
192+
be.qrSender.start()
186193
})
187194
return err
188195
}
189196

190-
// Shutdown stops the nextSender and is invoked during shutdown.
197+
// Shutdown all senders and exporter and is invoked during service shutdown.
191198
func (be *baseExporter) Shutdown(ctx context.Context) error {
192199
err := componenterror.ErrAlreadyStopped
193200
be.shutdownOnce.Do(func() {
194-
// First stop the retry goroutines
195-
be.rSender.shutdown()
196-
197-
// All operations will try to export once but will not retry because retrying was disabled when be.rSender stopped.
198-
be.qSender.shutdown()
199-
200-
// Last shutdown the nextSender itself.
201+
// Last shutdown the wrapped exporter itself.
201202
err = be.shutdown(ctx)
202203
})
203204
return err
204205
}
205206

207+
// timeoutSender is a request sender that adds a `timeout` to every request that passes this sender.
206208
type timeoutSender struct {
207-
cfg *TimeoutSettings
209+
cfg TimeoutSettings
208210
}
209211

210-
func (te *timeoutSender) send(req request) (int, error) {
212+
// send implements the requestSender interface
213+
func (ts *timeoutSender) send(req request) (int, error) {
211214
// Intentionally don't overwrite the context inside the request, because in case of retries deadline will not be
212215
// updated because this deadline most likely is before the next one.
213216
ctx := req.context()
214-
if te.cfg.Timeout > 0 {
217+
if ts.cfg.Timeout > 0 {
215218
var cancelFunc func()
216-
ctx, cancelFunc = context.WithTimeout(req.context(), te.cfg.Timeout)
219+
ctx, cancelFunc = context.WithTimeout(req.context(), ts.cfg.Timeout)
217220
defer cancelFunc()
218221
}
219222
return req.export(ctx)

exporter/exporterhelper/logshelper.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ func (lexp *logsExporter) ConsumeLogs(ctx context.Context, ld data.Logs) error {
6666
return err
6767
}
6868

69-
// NewLogsExporter creates an LogsExporter that can record logs and can wrap every request with a Span.
69+
// NewLogsExporter creates an LogsExporter that records observability metrics and wraps every request with a Span.
7070
func NewLogsExporter(cfg configmodels.Exporter, pushLogsData PushLogsData, options ...ExporterOption) (component.LogExporter, error) {
7171
if cfg == nil {
7272
return nil, errNilConfig
@@ -79,9 +79,9 @@ func NewLogsExporter(cfg configmodels.Exporter, pushLogsData PushLogsData, optio
7979
be := newBaseExporter(cfg, options...)
8080

8181
// Record metrics on the consumer.
82-
be.qSender.nextSender = &logsExporterWithObservability{
82+
be.qrSender.consumerSender = &logsExporterWithObservability{
8383
exporterName: cfg.Name(),
84-
sender: be.qSender.nextSender,
84+
sender: be.qrSender.consumerSender,
8585
}
8686

8787
return &logsExporter{

exporter/exporterhelper/metricshelper.go

+6-8
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,7 @@ func (mexp *metricsExporterOld) ConsumeMetricsData(ctx context.Context, md consu
4141
return err
4242
}
4343

44-
// NewMetricsExporterOld creates an MetricsExporter that can record metrics and can wrap every request with a Span.
45-
// If no internalOptions are passed it just adds the nextSender format as a tag in the Context.
44+
// NewMetricsExporterOld creates an MetricsExporter that records observability metrics and wraps every request with a Span.
4645
// TODO: Add support for retries.
4746
func NewMetricsExporterOld(cfg configmodels.Exporter, pushMetricsData PushMetricsDataOld, options ...ExporterOption) (component.MetricsExporterOld, error) {
4847
if cfg == nil {
@@ -129,8 +128,7 @@ func (mexp *metricsExporter) ConsumeMetrics(ctx context.Context, md pdata.Metric
129128
return err
130129
}
131130

132-
// NewMetricsExporter creates an MetricsExporter that can record metrics and can wrap every request with a Span.
133-
// If no internalOptions are passed it just adds the nextSender format as a tag in the Context.
131+
// NewMetricsExporter creates an MetricsExporter that records observability metrics and wraps every request with a Span.
134132
func NewMetricsExporter(cfg configmodels.Exporter, pushMetricsData PushMetricsData, options ...ExporterOption) (component.MetricsExporter, error) {
135133
if cfg == nil {
136134
return nil, errNilConfig
@@ -143,9 +141,9 @@ func NewMetricsExporter(cfg configmodels.Exporter, pushMetricsData PushMetricsDa
143141
be := newBaseExporter(cfg, options...)
144142

145143
// Record metrics on the consumer.
146-
be.qSender.nextSender = &metricsSenderWithObservability{
144+
be.qrSender.consumerSender = &metricsSenderWithObservability{
147145
exporterName: cfg.Name(),
148-
sender: be.qSender.nextSender,
146+
sender: be.qrSender.consumerSender,
149147
}
150148

151149
return &metricsExporter{
@@ -163,8 +161,8 @@ func (mewo *metricsSenderWithObservability) send(req request) (int, error) {
163161
req.setContext(obsreport.StartMetricsExportOp(req.context(), mewo.exporterName))
164162
numDroppedMetrics, err := mewo.sender.send(req)
165163

166-
// TODO: this is not ideal: req should come from the next function itself.
167-
// temporarily loading req from internal format. Once full switch is done
164+
// TODO: this is not ideal: it should come from the next function itself.
165+
// temporarily loading it from internal format. Once full switch is done
168166
// to new metrics will remove this.
169167
mReq := req.(*metricsRequest)
170168
numReceivedMetrics, numPoints := pdatautil.MetricAndDataPointCount(mReq.md)

0 commit comments

Comments
 (0)