Skip to content

Commit 4b44892

Browse files
committed
Add CLI option for collector to refresh sampling strategies
Signed-off-by: defool <defool@foxmail.com>
1 parent d75eb14 commit 4b44892

File tree

6 files changed

+129
-18
lines changed

6 files changed

+129
-18
lines changed

cmd/collector/app/sampling/strategystore/interface.go

+6
Original file line numberDiff line numberDiff line change
@@ -23,3 +23,9 @@ type StrategyStore interface {
2323
// GetSamplingStrategy retrieves the sampling strategy for the specified service.
2424
GetSamplingStrategy(serviceName string) (*sampling.SamplingStrategyResponse, error)
2525
}
26+
27+
// StrategyUpdater updates sampling strategies.
28+
type StrategyUpdater interface {
29+
//UpdateSamplingStrategy replaces the sampling strategy content.
30+
UpdateSamplingStrategy(bytes []byte) error
31+
}

plugin/sampling/strategystore/static/factory.go

+35-1
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,10 @@
1515
package static
1616

1717
import (
18+
"context"
1819
"flag"
20+
"io/ioutil"
21+
"time"
1922

2023
"github.com/spf13/viper"
2124
"github.com/uber/jaeger-lib/metrics"
@@ -56,5 +59,36 @@ func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger)
5659

5760
// CreateStrategyStore implements strategystore.Factory
5861
func (f *Factory) CreateStrategyStore() (strategystore.StrategyStore, error) {
59-
return NewStrategyStore(*f.options, f.logger)
62+
ss, err := NewStrategyStore(*f.options, f.logger)
63+
if err != nil {
64+
return ss, err
65+
}
66+
if s, ok := ss.(strategystore.StrategyUpdater); f.options.ReloadInterval > 0 && ok {
67+
go f.autoUpdateStrategy(context.Background(), s)
68+
}
69+
return ss, nil
70+
}
71+
72+
func (f *Factory) autoUpdateStrategy(ctx context.Context, s strategystore.StrategyUpdater) {
73+
lastString := ""
74+
interval, filePath := f.options.ReloadInterval, f.options.StrategiesFile
75+
ticker := time.NewTicker(interval)
76+
for {
77+
select {
78+
case <-ticker.C:
79+
if currBytes, err := ioutil.ReadFile(filePath); err == nil {
80+
currStr := string(currBytes)
81+
if lastString == currStr {
82+
continue
83+
}
84+
err := s.UpdateSamplingStrategy(currBytes)
85+
if err != nil {
86+
f.logger.Error("UpdateSamplingStrategy failed", zap.Error(err))
87+
}
88+
lastString = currStr
89+
}
90+
case <-ctx.Done():
91+
return
92+
}
93+
}
6094
}

plugin/sampling/strategystore/static/factory_test.go

+46
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,21 @@
1515
package static
1616

1717
import (
18+
"io/ioutil"
19+
"os"
20+
"strings"
1821
"testing"
22+
"time"
1923

2024
"github.com/stretchr/testify/assert"
25+
"github.com/stretchr/testify/require"
2126
"github.com/uber/jaeger-lib/metrics"
2227
"go.uber.org/zap"
2328

2429
ss "github.com/jaegertracing/jaeger/cmd/collector/app/sampling/strategystore"
2530
"github.com/jaegertracing/jaeger/pkg/config"
2631
"github.com/jaegertracing/jaeger/plugin"
32+
"github.com/jaegertracing/jaeger/thrift-gen/sampling"
2733
)
2834

2935
var _ ss.Factory = new(Factory)
@@ -39,3 +45,43 @@ func TestFactory(t *testing.T) {
3945
_, err := f.CreateStrategyStore()
4046
assert.NoError(t, err)
4147
}
48+
49+
func TestAutoReload(t *testing.T) {
50+
// copy from fixtures/strategies.json
51+
srcFile, dstFile := "fixtures/strategies.json", "fixtures/strategies_for_reload.json"
52+
srcBytes, err := ioutil.ReadFile(srcFile)
53+
require.NoError(t, err)
54+
err = ioutil.WriteFile(dstFile, srcBytes, 0644)
55+
require.NoError(t, err)
56+
57+
f := NewFactory()
58+
v, command := config.Viperize(f.AddFlags)
59+
_ = command.ParseFlags([]string{"--sampling.strategies-file=" + dstFile, "--sampling.strategies-reload-interval=50ms"})
60+
f.InitFromViper(v)
61+
62+
// Test reading strategies from a file
63+
//ctx, canf := context.WithCancel(context.Background())
64+
//defer canf()
65+
66+
store, err := f.CreateStrategyStore()
67+
require.NoError(t, err)
68+
69+
s, err := store.GetSamplingStrategy("foo")
70+
require.NoError(t, err)
71+
assert.EqualValues(t, makeResponse(sampling.SamplingStrategyType_PROBABILISTIC, 0.8), *s)
72+
73+
// update file
74+
newStr := strings.Replace(string(srcBytes), "0.8", "0.9", 1)
75+
err = ioutil.WriteFile(dstFile, []byte(newStr), 0644)
76+
require.NoError(t, err)
77+
78+
// wait for reload
79+
time.Sleep(time.Millisecond * 50 * 2)
80+
81+
// verity reloading
82+
s, err = store.GetSamplingStrategy("foo")
83+
require.NoError(t, err)
84+
assert.EqualValues(t, makeResponse(sampling.SamplingStrategyType_PROBABILISTIC, 0.9), *s)
85+
86+
os.Remove(dstFile)
87+
}

plugin/sampling/strategystore/static/options.go

+7-1
Original file line numberDiff line numberDiff line change
@@ -16,27 +16,33 @@ package static
1616

1717
import (
1818
"flag"
19+
"time"
1920

2021
"github.com/spf13/viper"
2122
)
2223

2324
const (
24-
samplingStrategiesFile = "sampling.strategies-file"
25+
samplingStrategiesFile = "sampling.strategies-file"
26+
samplingStrategiesReloadInterval = "sampling.strategies-reload-interval"
2527
)
2628

2729
// Options holds configuration for the static sampling strategy store.
2830
type Options struct {
2931
// StrategiesFile is the path for the sampling strategies file in JSON format
3032
StrategiesFile string
33+
// ReloadInterval is the time interval to check and reload sampling strategies file
34+
ReloadInterval time.Duration
3135
}
3236

3337
// AddFlags adds flags for Options
3438
func AddFlags(flagSet *flag.FlagSet) {
3539
flagSet.String(samplingStrategiesFile, "", "The path for the sampling strategies file in JSON format. See sampling documentation to see format of the file")
40+
flagSet.Duration(samplingStrategiesReloadInterval, 0, "Reload interval to check and reload sampling strategies file. Zero value means no checks (default 0s)")
3641
}
3742

3843
// InitFromViper initializes Options with properties from viper
3944
func (opts *Options) InitFromViper(v *viper.Viper) *Options {
4045
opts.StrategiesFile = v.GetString(samplingStrategiesFile)
46+
opts.ReloadInterval = v.GetDuration(samplingStrategiesReloadInterval)
4147
return opts
4248
}

plugin/sampling/strategystore/static/strategy_store.go

+34-14
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"encoding/json"
2121
"fmt"
2222
"io/ioutil"
23+
"sync/atomic"
2324

2425
"go.uber.org/zap"
2526

@@ -30,16 +31,16 @@ import (
3031
type strategyStore struct {
3132
logger *zap.Logger
3233

33-
defaultStrategy *sampling.SamplingStrategyResponse
34-
serviceStrategies map[string]*sampling.SamplingStrategyResponse
34+
defaultStrategy atomic.Value
35+
serviceStrategies atomic.Value
3536
}
3637

3738
// NewStrategyStore creates a strategy store that holds static sampling strategies.
3839
func NewStrategyStore(options Options, logger *zap.Logger) (ss.StrategyStore, error) {
3940
h := &strategyStore{
40-
logger: logger,
41-
serviceStrategies: make(map[string]*sampling.SamplingStrategyResponse),
41+
logger: logger,
4242
}
43+
h.serviceStrategies.Store(make(map[string]*sampling.SamplingStrategyResponse))
4344
strategies, err := loadStrategies(options.StrategiesFile)
4445
if err != nil {
4546
return nil, err
@@ -50,10 +51,25 @@ func NewStrategyStore(options Options, logger *zap.Logger) (ss.StrategyStore, er
5051

5152
// GetSamplingStrategy implements StrategyStore#GetSamplingStrategy.
5253
func (h *strategyStore) GetSamplingStrategy(serviceName string) (*sampling.SamplingStrategyResponse, error) {
53-
if strategy, ok := h.serviceStrategies[serviceName]; ok {
54+
serviceStrategies, ok := h.serviceStrategies.Load().(map[string]*sampling.SamplingStrategyResponse)
55+
if !ok {
56+
return nil, fmt.Errorf("wrong type of serviceStrategies")
57+
}
58+
if strategy, ok := serviceStrategies[serviceName]; ok {
5459
return strategy, nil
5560
}
56-
return h.defaultStrategy, nil
61+
return h.defaultStrategy.Load().(*sampling.SamplingStrategyResponse), nil
62+
}
63+
64+
// UpdateSamplingStrategy implements StrategyStore#UpdateSamplingStrategy.
65+
func (h *strategyStore) UpdateSamplingStrategy(bytes []byte) error {
66+
var strategies strategies
67+
if err := json.Unmarshal(bytes, &strategies); err != nil {
68+
return fmt.Errorf("failed to unmarshal strategies: %w", err)
69+
}
70+
h.parseStrategies(&strategies)
71+
h.logger.Info("Updated strategy:" + string(bytes))
72+
return nil
5773
}
5874

5975
// TODO good candidate for a global util function
@@ -73,40 +89,44 @@ func loadStrategies(strategiesFile string) (*strategies, error) {
7389
}
7490

7591
func (h *strategyStore) parseStrategies(strategies *strategies) {
76-
h.defaultStrategy = defaultStrategyResponse()
92+
defaultStrategy := defaultStrategyResponse()
93+
h.defaultStrategy.Store(defaultStrategy)
7794
if strategies == nil {
7895
h.logger.Info("No sampling strategies provided, using defaults")
7996
return
8097
}
8198
if strategies.DefaultStrategy != nil {
82-
h.defaultStrategy = h.parseServiceStrategies(strategies.DefaultStrategy)
99+
defaultStrategy = h.parseServiceStrategies(strategies.DefaultStrategy)
83100
}
84101

85102
merge := true
86-
if h.defaultStrategy.OperationSampling == nil ||
87-
h.defaultStrategy.OperationSampling.PerOperationStrategies == nil {
103+
if defaultStrategy.OperationSampling == nil ||
104+
defaultStrategy.OperationSampling.PerOperationStrategies == nil {
88105
merge = false
89106
}
90107

108+
serviceStrategies := make(map[string]*sampling.SamplingStrategyResponse)
91109
for _, s := range strategies.ServiceStrategies {
92-
h.serviceStrategies[s.Service] = h.parseServiceStrategies(s)
110+
serviceStrategies[s.Service] = h.parseServiceStrategies(s)
93111

94112
// Merge with the default operation strategies, because only merging with
95113
// the default strategy has no effect on service strategies (the default strategy
96114
// is not merged with and only used as a fallback).
97-
opS := h.serviceStrategies[s.Service].OperationSampling
115+
opS := serviceStrategies[s.Service].OperationSampling
98116
if opS == nil {
99117
// Service has no per-operation strategies, so just reference the default settings.
100-
h.serviceStrategies[s.Service].OperationSampling = h.defaultStrategy.OperationSampling
118+
serviceStrategies[s.Service].OperationSampling = defaultStrategy.OperationSampling
101119
continue
102120
}
103121

104122
if merge {
105123
opS.PerOperationStrategies = mergePerOperationSamplingStrategies(
106124
opS.PerOperationStrategies,
107-
h.defaultStrategy.OperationSampling.PerOperationStrategies)
125+
defaultStrategy.OperationSampling.PerOperationStrategies)
108126
}
109127
}
128+
h.defaultStrategy.Store(defaultStrategy)
129+
h.serviceStrategies.Store(serviceStrategies)
110130
}
111131

112132
// mergePerOperationStrategies merges two operation strategies a and b, where a takes precedence over b.

plugin/sampling/strategystore/static/strategy_store_test.go

+1-2
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
package static
1616

1717
import (
18-
"fmt"
1918
"testing"
2019

2120
"github.com/stretchr/testify/assert"
@@ -79,7 +78,7 @@ func TestPerOperationSamplingStrategies(t *testing.T) {
7978
os := s.OperationSampling
8079
assert.EqualValues(t, os.DefaultSamplingProbability, 0.8)
8180
require.Len(t, os.PerOperationStrategies, 4)
82-
fmt.Println(os)
81+
//fmt.Println(os)
8382
assert.Equal(t, "op6", os.PerOperationStrategies[0].Operation)
8483
assert.EqualValues(t, 0.5, os.PerOperationStrategies[0].ProbabilisticSampling.SamplingRate)
8584
assert.Equal(t, "op1", os.PerOperationStrategies[1].Operation)

0 commit comments

Comments
 (0)