diff --git a/CHANGELOG.md b/CHANGELOG.md index 7101ea813cb..15aefeeac6b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,7 @@ * [CHANGE] TraceQL/Structural operators performance improvement. [#3088](https://github.com/grafana/tempo/pull/3088) (@joe-elliott) * [CHANGE] Merge the processors overrides set through runtime overrides and user-configurable overrides [#3125](https://github.com/grafana/tempo/pull/3125) (@kvrhdn) * [FEATURE] Introduce list_blocks_concurrency on GCS and S3 backends to control backend load and performance. [#2652](https://github.com/grafana/tempo/pull/2652) (@zalegrala) +* [FEATURE] Add per-tenant compaction window [#3129](https://github.com/grafana/tempo/pull/3129) (@zalegrala) * [BUGFIX] Include statusMessage intrinsic attribute in tag search. [#3084](https://github.com/grafana/tempo/pull/3084) (@rcrowe) * [ENHANCEMENT] Update poller to make use of previous results and reduce backend load. [#2652](https://github.com/grafana/tempo/pull/2652) (@zalegrala) diff --git a/docs/sources/tempo/configuration/_index.md b/docs/sources/tempo/configuration/_index.md index 600197d4afa..11bc91548e6 100644 --- a/docs/sources/tempo/configuration/_index.md +++ b/docs/sources/tempo/configuration/_index.md @@ -1323,6 +1323,9 @@ overrides: # Per-user block retention. If this value is set to 0 (default), # then block_retention in the compactor configuration is used. [block_retention: | default = 0s] + # Per-user compaction window. If this value is set to 0 (default), + # then block_retention in the compactor configuration is used. + [compaction_window: | default = 0s] # Metrics-generator related overrides metrics_generator: diff --git a/modules/compactor/compactor.go b/modules/compactor/compactor.go index c53bb303958..b79c84ac132 100644 --- a/modules/compactor/compactor.go +++ b/modules/compactor/compactor.go @@ -258,6 +258,10 @@ func (c *Compactor) MaxBytesPerTraceForTenant(tenantID string) int { return c.overrides.MaxBytesPerTrace(tenantID) } +func (c *Compactor) MaxCompactionRangeForTenant(tenantID string) time.Duration { + return c.overrides.MaxCompactionRange(tenantID) +} + func (c *Compactor) isSharded() bool { return c.cfg.ShardingRing.KVStore.Store != "" } diff --git a/modules/overrides/config.go b/modules/overrides/config.go index 7957228f68a..7fcb2590c24 100644 --- a/modules/overrides/config.go +++ b/modules/overrides/config.go @@ -8,9 +8,10 @@ import ( "github.com/grafana/tempo/tempodb/backend" + "github.com/prometheus/client_golang/prometheus" + "github.com/grafana/tempo/pkg/sharedconfig" filterconfig "github.com/grafana/tempo/pkg/spanfilter/config" - "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" ) @@ -44,6 +45,7 @@ const ( MetricIngestionRateLimitBytes = "ingestion_rate_limit_bytes" MetricIngestionBurstSizeBytes = "ingestion_burst_size_bytes" MetricBlockRetention = "block_retention" + MetricCompactionWindow = "compaction_window" MetricMetricsGeneratorMaxActiveSeries = "metrics_generator_max_active_series" MetricsGeneratorDryRunEnabled = "metrics_generator_dry_run_enabled" ) @@ -129,7 +131,8 @@ type ReadOverrides struct { type CompactionOverrides struct { // Compactor enforced overrides. - BlockRetention model.Duration `yaml:"block_retention,omitempty" json:"block_retention,omitempty"` + BlockRetention model.Duration `yaml:"block_retention,omitempty" json:"block_retention,omitempty"` + CompactionWindow model.Duration `yaml:"compaction_window,omitempty" json:"compaction_window,omitempty"` } type GlobalOverrides struct { diff --git a/modules/overrides/config_legacy.go b/modules/overrides/config_legacy.go index 66a4144a4fc..442f14f798b 100644 --- a/modules/overrides/config_legacy.go +++ b/modules/overrides/config_legacy.go @@ -48,7 +48,8 @@ func (c *Overrides) toLegacy() LegacyOverrides { MetricsGeneratorProcessorLocalBlocksCompleteBlockTimeout: c.MetricsGenerator.Processor.LocalBlocks.CompleteBlockTimeout, MetricsGeneratorIngestionSlack: c.MetricsGenerator.IngestionSlack, - BlockRetention: c.Compaction.BlockRetention, + BlockRetention: c.Compaction.BlockRetention, + CompactionWindow: c.Compaction.CompactionWindow, MaxBytesPerTagValuesQuery: c.Read.MaxBytesPerTagValuesQuery, MaxBlocksPerTagValuesQuery: c.Read.MaxBlocksPerTagValuesQuery, @@ -103,7 +104,8 @@ type LegacyOverrides struct { MetricsGeneratorIngestionSlack time.Duration `yaml:"metrics_generator_ingestion_time_range_slack" json:"metrics_generator_ingestion_time_range_slack"` // Compactor enforced limits. - BlockRetention model.Duration `yaml:"block_retention" json:"block_retention"` + BlockRetention model.Duration `yaml:"block_retention" json:"block_retention"` + CompactionWindow model.Duration `yaml:"compaction_window" json:"compaction_window"` // Querier and Ingester enforced limits. MaxBytesPerTagValuesQuery int `yaml:"max_bytes_per_tag_values_query" json:"max_bytes_per_tag_values_query"` @@ -135,7 +137,8 @@ func (l *LegacyOverrides) toNewLimits() Overrides { MaxSearchDuration: l.MaxSearchDuration, }, Compaction: CompactionOverrides{ - BlockRetention: l.BlockRetention, + BlockRetention: l.BlockRetention, + CompactionWindow: l.CompactionWindow, }, MetricsGenerator: MetricsGeneratorOverrides{ RingSize: l.MetricsGeneratorRingSize, diff --git a/modules/overrides/config_test.go b/modules/overrides/config_test.go index d4df243a128..7d0b8e0ede3 100644 --- a/modules/overrides/config_test.go +++ b/modules/overrides/config_test.go @@ -45,6 +45,7 @@ max_global_traces_per_user: 1000 max_bytes_per_trace: 100_000 block_retention: 24h +compaction_window: 4h per_tenant_override_config: /etc/Overrides.yaml per_tenant_override_period: 1m @@ -65,6 +66,7 @@ max_search_duration: 5m "max_bytes_per_trace": 100000, "block_retention": "24h", + "compaction_window": "4h", "per_tenant_override_config": "/etc/Overrides.yaml", "per_tenant_override_period": "1m", diff --git a/modules/overrides/interface.go b/modules/overrides/interface.go index cf9d6f8bfe3..3af54ad44c4 100644 --- a/modules/overrides/interface.go +++ b/modules/overrides/interface.go @@ -26,6 +26,7 @@ type Interface interface { MaxLocalTracesPerUser(userID string) int MaxGlobalTracesPerUser(userID string) int MaxBytesPerTrace(userID string) int + MaxCompactionRange(userID string) time.Duration Forwarders(userID string) []string MaxBytesPerTagValuesQuery(userID string) int MaxBlocksPerTagValuesQuery(userID string) int diff --git a/modules/overrides/runtime_config_overrides.go b/modules/overrides/runtime_config_overrides.go index cd8d2f7883a..a8b9a94c760 100644 --- a/modules/overrides/runtime_config_overrides.go +++ b/modules/overrides/runtime_config_overrides.go @@ -261,6 +261,11 @@ func (o *runtimeConfigOverridesManager) MaxGlobalTracesPerUser(userID string) in return o.getOverridesForUser(userID).Ingestion.MaxGlobalTracesPerUser } +// MaxCompactionRange returns the maximum compaction window for this tenant. +func (o *runtimeConfigOverridesManager) MaxCompactionRange(userID string) time.Duration { + return time.Duration(o.getOverridesForUser(userID).Compaction.CompactionWindow) +} + // IngestionRateLimitBytes is the number of spans per second allowed for this tenant. func (o *runtimeConfigOverridesManager) IngestionRateLimitBytes(userID string) float64 { return float64(o.getOverridesForUser(userID).Ingestion.RateLimitBytes) diff --git a/tempodb/compactor.go b/tempodb/compactor.go index f6d4b712b6c..f4e264ac763 100644 --- a/tempodb/compactor.go +++ b/tempodb/compactor.go @@ -107,6 +107,11 @@ func (rw *readerWriter) doCompaction(ctx context.Context) { // Get the meta file of all non-compacted blocks for the given tenant blocklist := rw.blocklist.Metas(tenantID) + window := rw.compactorOverrides.MaxCompactionRangeForTenant(tenantID) + if window == 0 { + window = rw.compactorCfg.MaxCompactionRange + } + // Select which blocks to compact. // // Blocks are firstly divided by the active compaction window (default: most recent 24h) @@ -115,7 +120,7 @@ func (rw *readerWriter) doCompaction(ctx context.Context) { // 2. If blocks are outside the active window, they're grouped only by windows, ignoring compaction level. // It picks more recent windows first, and compacting blocks only from the same tenant. blockSelector := newTimeWindowBlockSelector(blocklist, - rw.compactorCfg.MaxCompactionRange, + window, rw.compactorCfg.MaxCompactionObjects, rw.compactorCfg.MaxBlockBytes, defaultMinInputBlocks, diff --git a/tempodb/compactor_test.go b/tempodb/compactor_test.go index 90ce11e6e56..e6c8d6c9f66 100644 --- a/tempodb/compactor_test.go +++ b/tempodb/compactor_test.go @@ -46,8 +46,9 @@ type mockJobSharder struct{} func (m *mockJobSharder) Owns(string) bool { return true } type mockOverrides struct { - blockRetention time.Duration - maxBytesPerTrace int + blockRetention time.Duration + maxBytesPerTrace int + maxCompactionWindow time.Duration } func (m *mockOverrides) BlockRetentionForTenant(_ string) time.Duration { @@ -58,6 +59,10 @@ func (m *mockOverrides) MaxBytesPerTraceForTenant(_ string) int { return m.maxBytesPerTrace } +func (m *mockOverrides) MaxCompactionRangeForTenant(_ string) time.Duration { + return m.maxCompactionWindow +} + func TestCompactionRoundtrip(t *testing.T) { for _, enc := range encoding.AllEncodings() { version := enc.Version() diff --git a/tempodb/tempodb.go b/tempodb/tempodb.go index 8008697b504..4e0dc318da3 100644 --- a/tempodb/tempodb.go +++ b/tempodb/tempodb.go @@ -96,6 +96,7 @@ type CompactorSharder interface { type CompactorOverrides interface { BlockRetentionForTenant(tenantID string) time.Duration MaxBytesPerTraceForTenant(tenantID string) int + MaxCompactionRangeForTenant(tenantID string) time.Duration } type WriteableBlock interface {