Skip to content

Commit 8353fa6

Browse files
committed
Support periodic reload of sampling strategies file
Signed-off-by: defool <defool@foxmail.com>
1 parent b99114e commit 8353fa6

File tree

3 files changed

+142
-14
lines changed

3 files changed

+142
-14
lines changed

plugin/sampling/strategystore/static/options.go

+7-1
Original file line numberDiff line numberDiff line change
@@ -16,27 +16,33 @@ package static
1616

1717
import (
1818
"flag"
19+
"time"
1920

2021
"github.com/spf13/viper"
2122
)
2223

2324
const (
24-
samplingStrategiesFile = "sampling.strategies-file"
25+
samplingStrategiesFile = "sampling.strategies-file"
26+
samplingStrategiesReloadInterval = "sampling.strategies-reload-interval"
2527
)
2628

2729
// Options holds configuration for the static sampling strategy store.
2830
type Options struct {
2931
// StrategiesFile is the path for the sampling strategies file in JSON format
3032
StrategiesFile string
33+
// ReloadInterval is the time interval to check and reload sampling strategies file
34+
ReloadInterval time.Duration
3135
}
3236

3337
// AddFlags adds flags for Options
3438
func AddFlags(flagSet *flag.FlagSet) {
3539
flagSet.String(samplingStrategiesFile, "", "The path for the sampling strategies file in JSON format. See sampling documentation to see format of the file")
40+
flagSet.Duration(samplingStrategiesReloadInterval, 0, "Reload interval to check and reload sampling strategies file. Zero value means no reloading")
3641
}
3742

3843
// InitFromViper initializes Options with properties from viper
3944
func (opts *Options) InitFromViper(v *viper.Viper) *Options {
4045
opts.StrategiesFile = v.GetString(samplingStrategiesFile)
46+
opts.ReloadInterval = v.GetDuration(samplingStrategiesReloadInterval)
4147
return opts
4248
}

plugin/sampling/strategystore/static/strategy_store.go

+82-11
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,14 @@ package static
1616

1717
import (
1818
"bytes"
19+
"context"
1920
"encoding/gob"
2021
"encoding/json"
2122
"fmt"
2223
"io/ioutil"
24+
"path/filepath"
25+
"sync/atomic"
26+
"time"
2327

2428
"go.uber.org/zap"
2529

@@ -30,31 +34,93 @@ import (
3034
type strategyStore struct {
3135
logger *zap.Logger
3236

37+
storedStrategies atomic.Value // holds *storedStrategies
38+
39+
ctx context.Context
40+
cancelFunc context.CancelFunc
41+
}
42+
43+
type storedStrategies struct {
3344
defaultStrategy *sampling.SamplingStrategyResponse
3445
serviceStrategies map[string]*sampling.SamplingStrategyResponse
3546
}
3647

3748
// NewStrategyStore creates a strategy store that holds static sampling strategies.
3849
func NewStrategyStore(options Options, logger *zap.Logger) (ss.StrategyStore, error) {
50+
ctx, cancelFunc := context.WithCancel(context.Background())
3951
h := &strategyStore{
40-
logger: logger,
52+
logger: logger,
53+
ctx: ctx,
54+
cancelFunc: cancelFunc,
55+
}
56+
newStore := &storedStrategies{
4157
serviceStrategies: make(map[string]*sampling.SamplingStrategyResponse),
4258
}
59+
newStore.defaultStrategy = defaultStrategyResponse()
60+
h.storedStrategies.Store(newStore)
61+
4362
strategies, err := loadStrategies(options.StrategiesFile)
4463
if err != nil {
4564
return nil, err
4665
}
4766
h.parseStrategies(strategies)
67+
68+
if options.ReloadInterval > 0 {
69+
go h.autoUpdateStrategy(options.ReloadInterval, options.StrategiesFile)
70+
}
4871
return h, nil
4972
}
5073

5174
// GetSamplingStrategy implements StrategyStore#GetSamplingStrategy.
5275
func (h *strategyStore) GetSamplingStrategy(serviceName string) (*sampling.SamplingStrategyResponse, error) {
53-
if strategy, ok := h.serviceStrategies[serviceName]; ok {
76+
ss := h.storedStrategies.Load().(*storedStrategies)
77+
serviceStrategies := ss.serviceStrategies
78+
if strategy, ok := serviceStrategies[serviceName]; ok {
5479
return strategy, nil
5580
}
5681
h.logger.Debug("sampling strategy not found, using default", zap.String("service", serviceName))
57-
return h.defaultStrategy, nil
82+
return ss.defaultStrategy, nil
83+
}
84+
85+
// StopUpdateStrategy stops updating the strategy
86+
func (h *strategyStore) StopUpdateStrategy() {
87+
h.cancelFunc()
88+
}
89+
90+
func (h *strategyStore) autoUpdateStrategy(interval time.Duration, filePath string) {
91+
lastString := ""
92+
ticker := time.NewTicker(interval)
93+
defer ticker.Stop()
94+
for {
95+
select {
96+
case <-ticker.C:
97+
if currBytes, err := ioutil.ReadFile(filepath.Clean(filePath)); err == nil {
98+
currStr := string(currBytes)
99+
if lastString == currStr {
100+
continue
101+
}
102+
err := h.updateSamplingStrategy(currBytes)
103+
if err != nil {
104+
h.logger.Error("UpdateSamplingStrategy failed", zap.Error(err))
105+
}
106+
lastString = currStr
107+
} else {
108+
h.logger.Error("UpdateSamplingStrategy failed", zap.Error(err))
109+
}
110+
case <-h.ctx.Done():
111+
return
112+
}
113+
}
114+
}
115+
116+
func (h *strategyStore) updateSamplingStrategy(bytes []byte) error {
117+
var strategies strategies
118+
if err := json.Unmarshal(bytes, &strategies); err != nil {
119+
return fmt.Errorf("failed to unmarshal strategies: %w", err)
120+
}
121+
h.parseStrategies(&strategies)
122+
h.logger.Info("Updated strategy:" + string(bytes))
123+
return nil
58124
}
59125

60126
// TODO good candidate for a global util function
@@ -74,40 +140,45 @@ func loadStrategies(strategiesFile string) (*strategies, error) {
74140
}
75141

76142
func (h *strategyStore) parseStrategies(strategies *strategies) {
77-
h.defaultStrategy = defaultStrategyResponse()
143+
newStore := &storedStrategies{
144+
serviceStrategies: make(map[string]*sampling.SamplingStrategyResponse),
145+
}
146+
newStore.defaultStrategy = defaultStrategyResponse()
147+
h.storedStrategies.Store(newStore)
78148
if strategies == nil {
79149
h.logger.Info("No sampling strategies provided, using defaults")
80150
return
81151
}
82152
if strategies.DefaultStrategy != nil {
83-
h.defaultStrategy = h.parseServiceStrategies(strategies.DefaultStrategy)
153+
newStore.defaultStrategy = h.parseServiceStrategies(strategies.DefaultStrategy)
84154
}
85155

86156
merge := true
87-
if h.defaultStrategy.OperationSampling == nil ||
88-
h.defaultStrategy.OperationSampling.PerOperationStrategies == nil {
157+
if newStore.defaultStrategy.OperationSampling == nil ||
158+
newStore.defaultStrategy.OperationSampling.PerOperationStrategies == nil {
89159
merge = false
90160
}
91161

92162
for _, s := range strategies.ServiceStrategies {
93-
h.serviceStrategies[s.Service] = h.parseServiceStrategies(s)
163+
newStore.serviceStrategies[s.Service] = h.parseServiceStrategies(s)
94164

95165
// Merge with the default operation strategies, because only merging with
96166
// the default strategy has no effect on service strategies (the default strategy
97167
// is not merged with and only used as a fallback).
98-
opS := h.serviceStrategies[s.Service].OperationSampling
168+
opS := newStore.serviceStrategies[s.Service].OperationSampling
99169
if opS == nil {
100170
// Service has no per-operation strategies, so just reference the default settings.
101-
h.serviceStrategies[s.Service].OperationSampling = h.defaultStrategy.OperationSampling
171+
newStore.serviceStrategies[s.Service].OperationSampling = newStore.defaultStrategy.OperationSampling
102172
continue
103173
}
104174

105175
if merge {
106176
opS.PerOperationStrategies = mergePerOperationSamplingStrategies(
107177
opS.PerOperationStrategies,
108-
h.defaultStrategy.OperationSampling.PerOperationStrategies)
178+
newStore.defaultStrategy.OperationSampling.PerOperationStrategies)
109179
}
110180
}
181+
h.storedStrategies.Store(newStore)
111182
}
112183

113184
// mergePerOperationStrategies merges two operation strategies a and b, where a takes precedence over b.

plugin/sampling/strategystore/static/strategy_store_test.go

+53-2
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,11 @@
1515
package static
1616

1717
import (
18-
"fmt"
18+
"io/ioutil"
19+
"os"
20+
"strings"
1921
"testing"
22+
"time"
2023

2124
"github.com/stretchr/testify/assert"
2225
"github.com/stretchr/testify/require"
@@ -79,7 +82,7 @@ func TestPerOperationSamplingStrategies(t *testing.T) {
7982
os := s.OperationSampling
8083
assert.EqualValues(t, os.DefaultSamplingProbability, 0.8)
8184
require.Len(t, os.PerOperationStrategies, 4)
82-
fmt.Println(os)
85+
8386
assert.Equal(t, "op6", os.PerOperationStrategies[0].Operation)
8487
assert.EqualValues(t, 0.5, os.PerOperationStrategies[0].ProbabilisticSampling.SamplingRate)
8588
assert.Equal(t, "op1", os.PerOperationStrategies[1].Operation)
@@ -243,3 +246,51 @@ func TestDeepCopy(t *testing.T) {
243246
assert.False(t, copy == s)
244247
assert.EqualValues(t, copy, s)
245248
}
249+
250+
func TestAutoUpdateStrategy(t *testing.T) {
251+
// copy from fixtures/strategies.json
252+
tempFile, _ := ioutil.TempFile("", "for_go_test_*.json")
253+
tempFile.Close()
254+
255+
srcFile, dstFile := "fixtures/strategies.json", tempFile.Name()
256+
srcBytes, err := ioutil.ReadFile(srcFile)
257+
require.NoError(t, err)
258+
err = ioutil.WriteFile(dstFile, srcBytes, 0644)
259+
require.NoError(t, err)
260+
261+
interval := time.Millisecond * 10
262+
store, err := NewStrategyStore(Options{
263+
StrategiesFile: dstFile,
264+
ReloadInterval: interval,
265+
}, zap.NewNop())
266+
require.NoError(t, err)
267+
defer store.(*strategyStore).StopUpdateStrategy()
268+
269+
s, err := store.GetSamplingStrategy("foo")
270+
require.NoError(t, err)
271+
assert.EqualValues(t, makeResponse(sampling.SamplingStrategyType_PROBABILISTIC, 0.8), *s)
272+
273+
// update file
274+
newStr := strings.Replace(string(srcBytes), "0.8", "0.9", 1)
275+
err = ioutil.WriteFile(dstFile, []byte(newStr), 0644)
276+
require.NoError(t, err)
277+
278+
// wait for reload
279+
time.Sleep(interval * 2)
280+
281+
// verity reloading
282+
s, err = store.GetSamplingStrategy("foo")
283+
require.NoError(t, err)
284+
assert.EqualValues(t, makeResponse(sampling.SamplingStrategyType_PROBABILISTIC, 0.9), *s)
285+
286+
// check bad file content
287+
_ = ioutil.WriteFile(dstFile, []byte("bad value"), 0644)
288+
time.Sleep(interval * 2)
289+
290+
// check file not exist
291+
os.Remove(dstFile)
292+
293+
// wait for delete and update failed
294+
time.Sleep(interval * 2)
295+
296+
}

0 commit comments

Comments
 (0)