@@ -16,16 +16,65 @@ package exporterhelper
16
16
17
17
import (
18
18
"context"
19
+ "sync"
20
+ "time"
19
21
20
22
"go.opencensus.io/trace"
21
23
22
24
"go.opentelemetry.io/collector/component"
25
+ "go.opentelemetry.io/collector/component/componenterror"
26
+ "go.opentelemetry.io/collector/config/configmodels"
27
+ "go.opentelemetry.io/collector/consumer/consumererror"
23
28
)
24
29
25
30
var (
26
31
okStatus = trace.Status {Code : trace .StatusCodeOK }
27
32
)
28
33
34
+ type TimeoutSettings struct {
35
+ // Timeout is the timeout for each operation.
36
+ Timeout time.Duration `mapstructure:"timeout"`
37
+ }
38
+
39
+ func CreateDefaultTimeoutSettings () TimeoutSettings {
40
+ return TimeoutSettings {
41
+ Timeout : 5 * time .Second ,
42
+ }
43
+ }
44
+
45
+ type settings struct {
46
+ configmodels.Exporter
47
+ TimeoutSettings
48
+ QueuedSettings
49
+ RetrySettings
50
+ }
51
+
52
+ type request interface {
53
+ context () context.Context
54
+ setContext (context.Context )
55
+ export (ctx context.Context ) (int , error )
56
+ // Returns a new queue request that contains the items left to be exported.
57
+ onPartialError (consumererror.PartialError ) request
58
+ // Returns the cnt of spans/metric points or log records.
59
+ count () int
60
+ }
61
+
62
+ type requestSender interface {
63
+ send (req request ) (int , error )
64
+ }
65
+
66
+ type baseRequest struct {
67
+ ctx context.Context
68
+ }
69
+
70
+ func (req * baseRequest ) context () context.Context {
71
+ return req .ctx
72
+ }
73
+
74
+ func (req * baseRequest ) setContext (ctx context.Context ) {
75
+ req .ctx = ctx
76
+ }
77
+
29
78
// Start specifies the function invoked when the exporter is being started.
30
79
type Start func (context.Context , component.Host ) error
31
80
@@ -51,37 +100,121 @@ func WithStart(start Start) ExporterOption {
51
100
}
52
101
}
53
102
103
+ // WithShutdown overrides the default TimeoutSettings for an exporter.
104
+ // The default TimeoutSettings is 5 seconds.
105
+ func WithTimeout (timeout TimeoutSettings ) ExporterOption {
106
+ return func (o * baseExporter ) {
107
+ o .cfg .TimeoutSettings = timeout
108
+ }
109
+ }
110
+
111
+ // WithRetry overrides the default RetrySettings for an exporter.
112
+ // The default RetrySettings is to disable retries.
113
+ func WithRetry (retry RetrySettings ) ExporterOption {
114
+ return func (o * baseExporter ) {
115
+ o .cfg .RetrySettings = retry
116
+ }
117
+ }
118
+
119
+ // WithQueued overrides the default QueuedSettings for an exporter.
120
+ // The default QueuedSettings is to disable queueing.
121
+ func WithQueued (queued QueuedSettings ) ExporterOption {
122
+ return func (o * baseExporter ) {
123
+ o .cfg .QueuedSettings = queued
124
+ }
125
+ }
126
+
54
127
// internalOptions contains internalOptions concerning how an Exporter is configured.
55
128
type baseExporter struct {
56
- exporterFullName string
57
- start Start
58
- shutdown Shutdown
129
+ cfg * settings
130
+ sender requestSender
131
+ rSender * retrySender
132
+ qSender * queuedSender
133
+ start Start
134
+ shutdown Shutdown
135
+ startOnce sync.Once
136
+ shutdownOnce sync.Once
59
137
}
60
138
61
139
// Construct the internalOptions from multiple ExporterOption.
62
- func newBaseExporter (exporterFullName string , options ... ExporterOption ) baseExporter {
63
- be := baseExporter {
64
- exporterFullName : exporterFullName ,
140
+ func newBaseExporter (cfg configmodels.Exporter , options ... ExporterOption ) * baseExporter {
141
+ be := & baseExporter {
142
+ cfg : & settings {
143
+ Exporter : cfg ,
144
+ TimeoutSettings : CreateDefaultTimeoutSettings (),
145
+ // TODO: Enable queuing by default (call CreateDefaultQueuedSettings
146
+ QueuedSettings : QueuedSettings {Disabled : true },
147
+ // TODO: Enable retry by default (call CreateDefaultRetrySettings)
148
+ RetrySettings : RetrySettings {Disabled : true },
149
+ },
65
150
}
66
151
67
152
for _ , op := range options {
68
- op (& be )
153
+ op (be )
154
+ }
155
+
156
+ if be .start == nil {
157
+ be .start = func (ctx context.Context , host component.Host ) error { return nil }
69
158
}
70
159
160
+ if be .shutdown == nil {
161
+ be .shutdown = func (ctx context.Context ) error { return nil }
162
+ }
163
+
164
+ be .sender = & timeoutSender {cfg : & be .cfg .TimeoutSettings }
165
+
166
+ be .rSender = newRetrySender (& be .cfg .RetrySettings , be .sender )
167
+ be .sender = be .rSender
168
+
169
+ be .qSender = newQueuedSender (& be .cfg .QueuedSettings , be .sender )
170
+ be .sender = be .qSender
171
+
71
172
return be
72
173
}
73
174
74
175
func (be * baseExporter ) Start (ctx context.Context , host component.Host ) error {
75
- if be .start != nil {
76
- return be .start (ctx , host )
77
- }
78
- return nil
176
+ err := componenterror .ErrAlreadyStarted
177
+ be .startOnce .Do (func () {
178
+ // First start the nextSender
179
+ err = be .start (ctx , host )
180
+ if err != nil {
181
+ return
182
+ }
183
+
184
+ // If no error then start the queuedSender
185
+ be .qSender .start ()
186
+ })
187
+ return err
79
188
}
80
189
81
- // Shutdown stops the exporter and is invoked during shutdown.
190
+ // Shutdown stops the nextSender and is invoked during shutdown.
82
191
func (be * baseExporter ) Shutdown (ctx context.Context ) error {
83
- if be .shutdown != nil {
84
- return be .shutdown (ctx )
192
+ err := componenterror .ErrAlreadyStopped
193
+ be .shutdownOnce .Do (func () {
194
+ // First stop the retry goroutines
195
+ be .rSender .shutdown ()
196
+
197
+ // All operations will try to export once but will not retry because retrying was disabled when be.rSender stopped.
198
+ be .qSender .shutdown ()
199
+
200
+ // Last shutdown the nextSender itself.
201
+ err = be .shutdown (ctx )
202
+ })
203
+ return err
204
+ }
205
+
206
+ type timeoutSender struct {
207
+ cfg * TimeoutSettings
208
+ }
209
+
210
+ func (te * timeoutSender ) send (req request ) (int , error ) {
211
+ // Intentionally don't overwrite the context inside the request, because in case of retries deadline will not be
212
+ // updated because this deadline most likely is before the next one.
213
+ ctx := req .context ()
214
+ if te .cfg .Timeout > 0 {
215
+ var cancelFunc func ()
216
+ ctx , cancelFunc = context .WithTimeout (req .context (), te .cfg .Timeout )
217
+ defer cancelFunc ()
85
218
}
86
- return nil
219
+ return req . export ( ctx )
87
220
}
0 commit comments