Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion modules/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,10 +280,14 @@ func (d *Distributor) checkForRateLimits(tracesSize, spanCount int, userID strin
now := time.Now()
if !d.ingestionRateLimiter.AllowN(now, userID, tracesSize) {
overrides.RecordDiscardedSpans(spanCount, reasonRateLimited, userID)
limit := int(d.ingestionRateLimiter.Limit(now, userID))
if d.overrides.IngestionRateStrategy() == overrides.GlobalIngestionRateStrategy {
limit = limit * d.DistributorRing.InstancesCount()
}
return status.Errorf(codes.ResourceExhausted,
"%s: ingestion rate limit (%d bytes) exceeded while adding %d bytes for user %s",
overrides.ErrorPrefixRateLimited,
int(d.ingestionRateLimiter.Limit(now, userID)),
limit,
tracesSize, userID)
}

Expand Down
3 changes: 3 additions & 0 deletions modules/overrides/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ const (
// GlobalIngestionRateStrategy indicates that an attempt should be made to consider this limit across the entire Tempo cluster
GlobalIngestionRateStrategy = "global"

// IngestionRateUser since ingestion rate strategy is not per tenant config
IngestionRateUserVariable = "ingestion rate strategy user variable"

// ErrorPrefixLiveTracesExceeded is used to flag batches from the ingester that were rejected b/c they had too many traces
ErrorPrefixLiveTracesExceeded = "LIVE_TRACES_EXCEEDED"
// ErrorPrefixTraceTooLarge is used to flag batches from the ingester that were rejected b/c they exceeded the single trace limit
Expand Down
6 changes: 3 additions & 3 deletions modules/overrides/runtime_config_overrides.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,8 +275,8 @@ func (o *runtimeConfigOverridesManager) GetRuntimeOverridesFor(userID string) *O
// to each distributor instance (local) or evenly shared across the cluster (global).
func (o *runtimeConfigOverridesManager) IngestionRateStrategy() string {
// The ingestion rate strategy can't be overridden on a per-tenant basis,
// so here we just pick the value for a not-existing user ID (empty string).
return o.getOverridesForUser("").Ingestion.RateStrategy
// so here we just pass this specific variable to fetch the defaults value.
return o.getOverridesForUser(IngestionRateUserVariable).Ingestion.RateStrategy
Comment thread
yvrhdn marked this conversation as resolved.
Outdated
}

// MaxLocalTracesPerUser returns the maximum number of traces a user is allowed to store
Expand Down Expand Up @@ -484,7 +484,7 @@ func (o *runtimeConfigOverridesManager) DedicatedColumns(userID string) backend.
}

func (o *runtimeConfigOverridesManager) getOverridesForUser(userID string) *Overrides {
if tenantOverrides := o.tenantOverrides(); tenantOverrides != nil {
if tenantOverrides := o.tenantOverrides(); tenantOverrides != nil && userID != IngestionRateUserVariable {
l := tenantOverrides.forUser(userID)
if l != nil {
return l
Expand Down