@@ -31,38 +31,36 @@ var (
31
31
okStatus = trace.Status {Code : trace .StatusCodeOK }
32
32
)
33
33
34
+ // Settings for timeout. The timeout applies to individual attempts to send data to the backend.
34
35
type TimeoutSettings struct {
35
- // Timeout is the timeout for each operation .
36
+ // Timeout is the timeout for every attempt to send data to the backend .
36
37
Timeout time.Duration `mapstructure:"timeout"`
37
38
}
38
39
40
+ // CreateDefaultTimeoutSettings returns the default settings for TimeoutSettings.
39
41
func CreateDefaultTimeoutSettings () TimeoutSettings {
40
42
return TimeoutSettings {
41
43
Timeout : 5 * time .Second ,
42
44
}
43
45
}
44
46
45
- type settings struct {
46
- configmodels.Exporter
47
- TimeoutSettings
48
- QueuedSettings
49
- RetrySettings
50
- }
51
-
47
+ // request is an abstraction of an individual request (batch of data) independent of the type of the data (traces, metrics, logs).
52
48
type request interface {
53
49
context () context.Context
54
50
setContext (context.Context )
55
51
export (ctx context.Context ) (int , error )
56
- // Returns a new queue request that contains the items left to be exported .
52
+ // Returns a new request that contains the items left to be sent .
57
53
onPartialError (consumererror.PartialError ) request
58
- // Returns the cnt of spans/metric points or log records.
54
+ // Returns the count of spans/metric points or log records.
59
55
count () int
60
56
}
61
57
58
+ // requestSender is an abstraction of a sender for a request independent of the type of the data (traces, metrics, logs).
62
59
type requestSender interface {
63
60
send (req request ) (int , error )
64
61
}
65
62
63
+ // baseRequest is a base implementation for the request.
66
64
type baseRequest struct {
67
65
ctx context.Context
68
66
}
@@ -81,139 +79,151 @@ type Start func(context.Context, component.Host) error
81
79
// Shutdown specifies the function invoked when the exporter is being shutdown.
82
80
type Shutdown func (context.Context ) error
83
81
82
+ // internalOptions represents all the options that users can configured.
83
+ type internalOptions struct {
84
+ TimeoutSettings
85
+ QueueSettings
86
+ RetrySettings
87
+ Start
88
+ Shutdown
89
+ }
90
+
91
+ // fromConfiguredOptions returns the internal options starting from the default and applying all configured options.
92
+ func fromConfiguredOptions (options ... ExporterOption ) * internalOptions {
93
+ // Start from the default options:
94
+ opts := & internalOptions {
95
+ TimeoutSettings : CreateDefaultTimeoutSettings (),
96
+ // TODO: Enable queuing by default (call CreateDefaultQueueSettings)
97
+ QueueSettings : QueueSettings {Disabled : true },
98
+ // TODO: Enable retry by default (call CreateDefaultRetrySettings)
99
+ RetrySettings : RetrySettings {Disabled : true },
100
+ Start : func (ctx context.Context , host component.Host ) error { return nil },
101
+ Shutdown : func (ctx context.Context ) error { return nil },
102
+ }
103
+
104
+ for _ , op := range options {
105
+ op (opts )
106
+ }
107
+
108
+ return opts
109
+ }
110
+
84
111
// ExporterOption apply changes to internalOptions.
85
- type ExporterOption func (* baseExporter )
112
+ type ExporterOption func (* internalOptions )
86
113
87
114
// WithShutdown overrides the default Shutdown function for an exporter.
88
115
// The default shutdown function does nothing and always returns nil.
89
116
func WithShutdown (shutdown Shutdown ) ExporterOption {
90
- return func (o * baseExporter ) {
91
- o .shutdown = shutdown
117
+ return func (o * internalOptions ) {
118
+ o .Shutdown = shutdown
92
119
}
93
120
}
94
121
95
122
// WithStart overrides the default Start function for an exporter.
96
123
// The default shutdown function does nothing and always returns nil.
97
124
func WithStart (start Start ) ExporterOption {
98
- return func (o * baseExporter ) {
99
- o .start = start
125
+ return func (o * internalOptions ) {
126
+ o .Start = start
100
127
}
101
128
}
102
129
103
- // WithShutdown overrides the default TimeoutSettings for an exporter.
130
+ // WithTimeout overrides the default TimeoutSettings for an exporter.
104
131
// The default TimeoutSettings is 5 seconds.
105
- func WithTimeout (timeout TimeoutSettings ) ExporterOption {
106
- return func (o * baseExporter ) {
107
- o .cfg . TimeoutSettings = timeout
132
+ func WithTimeout (timeoutSettings TimeoutSettings ) ExporterOption {
133
+ return func (o * internalOptions ) {
134
+ o .TimeoutSettings = timeoutSettings
108
135
}
109
136
}
110
137
111
138
// WithRetry overrides the default RetrySettings for an exporter.
112
139
// The default RetrySettings is to disable retries.
113
- func WithRetry (retry RetrySettings ) ExporterOption {
114
- return func (o * baseExporter ) {
115
- o .cfg . RetrySettings = retry
140
+ func WithRetry (retrySettings RetrySettings ) ExporterOption {
141
+ return func (o * internalOptions ) {
142
+ o .RetrySettings = retrySettings
116
143
}
117
144
}
118
145
119
- // WithQueued overrides the default QueuedSettings for an exporter.
120
- // The default QueuedSettings is to disable queueing.
121
- func WithQueued ( queued QueuedSettings ) ExporterOption {
122
- return func (o * baseExporter ) {
123
- o .cfg . QueuedSettings = queued
146
+ // WithQueue overrides the default QueueSettings for an exporter.
147
+ // The default QueueSettings is to disable queueing.
148
+ func WithQueue ( queueSettings QueueSettings ) ExporterOption {
149
+ return func (o * internalOptions ) {
150
+ o .QueueSettings = queueSettings
124
151
}
125
152
}
126
153
127
- // internalOptions contains internalOptions concerning how an Exporter is configured .
154
+ // baseExporter contains common fields between different exporter types .
128
155
type baseExporter struct {
129
- cfg * settings
156
+ cfg configmodels. Exporter
130
157
sender requestSender
131
- rSender * retrySender
132
- qSender * queuedSender
158
+ qrSender * queuedRetrySender
133
159
start Start
134
160
shutdown Shutdown
135
161
startOnce sync.Once
136
162
shutdownOnce sync.Once
137
163
}
138
164
139
- // Construct the internalOptions from multiple ExporterOption.
140
165
func newBaseExporter (cfg configmodels.Exporter , options ... ExporterOption ) * baseExporter {
166
+ opts := fromConfiguredOptions (options ... )
141
167
be := & baseExporter {
142
- cfg : & settings {
143
- Exporter : cfg ,
144
- TimeoutSettings : CreateDefaultTimeoutSettings (),
145
- // TODO: Enable queuing by default (call CreateDefaultQueuedSettings
146
- QueuedSettings : QueuedSettings {Disabled : true },
147
- // TODO: Enable retry by default (call CreateDefaultRetrySettings)
148
- RetrySettings : RetrySettings {Disabled : true },
149
- },
168
+ cfg : cfg ,
169
+ start : opts .Start ,
170
+ shutdown : opts .Shutdown ,
150
171
}
151
172
152
- for _ , op := range options {
153
- op (be )
154
- }
155
-
156
- if be .start == nil {
157
- be .start = func (ctx context.Context , host component.Host ) error { return nil }
158
- }
159
-
160
- if be .shutdown == nil {
161
- be .shutdown = func (ctx context.Context ) error { return nil }
162
- }
163
-
164
- be .sender = & timeoutSender {cfg : & be .cfg .TimeoutSettings }
165
-
166
- be .rSender = newRetrySender (& be .cfg .RetrySettings , be .sender )
167
- be .sender = be .rSender
168
-
169
- be .qSender = newQueuedSender (& be .cfg .QueuedSettings , be .sender )
170
- be .sender = be .qSender
173
+ be .qrSender = newQueuedRetrySender (opts .QueueSettings , opts .RetrySettings , & timeoutSender {cfg : opts .TimeoutSettings })
174
+ be .sender = be .qrSender
171
175
172
176
return be
173
177
}
174
178
179
+ // wrapConsumerSender wraps the consumer sender (the sender that uses retries and timeout) with the given wrapper.
180
+ // This can be used to wrap with observability (create spans, record metrics) the consumer sender.
181
+ func (be * baseExporter ) wrapConsumerSender (f func (consumer requestSender ) requestSender ) {
182
+ be .qrSender .consumerSender = f (be .qrSender .consumerSender )
183
+ }
184
+
185
+ // Start all senders and exporter and is invoked during service start.
175
186
func (be * baseExporter ) Start (ctx context.Context , host component.Host ) error {
176
187
err := componenterror .ErrAlreadyStarted
177
188
be .startOnce .Do (func () {
178
- // First start the nextSender
189
+ // First start the wrapped exporter.
179
190
err = be .start (ctx , host )
180
191
if err != nil {
192
+ // TODO: Log errors, or check if it is recorded by the caller.
181
193
return
182
194
}
183
195
184
- // If no error then start the queuedSender
185
- be .qSender .start ()
196
+ // If no error then start the queuedRetrySender.
197
+ be .qrSender .start ()
186
198
})
187
199
return err
188
200
}
189
201
190
- // Shutdown stops the nextSender and is invoked during shutdown.
202
+ // Shutdown all senders and exporter and is invoked during service shutdown.
191
203
func (be * baseExporter ) Shutdown (ctx context.Context ) error {
192
204
err := componenterror .ErrAlreadyStopped
193
205
be .shutdownOnce .Do (func () {
194
- // First stop the retry goroutines
195
- be .rSender .shutdown ()
196
-
197
- // All operations will try to export once but will not retry because retrying was disabled when be.rSender stopped.
198
- be .qSender .shutdown ()
199
-
200
- // Last shutdown the nextSender itself.
206
+ // First shutdown the queued retry sender
207
+ be .qrSender .shutdown ()
208
+ // Last shutdown the wrapped exporter itself.
201
209
err = be .shutdown (ctx )
202
210
})
203
211
return err
204
212
}
205
213
214
+ // timeoutSender is a request sender that adds a `timeout` to every request that passes this sender.
206
215
type timeoutSender struct {
207
- cfg * TimeoutSettings
216
+ cfg TimeoutSettings
208
217
}
209
218
210
- func (te * timeoutSender ) send (req request ) (int , error ) {
219
+ // send implements the requestSender interface
220
+ func (ts * timeoutSender ) send (req request ) (int , error ) {
211
221
// Intentionally don't overwrite the context inside the request, because in case of retries deadline will not be
212
222
// updated because this deadline most likely is before the next one.
213
223
ctx := req .context ()
214
- if te .cfg .Timeout > 0 {
224
+ if ts .cfg .Timeout > 0 {
215
225
var cancelFunc func ()
216
- ctx , cancelFunc = context .WithTimeout (req .context (), te .cfg .Timeout )
226
+ ctx , cancelFunc = context .WithTimeout (req .context (), ts .cfg .Timeout )
217
227
defer cancelFunc ()
218
228
}
219
229
return req .export (ctx )
0 commit comments