@@ -16,10 +16,14 @@ package static
16
16
17
17
import (
18
18
"bytes"
19
+ "context"
19
20
"encoding/gob"
20
21
"encoding/json"
21
22
"fmt"
22
23
"io/ioutil"
24
+ "path/filepath"
25
+ "sync/atomic"
26
+ "time"
23
27
24
28
"go.uber.org/zap"
25
29
@@ -30,31 +34,89 @@ import (
30
34
type strategyStore struct {
31
35
logger * zap.Logger
32
36
37
+ storedStrategies atomic.Value // holds *storedStrategies
38
+
39
+ ctx context.Context
40
+ cancelFunc context.CancelFunc
41
+ }
42
+
43
+ type storedStrategies struct {
33
44
defaultStrategy * sampling.SamplingStrategyResponse
34
45
serviceStrategies map [string ]* sampling.SamplingStrategyResponse
35
46
}
36
47
37
48
// NewStrategyStore creates a strategy store that holds static sampling strategies.
38
49
func NewStrategyStore (options Options , logger * zap.Logger ) (ss.StrategyStore , error ) {
50
+ ctx , cancelFunc := context .WithCancel (context .Background ())
39
51
h := & strategyStore {
40
- logger : logger ,
41
- serviceStrategies : make (map [string ]* sampling.SamplingStrategyResponse ),
52
+ logger : logger ,
53
+ ctx : ctx ,
54
+ cancelFunc : cancelFunc ,
42
55
}
56
+ h .storedStrategies .Store (defaultStrategies ())
57
+
43
58
strategies , err := loadStrategies (options .StrategiesFile )
44
59
if err != nil {
45
60
return nil , err
46
61
}
47
62
h .parseStrategies (strategies )
63
+
64
+ if options .ReloadInterval > 0 {
65
+ go h .autoUpdateStrategies (options .ReloadInterval , options .StrategiesFile )
66
+ }
48
67
return h , nil
49
68
}
50
69
51
70
// GetSamplingStrategy implements StrategyStore#GetSamplingStrategy.
52
71
func (h * strategyStore ) GetSamplingStrategy (serviceName string ) (* sampling.SamplingStrategyResponse , error ) {
53
- if strategy , ok := h .serviceStrategies [serviceName ]; ok {
72
+ ss := h .storedStrategies .Load ().(* storedStrategies )
73
+ serviceStrategies := ss .serviceStrategies
74
+ if strategy , ok := serviceStrategies [serviceName ]; ok {
54
75
return strategy , nil
55
76
}
56
77
h .logger .Debug ("sampling strategy not found, using default" , zap .String ("service" , serviceName ))
57
- return h .defaultStrategy , nil
78
+ return ss .defaultStrategy , nil
79
+ }
80
+
81
+ // StopUpdateStrategy stops updating the strategy
82
+ func (h * strategyStore ) StopUpdateStrategy () {
83
+ h .cancelFunc ()
84
+ }
85
+
86
+ func (h * strategyStore ) autoUpdateStrategies (interval time.Duration , filePath string ) {
87
+ lastString := ""
88
+ ticker := time .NewTicker (interval )
89
+ defer ticker .Stop ()
90
+ for {
91
+ select {
92
+ case <- ticker .C :
93
+ if currBytes , err := ioutil .ReadFile (filepath .Clean (filePath )); err == nil {
94
+ currStr := string (currBytes )
95
+ if lastString == currStr {
96
+ continue
97
+ }
98
+ err := h .updateSamplingStrategy (currBytes )
99
+ if err != nil {
100
+ h .logger .Error ("UpdateSamplingStrategy failed" , zap .Error (err ))
101
+ }
102
+ lastString = currStr
103
+ } else {
104
+ h .logger .Error ("UpdateSamplingStrategy failed" , zap .Error (err ))
105
+ }
106
+ case <- h .ctx .Done ():
107
+ return
108
+ }
109
+ }
110
+ }
111
+
112
+ func (h * strategyStore ) updateSamplingStrategy (bytes []byte ) error {
113
+ var strategies strategies
114
+ if err := json .Unmarshal (bytes , & strategies ); err != nil {
115
+ return fmt .Errorf ("failed to unmarshal strategies: %w" , err )
116
+ }
117
+ h .parseStrategies (& strategies )
118
+ h .logger .Info ("Updated strategy:" + string (bytes ))
119
+ return nil
58
120
}
59
121
60
122
// TODO good candidate for a global util function
@@ -74,40 +136,41 @@ func loadStrategies(strategiesFile string) (*strategies, error) {
74
136
}
75
137
76
138
func (h * strategyStore ) parseStrategies (strategies * strategies ) {
77
- h .defaultStrategy = defaultStrategyResponse ()
78
139
if strategies == nil {
79
140
h .logger .Info ("No sampling strategies provided, using defaults" )
80
141
return
81
142
}
143
+ newStore := defaultStrategies ()
82
144
if strategies .DefaultStrategy != nil {
83
- h .defaultStrategy = h .parseServiceStrategies (strategies .DefaultStrategy )
145
+ newStore .defaultStrategy = h .parseServiceStrategies (strategies .DefaultStrategy )
84
146
}
85
147
86
148
merge := true
87
- if h .defaultStrategy .OperationSampling == nil ||
88
- h .defaultStrategy .OperationSampling .PerOperationStrategies == nil {
149
+ if newStore .defaultStrategy .OperationSampling == nil ||
150
+ newStore .defaultStrategy .OperationSampling .PerOperationStrategies == nil {
89
151
merge = false
90
152
}
91
153
92
154
for _ , s := range strategies .ServiceStrategies {
93
- h .serviceStrategies [s .Service ] = h .parseServiceStrategies (s )
155
+ newStore .serviceStrategies [s .Service ] = h .parseServiceStrategies (s )
94
156
95
157
// Merge with the default operation strategies, because only merging with
96
158
// the default strategy has no effect on service strategies (the default strategy
97
159
// is not merged with and only used as a fallback).
98
- opS := h .serviceStrategies [s .Service ].OperationSampling
160
+ opS := newStore .serviceStrategies [s .Service ].OperationSampling
99
161
if opS == nil {
100
162
// Service has no per-operation strategies, so just reference the default settings.
101
- h .serviceStrategies [s .Service ].OperationSampling = h .defaultStrategy .OperationSampling
163
+ newStore .serviceStrategies [s .Service ].OperationSampling = newStore .defaultStrategy .OperationSampling
102
164
continue
103
165
}
104
166
105
167
if merge {
106
168
opS .PerOperationStrategies = mergePerOperationSamplingStrategies (
107
169
opS .PerOperationStrategies ,
108
- h .defaultStrategy .OperationSampling .PerOperationStrategies )
170
+ newStore .defaultStrategy .OperationSampling .PerOperationStrategies )
109
171
}
110
172
}
173
+ h .storedStrategies .Store (newStore )
111
174
}
112
175
113
176
// mergePerOperationStrategies merges two operation strategies a and b, where a takes precedence over b.
0 commit comments