-
Notifications
You must be signed in to change notification settings - Fork 692
Feature: Distributor usage trackers #4162
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 5 commits
c17aa36
dd1c660
f631f66
9841b90
86a64d6
bbae991
b972253
4488a7b
40a3845
f35267e
83856bd
73f55a0
5ed9bb8
33fa586
40a0964
7e7e949
9c670ec
c7c2001
84225bf
61608f8
4f53dde
ba69c98
300f2db
95660e4
0941a04
4042829
eed21b5
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -5,6 +5,7 @@ import ( | |
| "encoding/hex" | ||
| "fmt" | ||
| "math" | ||
| "net/http" | ||
| "sync" | ||
| "time" | ||
|
|
||
|
|
@@ -28,6 +29,7 @@ import ( | |
|
|
||
| "github.com/grafana/tempo/modules/distributor/forwarder" | ||
| "github.com/grafana/tempo/modules/distributor/receiver" | ||
| "github.com/grafana/tempo/modules/distributor/usage" | ||
| generator_client "github.com/grafana/tempo/modules/generator/client" | ||
| ingester_client "github.com/grafana/tempo/modules/ingester/client" | ||
| "github.com/grafana/tempo/modules/overrides" | ||
|
|
@@ -154,6 +156,8 @@ type Distributor struct { | |
| subservices *services.Manager | ||
| subservicesWatcher *services.FailureWatcher | ||
|
|
||
| usage *usage.Tracker | ||
|
|
||
| logger log.Logger | ||
| } | ||
|
|
||
|
|
@@ -214,6 +218,14 @@ func New(cfg Config, clientCfg ingester_client.Config, ingestersRing ring.ReadRi | |
| logger: logger, | ||
| } | ||
|
|
||
| if cfg.Usage.Enabled { | ||
| usage, err := usage.NewTracker(cfg.Usage, "cost-attribution", o.CostAttributionDimensions) | ||
|
joe-elliott marked this conversation as resolved.
Outdated
|
||
| if err != nil { | ||
| return nil, fmt.Errorf("creating usage tracker: %w", err) | ||
| } | ||
| d.usage = usage | ||
| } | ||
|
|
||
| var generatorsPoolFactory ring_client.PoolAddrFunc = func(addr string) (ring_client.PoolClient, error) { | ||
| return generator_client.New(addr, generatorClientCfg) | ||
| } | ||
|
|
@@ -328,6 +340,7 @@ func (d *Distributor) PushTraces(ctx context.Context, traces ptrace.Traces) (*te | |
| return &tempopb.PushResponse{}, nil | ||
| } | ||
| // check limits | ||
| // todo - usage tracker include discarded bytes? | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. i believe the answer is no? we don't want it to included discarded?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. To clarify this TODO is about creating separate metrics for discards. I think for now let's proceed without it. We can add it later if needed. |
||
| err = d.checkForRateLimits(size, spanCount, userID) | ||
| if err != nil { | ||
| return nil, err | ||
|
|
@@ -360,6 +373,11 @@ func (d *Distributor) PushTraces(ctx context.Context, traces ptrace.Traces) (*te | |
| statBytesReceived.Inc(int64(size)) | ||
| statSpansReceived.Inc(int64(spanCount)) | ||
|
|
||
| // Usage tracking | ||
| if d.usage != nil { | ||
| d.usage.Observe(userID, batches) | ||
| } | ||
|
|
||
| keys, rebatchedTraces, err := requestsByTraceID(batches, userID, spanCount) | ||
| if err != nil { | ||
| overrides.RecordDiscardedSpans(spanCount, reasonInternalError, userID) | ||
|
|
@@ -498,6 +516,14 @@ func (*Distributor) Check(_ context.Context, _ *grpc_health_v1.HealthCheckReques | |
| return &grpc_health_v1.HealthCheckResponse{Status: grpc_health_v1.HealthCheckResponse_SERVING}, nil | ||
| } | ||
|
|
||
| func (d *Distributor) UsageTrackerHandler() http.Handler { | ||
| if d.usage != nil { | ||
| return d.usage.Handler() | ||
| } | ||
|
|
||
| return nil | ||
| } | ||
|
|
||
| // requestsByTraceID takes an incoming tempodb.PushRequest and creates a set of keys for the hash ring | ||
| // and traces to pass onto the ingesters. | ||
| func requestsByTraceID(batches []*v1.ResourceSpans, userID string, spanCount int) ([]uint32, []*rebatchedTrace, error) { | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,35 @@ | ||
| package usage | ||
|
|
||
| import ( | ||
| "flag" | ||
| "time" | ||
| ) | ||
|
|
||
| const ( | ||
| defaultMaxCardinality = 1000 | ||
| defaultStaleDuration = 15 * time.Minute | ||
| defaultPurgePeriod = time.Minute | ||
| ) | ||
|
|
||
| type Config struct { | ||
| Enabled bool `yaml:"enabled,omitempty" json:"enabled,omitempty"` | ||
| MaxCardinality uint `yaml:"max_cardinality,omitempty" json:"max_cardinality,omitempty"` | ||
| StaleDuration time.Duration `yaml:"stale_duration,omitempty" json:"stale_duration,omitempty"` | ||
| PurgePeriod time.Duration `yaml:"purge_period,omitempty" json:"purge_period,omitempty"` | ||
| } | ||
|
|
||
| func (c *Config) RegisterFlagsAndApplyDefaults(_ string, _ *flag.FlagSet) { | ||
| c.Enabled = true | ||
|
mdisibio marked this conversation as resolved.
Outdated
|
||
| c.MaxCardinality = defaultMaxCardinality | ||
| c.StaleDuration = defaultStaleDuration | ||
| c.PurgePeriod = defaultPurgePeriod | ||
| } | ||
|
|
||
| func DefaultConfig() Config { | ||
| return Config{ | ||
| Enabled: true, | ||
| MaxCardinality: defaultMaxCardinality, | ||
| StaleDuration: defaultStaleDuration, | ||
| PurgePeriod: defaultPurgePeriod, | ||
| } | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.