Add nativeHistogram IngestionRate limit #6794
Changes from 4 commits
@@ -95,7 +95,8 @@ type Distributor struct {
     HATracker *ha.HATracker

     // Per-user rate limiter.
-    ingestionRateLimiter *limiter.RateLimiter
+    ingestionRateLimiter                 *limiter.RateLimiter
+    nativeHistogramsIngestionRateLimiter *limiter.RateLimiter

     // Manager for subservices (HA Tracker, distributor ring and client pool)
     subservices *services.Manager
@@ -267,11 +268,13 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove
     // it's an internal dependency and can't join the distributors ring, we skip rate
     // limiting.
     var ingestionRateStrategy limiter.RateLimiterStrategy
+    var nativeHistogramsIngestionRateStrategy limiter.RateLimiterStrategy
     var distributorsLifeCycler *ring.Lifecycler
     var distributorsRing *ring.Ring

     if !canJoinDistributorsRing {
         ingestionRateStrategy = newInfiniteIngestionRateStrategy()
+        nativeHistogramsIngestionRateStrategy = newInfiniteIngestionRateStrategy()
     } else if limits.IngestionRateStrategy() == validation.GlobalIngestionRateStrategy {
         distributorsLifeCycler, err = ring.NewLifecycler(cfg.DistributorRing.ToLifecyclerConfig(), nil, "distributor", ringKey, true, true, log, prometheus.WrapRegistererWithPrefix("cortex_", reg))
         if err != nil {
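For readers who only see this excerpt: every strategy assigned above satisfies Cortex's `limiter.RateLimiterStrategy` interface. Below is a minimal, self-contained sketch of that shape and of what `newInfiniteIngestionRateStrategy` is expected to do; the method names and the use of `math.MaxFloat64` are assumptions, since the interface itself is not part of this diff.

```go
package sketch

import "math"

// rateLimiterStrategy mirrors the assumed shape of limiter.RateLimiterStrategy:
// the rate limiter asks the strategy for a per-tenant rate and burst.
type rateLimiterStrategy interface {
	Limit(tenantID string) float64
	Burst(tenantID string) int
}

// infiniteIngestionRateStrategy sketches newInfiniteIngestionRateStrategy: when
// the distributor cannot join the distributors ring, rate limiting is skipped,
// so the strategy reports an effectively unlimited rate and burst.
type infiniteIngestionRateStrategy struct{}

func newInfiniteIngestionRateStrategy() rateLimiterStrategy {
	return infiniteIngestionRateStrategy{}
}

func (infiniteIngestionRateStrategy) Limit(_ string) float64 { return math.MaxFloat64 }
func (infiniteIngestionRateStrategy) Burst(_ string) int     { return math.MaxInt32 }
```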
@@ -285,21 +288,24 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove
         subservices = append(subservices, distributorsLifeCycler, distributorsRing)

         ingestionRateStrategy = newGlobalIngestionRateStrategy(limits, distributorsLifeCycler)
+        nativeHistogramsIngestionRateStrategy = newGlobalNativeHistogramsIngestionRateStrategy(limits, distributorsLifeCycler)
     } else {
         ingestionRateStrategy = newLocalIngestionRateStrategy(limits)
+        nativeHistogramsIngestionRateStrategy = newLocalNativeHistogramsIngestionRateStrategy(limits)
     }

     d := &Distributor{
-        cfg:                    cfg,
-        log:                    log,
-        ingestersRing:          ingestersRing,
-        ingesterPool:           NewPool(cfg.PoolConfig, ingestersRing, cfg.IngesterClientFactory, log),
-        distributorsLifeCycler: distributorsLifeCycler,
-        distributorsRing:       distributorsRing,
-        limits:                 limits,
-        ingestionRateLimiter:   limiter.NewRateLimiter(ingestionRateStrategy, 10*time.Second),
-        HATracker:              haTracker,
-        ingestionRate:          util_math.NewEWMARate(0.2, instanceIngestionRateTickInterval),
+        cfg:                                  cfg,
+        log:                                  log,
+        ingestersRing:                        ingestersRing,
+        ingesterPool:                         NewPool(cfg.PoolConfig, ingestersRing, cfg.IngesterClientFactory, log),
+        distributorsLifeCycler:               distributorsLifeCycler,
+        distributorsRing:                     distributorsRing,
+        limits:                               limits,
+        ingestionRateLimiter:                 limiter.NewRateLimiter(ingestionRateStrategy, 10*time.Second),
+        nativeHistogramsIngestionRateLimiter: limiter.NewRateLimiter(nativeHistogramsIngestionRateStrategy, 10*time.Second),
+        HATracker:                            haTracker,
+        ingestionRate:                        util_math.NewEWMARate(0.2, instanceIngestionRateTickInterval),

         queryDuration: instrument.NewHistogramCollector(promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
             Namespace: "cortex",
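The `newLocalNativeHistogramsIngestionRateStrategy` and `newGlobalNativeHistogramsIngestionRateStrategy` constructors referenced above are not part of this excerpt. A self-contained sketch of what the local variant plausibly looks like follows; the `NativeHistogramsIngestionRate` and `NativeHistogramsIngestionBurstSize` accessors stand in for whatever per-tenant accessors the PR adds to `validation.Overrides` and are assumed names.

```go
package sketch

// overrides stands in for *validation.Overrides; the accessor names below are
// assumptions for this sketch, not taken from the diff.
type overrides interface {
	NativeHistogramsIngestionRate(tenantID string) float64
	NativeHistogramsIngestionBurstSize(tenantID string) int
}

// localNativeHistogramsIngestionRateStrategy applies the configured per-tenant
// limit as-is on every distributor replica (the "local" strategy).
type localNativeHistogramsIngestionRateStrategy struct {
	limits overrides
}

func newLocalNativeHistogramsIngestionRateStrategy(limits overrides) *localNativeHistogramsIngestionRateStrategy {
	return &localNativeHistogramsIngestionRateStrategy{limits: limits}
}

func (s *localNativeHistogramsIngestionRateStrategy) Limit(tenantID string) float64 {
	return s.limits.NativeHistogramsIngestionRate(tenantID)
}

func (s *localNativeHistogramsIngestionRateStrategy) Burst(tenantID string) int {
	return s.limits.NativeHistogramsIngestionBurstSize(tenantID)
}
```

The global variant would additionally divide the per-tenant limit by the number of healthy distributors in the ring, mirroring how the existing global ingestion-rate strategy spreads the limit across replicas.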
@@ -774,16 +780,29 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co

     totalSamples := validatedFloatSamples + validatedHistogramSamples
     totalN := totalSamples + validatedExemplars + len(validatedMetadata)
-    if !d.ingestionRateLimiter.AllowN(now, userID, totalN) {
+
+    nhRateLimited := !d.nativeHistogramsIngestionRateLimiter.AllowN(now, userID, validatedHistogramSamples)
+    rateLimited := !d.ingestionRateLimiter.AllowN(now, userID, totalN)
+
+    // Return a 429 here to tell the client it is going too fast.
+    // Client may discard the data or slow down and re-send.
+    // Prometheus v2.26 added a remote-write option 'retry_on_http_429'.
+    if nhRateLimited {
+        // Ensure the request slice is reused if the request is rate limited.
+        cortexpb.ReuseSlice(req.Timeseries)
+        d.validateMetrics.DiscardedSamples.WithLabelValues(validation.NativeHistogramsRateLimited, userID).Add(float64(totalSamples))
+        d.validateMetrics.DiscardedExemplars.WithLabelValues(validation.NativeHistogramsRateLimited, userID).Add(float64(validatedExemplars))
+        d.validateMetrics.DiscardedMetadata.WithLabelValues(validation.NativeHistogramsRateLimited, userID).Add(float64(len(validatedMetadata)))
+
+        return nil, httpgrpc.Errorf(http.StatusTooManyRequests, "nativeHistograms ingestion rate limit (%v) exceeded while adding %d samples and %d metadata", d.nativeHistogramsIngestionRateLimiter.Limit(now, userID), totalSamples, len(validatedMetadata))
+    }
+    if rateLimited {
         // Ensure the request slice is reused if the request is rate limited.
         cortexpb.ReuseSlice(req.Timeseries)
         d.validateMetrics.DiscardedSamples.WithLabelValues(validation.RateLimited, userID).Add(float64(totalSamples))
         d.validateMetrics.DiscardedExemplars.WithLabelValues(validation.RateLimited, userID).Add(float64(validatedExemplars))
         d.validateMetrics.DiscardedMetadata.WithLabelValues(validation.RateLimited, userID).Add(float64(len(validatedMetadata)))
-        // Return a 429 here to tell the client it is going too fast.
-        // Client may discard the data or slow down and re-send.
-        // Prometheus v2.26 added a remote-write option 'retry_on_http_429'.

         return nil, httpgrpc.Errorf(http.StatusTooManyRequests, "ingestion rate limit (%v) exceeded while adding %d samples and %d metadata", d.ingestionRateLimiter.Limit(now, userID), totalSamples, len(validatedMetadata))
     }

Review comments on this hunk:

- Are we always returning […]
- Thanks very much.
- Why do we need to drop all samples, exemplars, and metadata if native histograms are rate limited? I don't think this limit should impact the existing ingestion rate limit when the NH limit is set very small but there is still plenty of room under the default ingestion rate.
- +1. We can just block NH.
- This is still the same, no? Ben suggested we drop only […]
- Is there a use case for ingesting partial samples? I feel it's simpler to drop everything. Also, we don't do partial ingestion for float samples. For example, in a remote write request with 10K samples, even if ingesting 9K samples is within the rate limit, we still reject the entire request.
- For me this seems like a different type of rejection. The limits we have today are global limiters: if we wanted to accept traffic up to the limit, we would have to "choose" what goes through and what is rejected, which is more complicated. For a new limit specific to NH, we can simply drop the NH samples, even if we could accept part of them, and let the rest of the request go through.
- Thanks. Updated now. Even when the NH samples are dropped by the NH rate limiter, this PR does not reduce the […] In case of NH rate limiting, this PR doesn't return any error message or 429 to the client. It only publishes the discarded-samples metric with a label value exclusive to the NH rate limiter.
- On the error message string: let's not use camel case for […]
- Thanks. I have updated now.
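The direction the reviewers converge on, and which the author says was adopted, is to drop only the native-histogram samples when the NH limit is hit, count them under an NH-specific discard reason, and let the rest of the request proceed without a 429. The sketch below illustrates that behaviour; it is not the code that was finally merged, and the helper name and simplified types are invented for illustration.

```go
package sketch

// counter stands in for a prometheus counter child such as
// DiscardedSamples.WithLabelValues(reason, userID).
type counter interface {
	Add(float64)
}

// dropNativeHistogramsIfLimited sketches the reviewers' suggestion: when only
// the native-histogram limit is exceeded, record the NH samples under a
// dedicated discard reason and report that just those samples should be
// dropped, instead of rejecting the whole write request with a 429.
//
// nhAllowN stands in for nativeHistogramsIngestionRateLimiter.AllowN bound to
// the current time and tenant.
func dropNativeHistogramsIfLimited(nhAllowN func(n int) bool, discardedNHSamples counter, nhSamples int) bool {
	if nhSamples == 0 || nhAllowN(nhSamples) {
		return false // within the NH limit, nothing to drop
	}
	discardedNHSamples.Add(float64(nhSamples))
	return true // caller strips NH samples; floats, exemplars and metadata continue
}
```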
Review comments on the default value:

- I am worried that we are shipping this with a default of 25k. That could impact users who use native histograms and have overridden their ingestion_rate but are not aware of the new limit when they deploy. I think I'd prefer the default to be 0, meaning disabled; when disabled, ingestion is still limited by ingestion_rate.
- Thanks very much, Daniel. I have now implemented this.
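A straightforward way to get the "0 means disabled" default requested above is to treat a zero configured rate as unlimited before handing it to the limiter, so that only `ingestion_rate` applies. A tiny sketch, with an invented helper name:

```go
package sketch

import "math"

// nativeHistogramsRateOrUnlimited sketches the "0 means disabled" behaviour:
// if the per-tenant native-histogram ingestion rate is left at its default of
// 0, the dedicated NH limiter never rejects anything and ingestion is only
// constrained by the existing ingestion_rate limit.
func nativeHistogramsRateOrUnlimited(configuredRate float64) float64 {
	if configuredRate <= 0 {
		return math.MaxFloat64 // effectively no NH-specific limit
	}
	return configuredRate
}
```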