Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
* [CHANGE] TraceQL/Structural operators performance improvement. [#3088](https://github.com/grafana/tempo/pull/3088) (@joe-elliott)
* [CHANGE] Merge the processors overrides set through runtime overrides and user-configurable overrides [#3125](https://github.com/grafana/tempo/pull/3125) (@kvrhdn)
* [FEATURE] Introduce list_blocks_concurrency on GCS and S3 backends to control backend load and performance. [#2652](https://github.com/grafana/tempo/pull/2652) (@zalegrala)
* [FEATURE] Add per-tenant compaction window [#3129](https://github.com/grafana/tempo/pull/3129) (@zalegrala)
* [BUGFIX] Include statusMessage intrinsic attribute in tag search. [#3084](https://github.com/grafana/tempo/pull/3084) (@rcrowe)
* [ENHANCEMENT] Update poller to make use of previous results and reduce backend load. [#2652](https://github.com/grafana/tempo/pull/2652) (@zalegrala)

Expand Down
3 changes: 3 additions & 0 deletions docs/sources/tempo/configuration/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -1323,6 +1323,9 @@ overrides:
# Per-user block retention. If this value is set to 0 (default),
# then block_retention in the compactor configuration is used.
[block_retention: <duration> | default = 0s]
# Per-user compaction window. If this value is set to 0 (default),
# then block_retention in the compactor configuration is used.
[compaction_window: <duration> | default = 0s]

# Metrics-generator related overrides
metrics_generator:
Expand Down
4 changes: 4 additions & 0 deletions modules/compactor/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,10 @@ func (c *Compactor) MaxBytesPerTraceForTenant(tenantID string) int {
return c.overrides.MaxBytesPerTrace(tenantID)
}

func (c *Compactor) MaxCompactionRangeForTenant(tenantID string) time.Duration {
return c.overrides.MaxCompactionRange(tenantID)
}

func (c *Compactor) isSharded() bool {
return c.cfg.ShardingRing.KVStore.Store != ""
}
Expand Down
7 changes: 5 additions & 2 deletions modules/overrides/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,10 @@ import (

"github.com/grafana/tempo/tempodb/backend"

"github.com/prometheus/client_golang/prometheus"

"github.com/grafana/tempo/pkg/sharedconfig"
filterconfig "github.com/grafana/tempo/pkg/spanfilter/config"
"github.com/prometheus/client_golang/prometheus"

"github.com/prometheus/common/model"
)
Expand Down Expand Up @@ -44,6 +45,7 @@ const (
MetricIngestionRateLimitBytes = "ingestion_rate_limit_bytes"
MetricIngestionBurstSizeBytes = "ingestion_burst_size_bytes"
MetricBlockRetention = "block_retention"
MetricCompactionWindow = "compaction_window"
MetricMetricsGeneratorMaxActiveSeries = "metrics_generator_max_active_series"
MetricsGeneratorDryRunEnabled = "metrics_generator_dry_run_enabled"
)
Expand Down Expand Up @@ -129,7 +131,8 @@ type ReadOverrides struct {

type CompactionOverrides struct {
// Compactor enforced overrides.
BlockRetention model.Duration `yaml:"block_retention,omitempty" json:"block_retention,omitempty"`
BlockRetention model.Duration `yaml:"block_retention,omitempty" json:"block_retention,omitempty"`
CompactionWindow model.Duration `yaml:"compaction_window,omitempty" json:"compaction_window,omitempty"`
}

type GlobalOverrides struct {
Expand Down
9 changes: 6 additions & 3 deletions modules/overrides/config_legacy.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ func (c *Overrides) toLegacy() LegacyOverrides {
MetricsGeneratorProcessorLocalBlocksCompleteBlockTimeout: c.MetricsGenerator.Processor.LocalBlocks.CompleteBlockTimeout,
MetricsGeneratorIngestionSlack: c.MetricsGenerator.IngestionSlack,

BlockRetention: c.Compaction.BlockRetention,
BlockRetention: c.Compaction.BlockRetention,
CompactionWindow: c.Compaction.CompactionWindow,

MaxBytesPerTagValuesQuery: c.Read.MaxBytesPerTagValuesQuery,
MaxBlocksPerTagValuesQuery: c.Read.MaxBlocksPerTagValuesQuery,
Expand Down Expand Up @@ -103,7 +104,8 @@ type LegacyOverrides struct {
MetricsGeneratorIngestionSlack time.Duration `yaml:"metrics_generator_ingestion_time_range_slack" json:"metrics_generator_ingestion_time_range_slack"`

// Compactor enforced limits.
BlockRetention model.Duration `yaml:"block_retention" json:"block_retention"`
BlockRetention model.Duration `yaml:"block_retention" json:"block_retention"`
CompactionWindow model.Duration `yaml:"compaction_window" json:"compaction_window"`

// Querier and Ingester enforced limits.
MaxBytesPerTagValuesQuery int `yaml:"max_bytes_per_tag_values_query" json:"max_bytes_per_tag_values_query"`
Expand Down Expand Up @@ -135,7 +137,8 @@ func (l *LegacyOverrides) toNewLimits() Overrides {
MaxSearchDuration: l.MaxSearchDuration,
},
Compaction: CompactionOverrides{
BlockRetention: l.BlockRetention,
BlockRetention: l.BlockRetention,
CompactionWindow: l.CompactionWindow,
},
MetricsGenerator: MetricsGeneratorOverrides{
RingSize: l.MetricsGeneratorRingSize,
Expand Down
2 changes: 2 additions & 0 deletions modules/overrides/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ max_global_traces_per_user: 1000
max_bytes_per_trace: 100_000

block_retention: 24h
compaction_window: 4h

per_tenant_override_config: /etc/Overrides.yaml
per_tenant_override_period: 1m
Expand All @@ -65,6 +66,7 @@ max_search_duration: 5m
"max_bytes_per_trace": 100000,

"block_retention": "24h",
"compaction_window": "4h",

"per_tenant_override_config": "/etc/Overrides.yaml",
"per_tenant_override_period": "1m",
Expand Down
1 change: 1 addition & 0 deletions modules/overrides/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type Interface interface {
MaxLocalTracesPerUser(userID string) int
MaxGlobalTracesPerUser(userID string) int
MaxBytesPerTrace(userID string) int
MaxCompactionRange(userID string) time.Duration
Forwarders(userID string) []string
MaxBytesPerTagValuesQuery(userID string) int
MaxBlocksPerTagValuesQuery(userID string) int
Expand Down
5 changes: 5 additions & 0 deletions modules/overrides/runtime_config_overrides.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,11 @@ func (o *runtimeConfigOverridesManager) MaxGlobalTracesPerUser(userID string) in
return o.getOverridesForUser(userID).Ingestion.MaxGlobalTracesPerUser
}

// MaxCompactionRange returns the maximum compaction window for this tenant.
func (o *runtimeConfigOverridesManager) MaxCompactionRange(userID string) time.Duration {
return time.Duration(o.getOverridesForUser(userID).Compaction.CompactionWindow)
}

// IngestionRateLimitBytes is the number of spans per second allowed for this tenant.
func (o *runtimeConfigOverridesManager) IngestionRateLimitBytes(userID string) float64 {
return float64(o.getOverridesForUser(userID).Ingestion.RateLimitBytes)
Expand Down
7 changes: 6 additions & 1 deletion tempodb/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,11 @@ func (rw *readerWriter) doCompaction(ctx context.Context) {
// Get the meta file of all non-compacted blocks for the given tenant
blocklist := rw.blocklist.Metas(tenantID)

window := rw.compactorOverrides.MaxCompactionRangeForTenant(tenantID)
if window == 0 {
window = rw.compactorCfg.MaxCompactionRange
}

// Select which blocks to compact.
//
// Blocks are firstly divided by the active compaction window (default: most recent 24h)
Expand All @@ -115,7 +120,7 @@ func (rw *readerWriter) doCompaction(ctx context.Context) {
// 2. If blocks are outside the active window, they're grouped only by windows, ignoring compaction level.
// It picks more recent windows first, and compacting blocks only from the same tenant.
blockSelector := newTimeWindowBlockSelector(blocklist,
rw.compactorCfg.MaxCompactionRange,
window,
rw.compactorCfg.MaxCompactionObjects,
rw.compactorCfg.MaxBlockBytes,
defaultMinInputBlocks,
Expand Down
9 changes: 7 additions & 2 deletions tempodb/compactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,9 @@ type mockJobSharder struct{}
func (m *mockJobSharder) Owns(string) bool { return true }

type mockOverrides struct {
blockRetention time.Duration
maxBytesPerTrace int
blockRetention time.Duration
maxBytesPerTrace int
maxCompactionWindow time.Duration
}

func (m *mockOverrides) BlockRetentionForTenant(_ string) time.Duration {
Expand All @@ -58,6 +59,10 @@ func (m *mockOverrides) MaxBytesPerTraceForTenant(_ string) int {
return m.maxBytesPerTrace
}

func (m *mockOverrides) MaxCompactionRangeForTenant(_ string) time.Duration {
return m.maxCompactionWindow
}

func TestCompactionRoundtrip(t *testing.T) {
for _, enc := range encoding.AllEncodings() {
version := enc.Version()
Expand Down
1 change: 1 addition & 0 deletions tempodb/tempodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ type CompactorSharder interface {
type CompactorOverrides interface {
BlockRetentionForTenant(tenantID string) time.Duration
MaxBytesPerTraceForTenant(tenantID string) int
MaxCompactionRangeForTenant(tenantID string) time.Duration
}

type WriteableBlock interface {
Expand Down