Skip to content

Commit b687b1b

Browse files
authored
[feat] Support periodic refresh of sampling strategies (#2188)
* Support periodic reload of sampling strategies file Signed-off-by: defool <defool@foxmail.com> * Optimizing code struct and comments Signed-off-by: defool <defool@foxmail.com>
1 parent feaf06d commit b687b1b

File tree

4 files changed

+142
-15
lines changed

4 files changed

+142
-15
lines changed

plugin/sampling/strategystore/static/constants.go

+8
Original file line numberDiff line numberDiff line change
@@ -42,3 +42,11 @@ func defaultStrategyResponse() *sampling.SamplingStrategyResponse {
4242
},
4343
}
4444
}
45+
46+
func defaultStrategies() *storedStrategies {
47+
s := &storedStrategies{
48+
serviceStrategies: make(map[string]*sampling.SamplingStrategyResponse),
49+
}
50+
s.defaultStrategy = defaultStrategyResponse()
51+
return s
52+
}

plugin/sampling/strategystore/static/options.go

+7-1
Original file line numberDiff line numberDiff line change
@@ -16,27 +16,33 @@ package static
1616

1717
import (
1818
"flag"
19+
"time"
1920

2021
"github.com/spf13/viper"
2122
)
2223

2324
const (
24-
samplingStrategiesFile = "sampling.strategies-file"
25+
samplingStrategiesFile = "sampling.strategies-file"
26+
samplingStrategiesReloadInterval = "sampling.strategies-reload-interval"
2527
)
2628

2729
// Options holds configuration for the static sampling strategy store.
2830
type Options struct {
2931
// StrategiesFile is the path for the sampling strategies file in JSON format
3032
StrategiesFile string
33+
// ReloadInterval is the time interval to check and reload sampling strategies file
34+
ReloadInterval time.Duration
3135
}
3236

3337
// AddFlags adds flags for Options
3438
func AddFlags(flagSet *flag.FlagSet) {
3539
flagSet.String(samplingStrategiesFile, "", "The path for the sampling strategies file in JSON format. See sampling documentation to see format of the file")
40+
flagSet.Duration(samplingStrategiesReloadInterval, 0, "Reload interval to check and reload sampling strategies file. Zero value means no reloading")
3641
}
3742

3843
// InitFromViper initializes Options with properties from viper
3944
func (opts *Options) InitFromViper(v *viper.Viper) *Options {
4045
opts.StrategiesFile = v.GetString(samplingStrategiesFile)
46+
opts.ReloadInterval = v.GetDuration(samplingStrategiesReloadInterval)
4147
return opts
4248
}

plugin/sampling/strategystore/static/strategy_store.go

+75-12
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,14 @@ package static
1616

1717
import (
1818
"bytes"
19+
"context"
1920
"encoding/gob"
2021
"encoding/json"
2122
"fmt"
2223
"io/ioutil"
24+
"path/filepath"
25+
"sync/atomic"
26+
"time"
2327

2428
"go.uber.org/zap"
2529

@@ -30,31 +34,89 @@ import (
3034
type strategyStore struct {
3135
logger *zap.Logger
3236

37+
storedStrategies atomic.Value // holds *storedStrategies
38+
39+
ctx context.Context
40+
cancelFunc context.CancelFunc
41+
}
42+
43+
type storedStrategies struct {
3344
defaultStrategy *sampling.SamplingStrategyResponse
3445
serviceStrategies map[string]*sampling.SamplingStrategyResponse
3546
}
3647

3748
// NewStrategyStore creates a strategy store that holds static sampling strategies.
3849
func NewStrategyStore(options Options, logger *zap.Logger) (ss.StrategyStore, error) {
50+
ctx, cancelFunc := context.WithCancel(context.Background())
3951
h := &strategyStore{
40-
logger: logger,
41-
serviceStrategies: make(map[string]*sampling.SamplingStrategyResponse),
52+
logger: logger,
53+
ctx: ctx,
54+
cancelFunc: cancelFunc,
4255
}
56+
h.storedStrategies.Store(defaultStrategies())
57+
4358
strategies, err := loadStrategies(options.StrategiesFile)
4459
if err != nil {
4560
return nil, err
4661
}
4762
h.parseStrategies(strategies)
63+
64+
if options.ReloadInterval > 0 {
65+
go h.autoUpdateStrategies(options.ReloadInterval, options.StrategiesFile)
66+
}
4867
return h, nil
4968
}
5069

5170
// GetSamplingStrategy implements StrategyStore#GetSamplingStrategy.
5271
func (h *strategyStore) GetSamplingStrategy(serviceName string) (*sampling.SamplingStrategyResponse, error) {
53-
if strategy, ok := h.serviceStrategies[serviceName]; ok {
72+
ss := h.storedStrategies.Load().(*storedStrategies)
73+
serviceStrategies := ss.serviceStrategies
74+
if strategy, ok := serviceStrategies[serviceName]; ok {
5475
return strategy, nil
5576
}
5677
h.logger.Debug("sampling strategy not found, using default", zap.String("service", serviceName))
57-
return h.defaultStrategy, nil
78+
return ss.defaultStrategy, nil
79+
}
80+
81+
// Close stops updating the strategies
82+
func (h *strategyStore) Close() {
83+
h.cancelFunc()
84+
}
85+
86+
func (h *strategyStore) autoUpdateStrategies(interval time.Duration, filePath string) {
87+
lastString := ""
88+
ticker := time.NewTicker(interval)
89+
defer ticker.Stop()
90+
for {
91+
select {
92+
case <-ticker.C:
93+
currBytes, err := ioutil.ReadFile(filepath.Clean(filePath))
94+
if err != nil {
95+
h.logger.Error("ReadFile failed", zap.Error(err))
96+
}
97+
currStr := string(currBytes)
98+
if lastString == currStr {
99+
continue
100+
}
101+
if err = h.updateSamplingStrategy(currBytes); err != nil {
102+
h.logger.Error("UpdateSamplingStrategy failed", zap.Error(err))
103+
}
104+
lastString = currStr
105+
106+
case <-h.ctx.Done():
107+
return
108+
}
109+
}
110+
}
111+
112+
func (h *strategyStore) updateSamplingStrategy(bytes []byte) error {
113+
var strategies strategies
114+
if err := json.Unmarshal(bytes, &strategies); err != nil {
115+
return fmt.Errorf("failed to unmarshal strategies: %w", err)
116+
}
117+
h.parseStrategies(&strategies)
118+
h.logger.Info("Updated sampling strategies:" + string(bytes))
119+
return nil
58120
}
59121

60122
// TODO good candidate for a global util function
@@ -74,40 +136,41 @@ func loadStrategies(strategiesFile string) (*strategies, error) {
74136
}
75137

76138
func (h *strategyStore) parseStrategies(strategies *strategies) {
77-
h.defaultStrategy = defaultStrategyResponse()
78139
if strategies == nil {
79140
h.logger.Info("No sampling strategies provided, using defaults")
80141
return
81142
}
143+
newStore := defaultStrategies()
82144
if strategies.DefaultStrategy != nil {
83-
h.defaultStrategy = h.parseServiceStrategies(strategies.DefaultStrategy)
145+
newStore.defaultStrategy = h.parseServiceStrategies(strategies.DefaultStrategy)
84146
}
85147

86148
merge := true
87-
if h.defaultStrategy.OperationSampling == nil ||
88-
h.defaultStrategy.OperationSampling.PerOperationStrategies == nil {
149+
if newStore.defaultStrategy.OperationSampling == nil ||
150+
newStore.defaultStrategy.OperationSampling.PerOperationStrategies == nil {
89151
merge = false
90152
}
91153

92154
for _, s := range strategies.ServiceStrategies {
93-
h.serviceStrategies[s.Service] = h.parseServiceStrategies(s)
155+
newStore.serviceStrategies[s.Service] = h.parseServiceStrategies(s)
94156

95157
// Merge with the default operation strategies, because only merging with
96158
// the default strategy has no effect on service strategies (the default strategy
97159
// is not merged with and only used as a fallback).
98-
opS := h.serviceStrategies[s.Service].OperationSampling
160+
opS := newStore.serviceStrategies[s.Service].OperationSampling
99161
if opS == nil {
100162
// Service has no per-operation strategies, so just reference the default settings.
101-
h.serviceStrategies[s.Service].OperationSampling = h.defaultStrategy.OperationSampling
163+
newStore.serviceStrategies[s.Service].OperationSampling = newStore.defaultStrategy.OperationSampling
102164
continue
103165
}
104166

105167
if merge {
106168
opS.PerOperationStrategies = mergePerOperationSamplingStrategies(
107169
opS.PerOperationStrategies,
108-
h.defaultStrategy.OperationSampling.PerOperationStrategies)
170+
newStore.defaultStrategy.OperationSampling.PerOperationStrategies)
109171
}
110172
}
173+
h.storedStrategies.Store(newStore)
111174
}
112175

113176
// mergePerOperationStrategies merges two operation strategies a and b, where a takes precedence over b.

plugin/sampling/strategystore/static/strategy_store_test.go

+52-2
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,11 @@
1515
package static
1616

1717
import (
18-
"fmt"
18+
"io/ioutil"
19+
"os"
20+
"strings"
1921
"testing"
22+
"time"
2023

2124
"github.com/stretchr/testify/assert"
2225
"github.com/stretchr/testify/require"
@@ -79,7 +82,7 @@ func TestPerOperationSamplingStrategies(t *testing.T) {
7982
os := s.OperationSampling
8083
assert.EqualValues(t, os.DefaultSamplingProbability, 0.8)
8184
require.Len(t, os.PerOperationStrategies, 4)
82-
fmt.Println(os)
85+
8386
assert.Equal(t, "op6", os.PerOperationStrategies[0].Operation)
8487
assert.EqualValues(t, 0.5, os.PerOperationStrategies[0].ProbabilisticSampling.SamplingRate)
8588
assert.Equal(t, "op1", os.PerOperationStrategies[1].Operation)
@@ -243,3 +246,50 @@ func TestDeepCopy(t *testing.T) {
243246
assert.False(t, copy == s)
244247
assert.EqualValues(t, copy, s)
245248
}
249+
250+
func TestAutoUpdateStrategy(t *testing.T) {
251+
// copy from fixtures/strategies.json
252+
tempFile, _ := ioutil.TempFile("", "for_go_test_*.json")
253+
tempFile.Close()
254+
255+
srcFile, dstFile := "fixtures/strategies.json", tempFile.Name()
256+
srcBytes, err := ioutil.ReadFile(srcFile)
257+
require.NoError(t, err)
258+
err = ioutil.WriteFile(dstFile, srcBytes, 0644)
259+
require.NoError(t, err)
260+
261+
interval := time.Millisecond * 10
262+
store, err := NewStrategyStore(Options{
263+
StrategiesFile: dstFile,
264+
ReloadInterval: interval,
265+
}, zap.NewNop())
266+
require.NoError(t, err)
267+
defer store.(*strategyStore).Close()
268+
269+
s, err := store.GetSamplingStrategy("foo")
270+
require.NoError(t, err)
271+
assert.EqualValues(t, makeResponse(sampling.SamplingStrategyType_PROBABILISTIC, 0.8), *s)
272+
273+
// update file
274+
newStr := strings.Replace(string(srcBytes), "0.8", "0.9", 1)
275+
err = ioutil.WriteFile(dstFile, []byte(newStr), 0644)
276+
require.NoError(t, err)
277+
278+
// wait for reloading
279+
time.Sleep(interval * 4)
280+
281+
// verity reloading
282+
s, err = store.GetSamplingStrategy("foo")
283+
require.NoError(t, err)
284+
assert.EqualValues(t, makeResponse(sampling.SamplingStrategyType_PROBABILISTIC, 0.9), *s)
285+
286+
// check bad file content
287+
_ = ioutil.WriteFile(dstFile, []byte("bad value"), 0644)
288+
time.Sleep(interval * 2)
289+
290+
// remove file(test read file failure)
291+
_ = os.Remove(dstFile)
292+
// wait for delete and update failure
293+
time.Sleep(interval * 2)
294+
295+
}

0 commit comments

Comments
 (0)