3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -8,6 +8,9 @@
* [CHANGE] Command tempo-cli analyse block(s) excludes attributes with array values [#5380](https://github.com/grafana/tempo/pull/5380) (@stoewer)
* [CHANGE] **BREAKING CHANGE** Drop unused `backend_scheduler.tenant_measurement_interval`, use `backend_scheduler.compaction.measure_interval` instead. [#5328](https://github.com/grafana/tempo/pull/5328) (@zalegrala)
* [CHANGE] Allow configuration of min/max input blocks for compaction provider. [#5373](https://github.com/grafana/tempo/pull/5373) (@zalegrala)
* [CHANGE] **BREAKING CHANGE** Require a minimum time between tenant sorting cycles in the backend-scheduler. [#5410](https://github.com/grafana/tempo/pull/5410) (@zalegrala)
The configuration for `backend_scheduler.provider.compaction.backoff` has been removed.
Additionally, the `compaction_tenant_backoff_total` metric has been renamed to `compaction_empty_tenant_cycle_total` for clarity.
* [FEATURE] Add histograms `spans_distance_in_future_seconds` / `spans_distance_in_past_seconds` that count spans with end timestamp in the future / past. While spans in the future are accepted, they are invalid and may not be found using the Search API. [#4936](https://github.com/grafana/tempo/pull/4936) (@carles-grafana)
* [FEATURE] Add MCP Server support. [#5212](https://github.com/grafana/tempo/pull/5212) (@joe-elliott)
* [FEATURE] Add counter `query_frontend_bytes_inspected_total`, which shows the total number of bytes read from disk and object storage [#5310](https://github.com/grafana/tempo/pull/5310) (@carles-grafana)
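For operators upgrading, here is a minimal before/after sketch of the config change, assuming the key nesting implied by the changelog entry above (`backend_scheduler.provider.compaction`); the 30s default matches the manifest change below:

```yaml
# Before this PR (removed):
backend_scheduler:
  provider:
    compaction:
      backoff:
        min_period: 100ms
        max_period: 10s
        max_retries: 0

# After this PR:
backend_scheduler:
  provider:
    compaction:
      min_cycle_interval: 30s  # minimum time between tenant prioritization cycles
```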
5 changes: 1 addition & 4 deletions docs/sources/tempo/configuration/manifest.md
@@ -1006,12 +1006,9 @@ backend_scheduler:
max_time_per_tenant: 5m0s
compaction_cycle: 30s
max_jobs_per_tenant: 1000
backoff:
min_period: 100ms
max_period: 10s
max_retries: 0
min_input_blocks: 2
max_input_blocks: 4
min_cycle_interval: 30s
job_timeout: 15s
local_work_path: /var/tempo
backend_scheduler_client:
47 changes: 29 additions & 18 deletions modules/backendscheduler/provider/compaction.go
@@ -9,7 +9,6 @@ import (
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/google/uuid"
"github.com/grafana/dskit/backoff"
"github.com/grafana/tempo/modules/backendscheduler/work"
"github.com/grafana/tempo/modules/backendscheduler/work/tenantselector"
"github.com/grafana/tempo/modules/overrides"
@@ -31,9 +30,9 @@ type CompactionConfig struct {
MeasureInterval time.Duration `yaml:"measure_interval"`
Compactor tempodb.CompactorConfig `yaml:"compaction"`
MaxJobsPerTenant int `yaml:"max_jobs_per_tenant"`
Backoff backoff.Config `yaml:"backoff"`
MinInputBlocks int `yaml:"min_input_blocks"`
MaxInputBlocks int `yaml:"max_input_blocks"`
MinCycleInterval time.Duration `yaml:"min_cycle_interval"`
}

func (cfg *CompactionConfig) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet) {
@@ -44,10 +43,8 @@ func (cfg *CompactionConfig) RegisterFlagsAndApplyDefaults(prefix string, f *fla
f.IntVar(&cfg.MinInputBlocks, prefix+".min-input-blocks", blockselector.DefaultMinInputBlocks, "Minimum number of blocks to compact in a single job.")
f.IntVar(&cfg.MaxInputBlocks, prefix+".max-input-blocks", blockselector.DefaultMaxInputBlocks, "Maximum number of blocks to compact in a single job.")

// Backoff
f.DurationVar(&cfg.Backoff.MinBackoff, prefix+".backoff-min-period", 100*time.Millisecond, "Minimum delay when backing off.")
f.DurationVar(&cfg.Backoff.MaxBackoff, prefix+".backoff-max-period", 10*time.Second, "Maximum delay when backing off.")
cfg.Backoff.MaxRetries = 0
// Tenant prioritization
f.DurationVar(&cfg.MinCycleInterval, prefix+".min-cycle-interval", 30*time.Second, "Minimum time between tenant prioritization cycles to prevent excessive CPU usage when no work is available.")

cfg.Compactor = tempodb.CompactorConfig{}
cfg.Compactor.RegisterFlagsAndApplyDefaults(util.PrefixConfig(prefix, "compaction"), f)
@@ -65,9 +62,10 @@ type CompactionProvider struct {
sched Scheduler

// Dependencies needed for tenant selection
curPriority *tenantselector.PriorityQueue
curTenant *tenantselector.Item
curSelector blockselector.CompactionBlockSelector
curPriority *tenantselector.PriorityQueue
curTenant *tenantselector.Item
curSelector blockselector.CompactionBlockSelector
lastPrioritizeTime time.Time
}

func NewCompactionProvider(
@@ -97,7 +95,6 @@ func (p *CompactionProvider) Start(ctx context.Context) <-chan *work.Job {

var (
job *work.Job
b = backoff.New(ctx, p.cfg.Backoff)
curTenantJobCount int
span trace.Span
loopCtx context.Context
@@ -129,12 +126,12 @@ func (p *CompactionProvider) Start(ctx context.Context) <-chan *work.Job {
}

if p.curSelector == nil {
b.Wait()
if !p.prepareNextTenant(loopCtx) {
level.Info(p.logger).Log("msg", "received empty tenant", "waiting", b.NextDelay())
metricTenantBackoff.Inc()
span.AddEvent("tenant not prepared")
level.Info(p.logger).Log("msg", "received empty tenant")
metricEmptyTenantCycle.Inc()
span.AddEvent("no tenant selected")
}

continue
}

@@ -147,11 +144,9 @@ func (p *CompactionProvider) Start(ctx context.Context) <-chan *work.Job {

job = p.createJob(loopCtx)
if job == nil {
level.Info(p.logger).Log("msg", "tenant exhausted, skipping to next tenant after delay", "waiting", b.NextDelay())
level.Info(p.logger).Log("msg", "tenant exhausted")
// we don't have a job, reset the curTenant and try again
metricTenantEmptyJob.Inc()
// Avoid CPU spin
b.Wait()
reset()
continue
}
@@ -165,7 +160,6 @@ func (p *CompactionProvider) Start(ctx context.Context) <-chan *work.Job {
case jobs <- job:
metricJobsCreated.WithLabelValues(p.curTenant.Value()).Inc()
curTenantJobCount++
b.Reset()
span.AddEvent("job created", trace.WithAttributes(
attribute.String("job_id", job.ID),
attribute.String("tenant_id", p.curTenant.Value()),
@@ -200,7 +194,22 @@ func (p *CompactionProvider) prepareNextTenant(ctx context.Context) bool {
defer span.End()

if p.curPriority.Len() == 0 {
// Rate limit calls to prioritizeTenants to prevent excessive CPU usage
// when cycling through tenants with no available work. We only expect new
// work for tenants after the next blocklist poll.
if elapsed := time.Since(p.lastPrioritizeTime); elapsed < p.cfg.MinCycleInterval {
waitTime := p.cfg.MinCycleInterval - elapsed
level.Debug(p.logger).Log("msg", "rate limiting tenant prioritization", "wait_time", waitTime)
select {
case <-ctx.Done():
return false
case <-time.After(waitTime):
// Continue to prioritizeTenants
}
}

p.prioritizeTenants(ctx)
p.lastPrioritizeTime = time.Now()
if p.curPriority.Len() == 0 {
return false
}
@@ -212,6 +221,8 @@
return false
}

level.Info(p.logger).Log("msg", "new tenant selected", "tenant_id", p.curTenant.Value())

p.curSelector, _ = p.newBlockSelector(p.curTenant.Value())
return true
}
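The change above replaces the dskit backoff with a time-based gate: when the priority queue is empty, `prepareNextTenant` now sleeps out the remainder of `MinCycleInterval` before re-prioritizing tenants. A self-contained sketch of that pattern follows (the type and function names here are illustrative, not Tempo's; the real logic lives inline in `prepareNextTenant`):

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// cycler gates a function to run at most once per minInterval, sleeping out
// the remainder of the interval when invoked early.
type cycler struct {
	minInterval time.Duration
	lastRun     time.Time
}

// run waits until minInterval has elapsed since the last run, then invokes fn.
// It returns false if ctx is cancelled while waiting.
func (c *cycler) run(ctx context.Context, fn func(context.Context)) bool {
	if elapsed := time.Since(c.lastRun); elapsed < c.minInterval {
		select {
		case <-ctx.Done():
			return false // shut down cleanly instead of finishing the wait
		case <-time.After(c.minInterval - elapsed):
			// interval satisfied; fall through to fn
		}
	}
	fn(ctx)
	c.lastRun = time.Now()
	return true
}

func main() {
	c := &cycler{minInterval: 100 * time.Millisecond}
	for i := 0; i < 3; i++ {
		c.run(context.Background(), func(context.Context) {
			fmt.Println("prioritizing tenants at", time.Now().Format(time.StampMilli))
		})
	}
}
```

Unlike exponential backoff, the wait here is a fixed floor, which matches the reasoning in the new code comment and the validation message below: new work for a tenant only appears after the next blocklist poll, so retrying any faster just burns CPU.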
6 changes: 2 additions & 4 deletions modules/backendscheduler/provider/compaction_test.go
@@ -31,8 +31,7 @@ func TestCompactionProvider(t *testing.T) {
cfg.RegisterFlagsAndApplyDefaults("", &flag.FlagSet{})
cfg.MaxJobsPerTenant = 2
cfg.MeasureInterval = 100 * time.Millisecond
cfg.Backoff.MinBackoff = 10 * time.Millisecond
cfg.Backoff.MaxBackoff = 100 * time.Millisecond
cfg.MinCycleInterval = 100 * time.Millisecond

tmpDir := t.TempDir()

@@ -104,8 +103,7 @@ func TestCompactionProvider_EmptyStart(t *testing.T) {
cfg.RegisterFlagsAndApplyDefaults("", &flag.FlagSet{})
cfg.MaxJobsPerTenant = 1
cfg.MeasureInterval = 100 * time.Millisecond
cfg.Backoff.MinBackoff = 30 * time.Millisecond // twice the poll cycle
cfg.Backoff.MaxBackoff = 50 * time.Millisecond
cfg.MinCycleInterval = 100 * time.Millisecond

tmpDir := t.TempDir()

4 changes: 2 additions & 2 deletions modules/backendscheduler/provider/config.go
@@ -23,8 +23,8 @@ func ValidateConfig(cfg *Config) error {
return fmt.Errorf("max_jobs_per_tenant must be greater than 0")
}

if cfg.Compaction.Backoff.MaxRetries != 0 {
return fmt.Errorf("max_retries must be 0, since it is not respected")
if cfg.Compaction.MinCycleInterval <= 0 {
return fmt.Errorf("min_cycle_interval must be greater than 0, and should be at least half the blocklist_poll cycle for general use")
}

if cfg.Compaction.MeasureInterval <= 0 {
6 changes: 3 additions & 3 deletions modules/backendscheduler/provider/metrics.go
@@ -16,10 +16,10 @@ var (
Name: "compaction_tenant_reset_total",
Help: "The number of times the tenant is changed",
}, []string{"tenant"})
metricTenantBackoff = promauto.NewCounter(prometheus.CounterOpts{
metricEmptyTenantCycle = promauto.NewCounter(prometheus.CounterOpts{
Namespace: "tempo_backend_scheduler",
Name: "compaction_tenant_backoff_total",
Help: "The number of times the backoff is triggered",
Name: "compaction_empty_tenant_cycle_total",
Help: "The number of compaction cycles where no tenant had work available",
})
metricTenantEmptyJob = promauto.NewCounter(prometheus.CounterOpts{
Namespace: "tempo_backend_scheduler",
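Since promauto joins `Namespace` and `Name` with an underscore, the full series name changes from `tempo_backend_scheduler_compaction_tenant_backoff_total` to `tempo_backend_scheduler_compaction_empty_tenant_cycle_total`; dashboards and alerts referencing the old counter need updating, e.g. to `rate(tempo_backend_scheduler_compaction_empty_tenant_cycle_total[5m])`.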