Add support for dynamically reloading sampling strategies #2040

Closed
wants to merge 16 commits into from
Changes from 15 commits
@@ -0,0 +1,11 @@
{
"default_strategy": {
},
"service_strategies": [
{
"service": "foo",
"type": "probabilistic",
"param": 0.5
}
]
}
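As a minimal sketch of how a fixture shaped like the one above could be decoded, the following uses only `encoding/json`. The type names `serviceStrategy` and `strategiesFile` and the helper `parseFixture` are hypothetical and chosen for illustration; only the JSON keys come from the fixture, and the PR's own `strategies` type may well be defined differently.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// serviceStrategy and strategiesFile are hypothetical names for this sketch;
// only the JSON keys are taken from the fixture.
type serviceStrategy struct {
	Service string  `json:"service"`
	Type    string  `json:"type"`
	Param   float64 `json:"param"`
}

type strategiesFile struct {
	DefaultStrategy   *serviceStrategy  `json:"default_strategy"`
	ServiceStrategies []serviceStrategy `json:"service_strategies"`
}

// fixture repeats the JSON document from the diff.
const fixture = `{
  "default_strategy": {},
  "service_strategies": [
    {"service": "foo", "type": "probabilistic", "param": 0.5}
  ]
}`

// parseFixture decodes a strategies document into the sketch types.
func parseFixture(data []byte) (*strategiesFile, error) {
	var s strategiesFile
	if err := json.Unmarshal(data, &s); err != nil {
		return nil, err
	}
	return &s, nil
}

func main() {
	s, err := parseFixture([]byte(fixture))
	if err != nil {
		fmt.Println("parse error:", err)
		return
	}
	for _, svc := range s.ServiceStrategies {
		fmt.Printf("%s: %s %.2f\n", svc.Service, svc.Type, svc.Param)
	}
}
```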
84 changes: 81 additions & 3 deletions plugin/sampling/strategystore/static/strategy_store.go
@@ -20,7 +20,10 @@ import (
"encoding/json"
"fmt"
"io/ioutil"
"path/filepath"
"sync"

"github.com/fsnotify/fsnotify"
"github.com/pkg/errors"
"go.uber.org/zap"

@@ -29,31 +32,102 @@ import (
)

type strategyStore struct {
// to allow concurrent update of sampling strategies
lock sync.RWMutex

logger *zap.Logger

defaultStrategy *sampling.SamplingStrategyResponse
serviceStrategies map[string]*sampling.SamplingStrategyResponse
}

func (h *strategyStore) loadAndParseStrategies(strategiesFile string) error {
s, err := loadStrategies(strategiesFile)
if err != nil {
h.logger.Warn("using the last saved configuration for sampling strategies.", zap.Error(err))
Member

Suggested change
- h.logger.Warn("using the last saved configuration for sampling strategies.", zap.Error(err))
+ h.logger.Warn("Unable to reload the sampling strategies file. Using the last saved configuration.", zap.Error(err), zap.String("file", strategiesFile))

return err
}

h.parseStrategies(s)
return nil
}

// NewStrategyStore creates a strategy store that holds static sampling strategies.
func NewStrategyStore(options Options, logger *zap.Logger) (ss.StrategyStore, error) {
h := &strategyStore{
logger: logger,
serviceStrategies: make(map[string]*sampling.SamplingStrategyResponse),
}

// Read strategies
if err := h.loadAndParseStrategies(options.StrategiesFile); err != nil {
return nil, err
}

// Watch strategies file for changes.
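watcher, err := fsnotify.NewWatcher()
if err != nil {
logger.Error("failed to create a new watcher for the sampling strategies file", zap.Error(err))
// Without a watcher, hot reload is unavailable and using the nil watcher
// below would panic; keep serving the strategies loaded above.
return h, nil
}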

go h.runWatcherLoop(watcher, options.StrategiesFile)

if err := watcher.Add(options.StrategiesFile); err != nil {
logger.Error("error adding watcher to file", zap.String("file", options.StrategiesFile), zap.Error(err))
} else {
logger.Info("watching", zap.String("file", options.StrategiesFile))
}

dir := filepath.Dir(options.StrategiesFile)
Member

why do we need to watch the dir?

Member

?
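One possible answer to the question about the directory watch, offered as a sketch rather than as the author's stated reasoning: when a config file is published through a symlink that gets swapped atomically (the layout a Contributor describes further down for Kubernetes config maps), the stable file path never changes even though its target does, so the swap happens in the parent directory. The example below reproduces that layout with the standard library only. The `..data` link name follows the layout Kubernetes is commonly described as using; `v1`, `v2`, and the helper names are hypothetical.

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// writeVersion creates <root>/<version>/strategies.json with the given body.
func writeVersion(root, version, body string) error {
	dir := filepath.Join(root, version)
	if err := os.Mkdir(dir, 0o755); err != nil {
		return err
	}
	return os.WriteFile(filepath.Join(dir, "strategies.json"), []byte(body), 0o644)
}

// pointDataAt repoints <root>/..data at a version directory by creating a
// temporary symlink and renaming it over the old one in a single step.
func pointDataAt(root, version string) error {
	tmp := filepath.Join(root, "..data_tmp")
	if err := os.Symlink(version, tmp); err != nil {
		return err
	}
	return os.Rename(tmp, filepath.Join(root, "..data"))
}

// readAcrossSwap returns what the stable path yields before and after the swap.
func readAcrossSwap() (string, string, error) {
	root, err := os.MkdirTemp("", "configmap-sketch")
	if err != nil {
		return "", "", err
	}
	defer os.RemoveAll(root)

	if err := writeVersion(root, "v1", "rate=0.5"); err != nil {
		return "", "", err
	}
	if err := writeVersion(root, "v2", "rate=0.8"); err != nil {
		return "", "", err
	}
	if err := pointDataAt(root, "v1"); err != nil {
		return "", "", err
	}
	// The path an application opens is itself a symlink through ..data.
	stable := filepath.Join(root, "strategies.json")
	if err := os.Symlink(filepath.Join("..data", "strategies.json"), stable); err != nil {
		return "", "", err
	}

	before, err := os.ReadFile(stable)
	if err != nil {
		return "", "", err
	}
	if err := pointDataAt(root, "v2"); err != nil {
		return "", "", err
	}
	after, err := os.ReadFile(stable)
	if err != nil {
		return "", "", err
	}
	return string(before), string(after), nil
}

func main() {
	before, after, err := readAcrossSwap()
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(before, "->", after)
}
```

The file at the stable path is never written in place here; only a directory entry is replaced, which is why a watch on the file alone may not see an ordinary write event.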

if err := watcher.Add(dir); err != nil {
h.logger.Error("error adding watcher to dir", zap.String("dir", dir), zap.Error(err))
} else {
h.logger.Info("watching", zap.String("dir", dir))
}

return h, nil
}

func (h *strategyStore) runWatcherLoop(watcher *fsnotify.Watcher, strategiesFile string) {
for {
select {
case event := <-watcher.Events:
Member

Could we not simplify this by handling all events? E.g.

case event := <-watcher.Events:
    if event.Op == fsnotify.Remove {
        // resubscribe to handle k8s use case
    }
    h.loadAndParseStrategies(strategiesFile)
    continue

Member Author

The current logic is actually very close to this:

  1. Remove events can occur in a non-k8s environment, in which case we only need to handle write events, hence the following block:
if event.Op&fsnotify.Write == fsnotify.Write {
	h.loadAndParseStrategies(strategiesFile)
}
  2. If they occur in a k8s environment, we need to reload the strategies file, hence the first part.

Contributor

Just a note: the second case isn't specific to Kubernetes, but it's how Kubernetes mounts those files in the local file system (with two symlinks at different levels). The same would happen on non-Kubernetes environments as well, if people use symlinks on directories pointing to the "current" version of a config map.

Member

My point is that you want special handling for Remove by re-subscribing, and all events should result in parsing the file. Right now the code ignores several events altogether.

Member

> Right now the code ignores several events altogether.

?

if event.Op&fsnotify.Remove == fsnotify.Remove {
Member

Why are these treated as bitmasks? The Op constants are defined as ordinary numbers (https://godoc.org/github.com/fsnotify/fsnotify#Op).

Member

?
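On the bitmask question: as far as I can tell, fsnotify declares its `Op` constants with `1 << iota`, so each one is a distinct bit and a single event may carry several of them at once. The sketch below redeclares an `Op` type locally, mirroring that declaration, so it runs without the dependency; the `has` helper is hypothetical. Newer fsnotify releases appear to offer a `Has` helper on events for the same check, which is worth confirming against the version in use.

```go
package main

import "fmt"

// Op mirrors how fsnotify declares its operation flags; it is redeclared
// here only so the sketch is self-contained.
type Op uint32

const (
	Create Op = 1 << iota
	Write
	Remove
	Rename
	Chmod
)

// has reports whether every bit of flag is set in op.
func has(op, flag Op) bool {
	return op&flag == flag
}

func main() {
	// A single event can report more than one operation.
	combined := Write | Chmod

	fmt.Println(combined == Write)    // false: plain equality misses combined events
	fmt.Println(has(combined, Write)) // true: the mask test still sees the write
}
```

Under that assumption, the mask comparison in the diff is the safer form, since a plain `==` would skip events that combine a write with another operation.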

if event.Name == strategiesFile {
Member

Given that we're watching just a single file, when would the name not be equal?

h.logger.Warn("the sampling strategies file has been removed")

// This is a workaround for k8s configmaps. Since k8s loads configmaps as
// symlinked files within the containers, changes to the configmap register
// as `fsnotify.Remove` events.
if err := watcher.Add(strategiesFile); err != nil {
Member

Isn't there a race condition here? If the file was removed, even temporarily, and you call watcher.Add() on non-existing file, will it not error out immediately?

Member Author

So this line is specifically added for the k8s configmap scenario. It is added to handle successive reloads without restarting the collector (without this line, the collector reloads the strategies only the first time the file was changed). I've verified that it works.

However, if the config file was manually removed, yes it will error out immediately, but that works for us.
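If the race raised above turns out to matter in practice, one option (not something the PR does) is to retry the re-subscription a few times before giving up. The sketch below is a generic retry helper; `retry`, `flakyAdd`, and `retryDelay` are hypothetical names, and `flakyAdd` merely stands in for a call such as `watcher.Add(strategiesFile)` that fails while the file is briefly missing.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// retryDelay is an illustrative pause between attempts.
const retryDelay = 10 * time.Millisecond

// retry calls fn up to attempts times, sleeping delay between failures, and
// returns the last error if no call succeeds. With attempts < 1 it does
// nothing and returns nil.
func retry(attempts int, delay time.Duration, fn func() error) error {
	var err error
	for i := 0; i < attempts; i++ {
		if err = fn(); err == nil {
			return nil
		}
		if i < attempts-1 {
			time.Sleep(delay)
		}
	}
	return err
}

// flakyAdd returns a function that fails the first `failures` times it is
// called, plus a pointer to the number of calls made so far.
func flakyAdd(failures int) (func() error, *int) {
	calls := 0
	return func() error {
		calls++
		if calls <= failures {
			return errors.New("no such file or directory")
		}
		return nil
	}, &calls
}

func main() {
	add, calls := flakyAdd(2)
	err := retry(5, retryDelay, add)
	fmt.Println("error:", err, "calls:", *calls)
}
```

Whether retrying is preferable to failing immediately depends on how long the file can be absent during an update, which the thread does not settle.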

h.logger.Warn("cannot watch sampling strategy config file", zap.Error(err))
}

h.loadAndParseStrategies(strategiesFile)
}
continue
}
if event.Op&fsnotify.Write == fsnotify.Write {
Member

what about Create event? I think it would've addressed my question above.

h.loadAndParseStrategies(strategiesFile)
}
case err, ok := <-watcher.Errors:
if !ok {
return
}
h.logger.Error("file watcher error", zap.Error(err))
}
}
}

// GetSamplingStrategy implements StrategyStore#GetSamplingStrategy.
func (h *strategyStore) GetSamplingStrategy(serviceName string) (*sampling.SamplingStrategyResponse, error) {
h.lock.RLock()
defer h.lock.RUnlock()
if strategy, ok := h.serviceStrategies[serviceName]; ok {
return strategy, nil
}

return h.defaultStrategy, nil
}
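The locking used by `GetSamplingStrategy` and `parseStrategies` can be illustrated in isolation. The sketch below is a simplified stand-in, not the PR's code: the `store` type holds plain `float64` rates instead of `SamplingStrategyResponse` values, the rate values are illustrative, and `replace` swaps in a map that was fully built before the write lock is taken, which is one way to keep the critical section short.

```go
package main

import (
	"fmt"
	"sync"
)

// store is a simplified, hypothetical stand-in for strategyStore.
type store struct {
	lock        sync.RWMutex
	defaultRate float64
	rates       map[string]float64
}

// get returns the per-service rate, falling back to the default.
func (s *store) get(service string) float64 {
	s.lock.RLock()
	defer s.lock.RUnlock()
	if r, ok := s.rates[service]; ok {
		return r
	}
	return s.defaultRate
}

// replace installs a fully built configuration under the write lock, so
// readers see either the old state or the new one, never a mix.
func (s *store) replace(defaultRate float64, rates map[string]float64) {
	s.lock.Lock()
	defer s.lock.Unlock()
	s.defaultRate = defaultRate
	s.rates = rates
}

func main() {
	s := &store{defaultRate: 0.001, rates: map[string]float64{"foo": 0.5}}

	// Readers run concurrently with the reload below.
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := 0; j < 1000; j++ {
				_ = s.get("foo")
			}
		}()
	}

	s.replace(0.001, map[string]float64{"foo": 0.8})
	wg.Wait()

	fmt.Println(s.get("foo"), s.get("bar")) // 0.8 0.001
}
```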

@@ -74,7 +148,11 @@ func loadStrategies(strategiesFile string) (*strategies, error) {
}

func (h *strategyStore) parseStrategies(strategies *strategies) {
h.lock.Lock()
defer h.lock.Unlock()

h.defaultStrategy = defaultStrategyResponse()

if strategies == nil {
h.logger.Info("No sampling strategies provided, using defaults")
return
51 changes: 51 additions & 0 deletions plugin/sampling/strategystore/static/strategy_store_test.go
@@ -16,7 +16,10 @@ package static

import (
"fmt"
"io/ioutil"
"os"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@@ -218,6 +221,54 @@ func TestParseStrategy(t *testing.T) {
assert.Contains(t, buf.String(), "Failed to parse sampling strategy")
}

func TestHotReloadSamplingStrategiesTempFile(t *testing.T) {
tmpfile, err := ioutil.TempFile("fixtures", "strategies-hot-reload.*.json")
assert.NoError(t, err)

tmpFileName := tmpfile.Name()
defer os.Remove(tmpFileName)

content, err := ioutil.ReadFile("fixtures/strategies-hot-reload.json")
assert.NoError(t, err)

err = ioutil.WriteFile(tmpFileName, content, 0644)
assert.NoError(t, err)

logger, _ := testutils.NewLogger()
store, err := NewStrategyStore(Options{StrategiesFile: tmpFileName}, logger)
assert.NoError(t, err)

s, err := store.GetSamplingStrategy("foo")
require.NoError(t, err)
assert.EqualValues(t, makeResponse(sampling.SamplingStrategyType_PROBABILISTIC, 0.5), *s)

newContent, err := ioutil.ReadFile("fixtures/strategies.json")
assert.NoError(t, err)
err = ioutil.WriteFile(tmpFileName, newContent, 0644)
assert.NoError(t, err)

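done := make(chan bool, 1)
go func() {
for {
s, err := store.GetSamplingStrategy("foo")
// require.* calls t.FailNow, which must only run on the test goroutine; use assert here.
if !assert.NoError(t, err) {
return
}
if s.GetProbabilisticSampling().SamplingRate == 0.8 {
done <- true
// Stop polling once the reload is observed so the goroutine does not leak.
return
}
time.Sleep(10 * time.Millisecond)
}
}()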

select {
case <-done:
s, err := store.GetSamplingStrategy("foo")
require.NoError(t, err)
assert.EqualValues(t, makeResponse(sampling.SamplingStrategyType_PROBABILISTIC, 0.8), *s)
case <-time.After(time.Second):
assert.Fail(t, "timed out waiting for the hot reload to kick in")
}
}
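The wait-for-reload pattern in this test can also be written as a small polling helper that runs on the calling goroutine, which avoids the extra goroutine and channel. The sketch below uses only the standard library; `eventually`, `countTo`, `pollTimeout`, and `pollTick` are hypothetical names. Recent testify versions also ship an `assert.Eventually` helper with a similar shape, which may be the simpler choice where it is available.

```go
package main

import (
	"fmt"
	"time"
)

// Illustrative timings: give up after one second, check every 10ms.
const (
	pollTimeout = time.Second
	pollTick    = 10 * time.Millisecond
)

// eventually polls cond every tick until it returns true or the timeout
// elapses, and reports whether cond became true in time.
func eventually(cond func() bool, timeout, tick time.Duration) bool {
	deadline := time.Now().Add(timeout)
	for {
		if cond() {
			return true
		}
		if time.Now().After(deadline) {
			return false
		}
		time.Sleep(tick)
	}
}

// countTo returns a condition that becomes true on its n-th call; it stands
// in for "the store now reports the reloaded sampling rate".
func countTo(n int) func() bool {
	calls := 0
	return func() bool {
		calls++
		return calls >= n
	}
}

func main() {
	fmt.Println(eventually(countTo(3), pollTimeout, pollTick))
}
```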

func makeResponse(samplerType sampling.SamplingStrategyType, param float64) (resp sampling.SamplingStrategyResponse) {
resp.StrategyType = samplerType
if samplerType == sampling.SamplingStrategyType_PROBABILISTIC {