-
Notifications
You must be signed in to change notification settings - Fork 2.8k
Add URL option for sampling strategies file #2519
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from 7 commits
Commits
Show all changes
9 commits
Select commit
Hold shift + click to select a range
270e96d
Add url option for sampling strategies
goku321 b50abef
Update NewStrategyStore constructor
goku321 964129a
Merge remote-tracking branch 'upstream/master'
goku321 f517c75
Refactor strategy store tests
goku321 2593ee2
Use strings.Replace to update strategies
goku321 535b4b7
Change strategiesJSON to a function
goku321 8e1719f
Merge remote-tracking branch 'upstream/master'
goku321 6673bdf
Fix linter failures
goku321 c95250b
Merge remote-tracking branch 'upstream/master'
goku321 File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -21,6 +21,8 @@ import ( | |
| "encoding/json" | ||
| "fmt" | ||
| "io/ioutil" | ||
| "net/http" | ||
| "net/url" | ||
| "path/filepath" | ||
| "sync/atomic" | ||
| "time" | ||
|
|
@@ -31,6 +33,10 @@ import ( | |
| "github.com/jaegertracing/jaeger/thrift-gen/sampling" | ||
| ) | ||
|
|
||
| // null represents "null" JSON value and | ||
| // it un-marshals to nil pointer. | ||
| var nullJSON = []byte("null") | ||
|
|
||
| type strategyStore struct { | ||
| logger *zap.Logger | ||
|
|
||
|
|
@@ -45,6 +51,8 @@ type storedStrategies struct { | |
| serviceStrategies map[string]*sampling.SamplingStrategyResponse | ||
| } | ||
|
|
||
| type strategyLoader func() ([]byte, error) | ||
|
|
||
| // NewStrategyStore creates a strategy store that holds static sampling strategies. | ||
| func NewStrategyStore(options Options, logger *zap.Logger) (ss.StrategyStore, error) { | ||
| ctx, cancelFunc := context.WithCancel(context.Background()) | ||
|
|
@@ -55,14 +63,20 @@ func NewStrategyStore(options Options, logger *zap.Logger) (ss.StrategyStore, er | |
| } | ||
| h.storedStrategies.Store(defaultStrategies()) | ||
|
|
||
| strategies, err := loadStrategies(options.StrategiesFile) | ||
| if options.StrategiesFile == "" { | ||
| h.parseStrategies(nil) | ||
| return h, nil | ||
| } | ||
|
|
||
| loadFn := samplingStrategyLoader(options.StrategiesFile) | ||
| strategies, err := loadStrategies(loadFn) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
| h.parseStrategies(strategies) | ||
|
|
||
| if options.ReloadInterval > 0 { | ||
| go h.autoUpdateStrategies(options.ReloadInterval, options.StrategiesFile) | ||
| go h.autoUpdateStrategies(options.ReloadInterval, loadFn) | ||
| } | ||
| return h, nil | ||
| } | ||
|
|
@@ -83,35 +97,81 @@ func (h *strategyStore) Close() { | |
| h.cancelFunc() | ||
| } | ||
|
|
||
| func (h *strategyStore) autoUpdateStrategies(interval time.Duration, filePath string) { | ||
| lastValue := "" | ||
| func downloadSamplingStrategies(url string) ([]byte, error) { | ||
| resp, err := http.Get(url) | ||
| if err != nil { | ||
| return nil, fmt.Errorf("failed to download sampling strategies: %w", err) | ||
| } | ||
|
|
||
| defer resp.Body.Close() | ||
| buf := new(bytes.Buffer) | ||
| if _, err = buf.ReadFrom(resp.Body); err != nil { | ||
| return nil, fmt.Errorf("failed to read sampling strategies HTTP response body: %w", err) | ||
| } | ||
|
|
||
| if resp.StatusCode == http.StatusServiceUnavailable { | ||
| return nullJSON, nil | ||
| } | ||
| if resp.StatusCode != http.StatusOK { | ||
| return nil, fmt.Errorf( | ||
| "receiving %s while downloading strategies file: %s", | ||
| resp.Status, | ||
| buf.String(), | ||
| ) | ||
| } | ||
|
|
||
| return buf.Bytes(), nil | ||
| } | ||
|
|
||
| func isURL(str string) bool { | ||
| u, err := url.Parse(str) | ||
| return err == nil && u.Scheme != "" && u.Host != "" | ||
| } | ||
|
|
||
| func samplingStrategyLoader(strategiesFile string) strategyLoader { | ||
| if isURL(strategiesFile) { | ||
| return func() ([]byte, error) { | ||
| return downloadSamplingStrategies(strategiesFile) | ||
| } | ||
| } | ||
|
|
||
| return func() ([]byte, error) { | ||
| currBytes, err := ioutil.ReadFile(filepath.Clean(strategiesFile)) | ||
| if err != nil { | ||
| return nil, fmt.Errorf("failed to read strategies file %s: %w", strategiesFile, err) | ||
| } | ||
| return currBytes, nil | ||
| } | ||
| } | ||
|
|
||
| func (h *strategyStore) autoUpdateStrategies(interval time.Duration, loader strategyLoader) { | ||
| lastValue := string(nullJSON) | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Technically, we could have already loaded something before calling this function, so lastValue would be different. But since it's a periodic check anyway, I think it's fine. |
||
| ticker := time.NewTicker(interval) | ||
| defer ticker.Stop() | ||
| for { | ||
| select { | ||
| case <-ticker.C: | ||
| lastValue = h.reloadSamplingStrategyFile(filePath, lastValue) | ||
| lastValue = h.reloadSamplingStrategy(loader, lastValue) | ||
| case <-h.ctx.Done(): | ||
| return | ||
| } | ||
| } | ||
| } | ||
|
|
||
| func (h *strategyStore) reloadSamplingStrategyFile(filePath string, lastValue string) string { | ||
| currBytes, err := ioutil.ReadFile(filepath.Clean(filePath)) | ||
| func (h *strategyStore) reloadSamplingStrategy(loadFn strategyLoader, lastValue string) string { | ||
| newValue, err := loadFn() | ||
| if err != nil { | ||
| h.logger.Error("failed to load sampling strategies", zap.String("file", filePath), zap.Error(err)) | ||
| h.logger.Error("failed to re-load sampling strategies", zap.Error(err)) | ||
| return lastValue | ||
| } | ||
| newValue := string(currBytes) | ||
| if lastValue == newValue { | ||
| if lastValue == string(newValue) { | ||
| return lastValue | ||
| } | ||
| if err = h.updateSamplingStrategy(currBytes); err != nil { | ||
| h.logger.Error("failed to update sampling strategies from file", zap.Error(err)) | ||
| if err := h.updateSamplingStrategy(newValue); err != nil { | ||
| h.logger.Error("failed to update sampling strategies", zap.Error(err)) | ||
| return lastValue | ||
| } | ||
| return newValue | ||
| return string(newValue) | ||
| } | ||
|
|
||
| func (h *strategyStore) updateSamplingStrategy(bytes []byte) error { | ||
|
|
@@ -125,24 +185,22 @@ func (h *strategyStore) updateSamplingStrategy(bytes []byte) error { | |
| } | ||
|
|
||
| // TODO good candidate for a global util function | ||
| func loadStrategies(strategiesFile string) (*strategies, error) { | ||
| if strategiesFile == "" { | ||
| return nil, nil | ||
| } | ||
| data, err := ioutil.ReadFile(strategiesFile) /* nolint #nosec , this comes from an admin, not user */ | ||
| func loadStrategies(loadFn strategyLoader) (*strategies, error) { | ||
| strategyBytes, err := loadFn() | ||
| if err != nil { | ||
| return nil, fmt.Errorf("failed to open strategies file: %w", err) | ||
| return nil, err | ||
| } | ||
| var strategies strategies | ||
| if err := json.Unmarshal(data, &strategies); err != nil { | ||
|
|
||
| var strategies *strategies | ||
| if err := json.Unmarshal(strategyBytes, &strategies); err != nil { | ||
| return nil, fmt.Errorf("failed to unmarshal strategies: %w", err) | ||
| } | ||
| return &strategies, nil | ||
| return strategies, nil | ||
| } | ||
|
|
||
| func (h *strategyStore) parseStrategies(strategies *strategies) { | ||
| if strategies == nil { | ||
| h.logger.Info("No sampling strategies provided, using defaults") | ||
| h.logger.Info("No sampling strategies provided or URL is unavailable, using defaults") | ||
| return | ||
| } | ||
| newStore := defaultStrategies() | ||
|
|
||
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.