Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for dynamically reloading sampling strategies #2040

Closed
wants to merge 16 commits into from
Closed
52 changes: 51 additions & 1 deletion plugin/sampling/strategystore/static/strategy_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ import (
"encoding/json"
"fmt"
"io/ioutil"
"sync"

"github.com/fsnotify/fsnotify"
"github.com/pkg/errors"
"go.uber.org/zap"

Expand All @@ -29,6 +31,9 @@ import (
)

type strategyStore struct {
// to allow concurrent update of sampling strategies
lock sync.RWMutex

logger *zap.Logger

defaultStrategy *sampling.SamplingStrategyResponse
Expand All @@ -41,19 +46,62 @@ func NewStrategyStore(options Options, logger *zap.Logger) (ss.StrategyStore, er
logger: logger,
serviceStrategies: make(map[string]*sampling.SamplingStrategyResponse),
}

// Read strategies
strategies, err := loadStrategies(options.StrategiesFile)
if err != nil {
return nil, err
}
h.parseStrategies(strategies)

// Watch strategies file for changes.
watcher, err := fsnotify.NewWatcher()
if err != nil {
logger.Error("failed to create a new watcher for the sampling strategies file", zap.Error(err))
}

go func() {
for {
select {
case event := <-watcher.Events:
if event.Op&fsnotify.Remove == fsnotify.Remove {
logger.Warn("the sampling strategies file has been removed, using the last known version")
continue
}

s, err := loadStrategies(options.StrategiesFile)
if err != nil {
logger.Warn("Error while parsing strategies file", zap.Error(err))
} else {
h.parseStrategies(s)
}
case err, ok := <-watcher.Errors:
if !ok {
return
}
logger.Error("event", zap.Error(err))
}
}
}()

err = watcher.Add(options.StrategiesFile)
if err != nil {
logger.Error("error adding watcher to file", zap.String("file", options.StrategiesFile), zap.Error(err))
} else {
logger.Info("watching", zap.String("file", options.StrategiesFile))
}

return h, nil
}

// GetSamplingStrategy implements StrategyStore#GetSamplingStrategy.
func (h *strategyStore) GetSamplingStrategy(serviceName string) (*sampling.SamplingStrategyResponse, error) {
h.lock.RLock()
defer h.lock.RUnlock()
if strategy, ok := h.serviceStrategies[serviceName]; ok {
return strategy, nil
}

return h.defaultStrategy, nil
}

Expand All @@ -74,7 +122,9 @@ func loadStrategies(strategiesFile string) (*strategies, error) {
}

func (h *strategyStore) parseStrategies(strategies *strategies) {
h.defaultStrategy = defaultStrategyResponse()
h.lock.Lock()
defer h.lock.Unlock()

if strategies == nil {
h.logger.Info("No sampling strategies provided, using defaults")
return
Expand Down