Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions docs/sources/tempo/configuration/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -1323,6 +1323,9 @@ overrides:
# Per-user block retention. If this value is set to 0 (default),
# then block_retention in the compactor configuration is used.
[block_retention: <duration> | default = 0s]
# Per-user compaction window. If this value is set to 0 (default),
# then block_retention in the compactor configuration is used.
[compaction_window: <duration> | default = 0s]

# Metrics-generator related overrides
metrics_generator:
Expand Down
4 changes: 4 additions & 0 deletions modules/compactor/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,10 @@ func (c *Compactor) MaxBytesPerTraceForTenant(tenantID string) int {
return c.overrides.MaxBytesPerTrace(tenantID)
}

func (c *Compactor) MaxCompactionRangeForTenant(tenantID string) time.Duration {
return c.overrides.MaxCompactionRange(tenantID)
}

func (c *Compactor) isSharded() bool {
return c.cfg.ShardingRing.KVStore.Store != ""
}
Expand Down
8 changes: 6 additions & 2 deletions modules/overrides/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,10 @@ import (

"github.com/grafana/tempo/tempodb/backend"

"github.com/prometheus/client_golang/prometheus"

"github.com/grafana/tempo/pkg/sharedconfig"
filterconfig "github.com/grafana/tempo/pkg/spanfilter/config"
"github.com/prometheus/client_golang/prometheus"

"github.com/prometheus/common/model"
)
Expand Down Expand Up @@ -44,6 +45,7 @@ const (
MetricIngestionRateLimitBytes = "ingestion_rate_limit_bytes"
MetricIngestionBurstSizeBytes = "ingestion_burst_size_bytes"
MetricBlockRetention = "block_retention"
MetricCompactionWindow = "compaction_window"
MetricMetricsGeneratorMaxActiveSeries = "metrics_generator_max_active_series"
MetricsGeneratorDryRunEnabled = "metrics_generator_dry_run_enabled"
)
Expand Down Expand Up @@ -129,7 +131,8 @@ type ReadOverrides struct {

type CompactionOverrides struct {
// Compactor enforced overrides.
BlockRetention model.Duration `yaml:"block_retention,omitempty" json:"block_retention,omitempty"`
BlockRetention model.Duration `yaml:"block_retention,omitempty" json:"block_retention,omitempty"`
CompactionWindow model.Duration `yaml:"compaction_window,omitempty" json:"compaction_window,omitempty"`
}

type GlobalOverrides struct {
Expand Down Expand Up @@ -247,5 +250,6 @@ func (c *Config) Collect(ch chan<- prometheus.Metric) {
ch <- prometheus.MustNewConstMetric(metricLimitsDesc, prometheus.GaugeValue, float64(c.Defaults.Read.MaxBlocksPerTagValuesQuery), MetricMaxBlocksPerTagValuesQuery)
ch <- prometheus.MustNewConstMetric(metricLimitsDesc, prometheus.GaugeValue, float64(c.Defaults.Global.MaxBytesPerTrace), MetricMaxBytesPerTrace)
ch <- prometheus.MustNewConstMetric(metricLimitsDesc, prometheus.GaugeValue, float64(c.Defaults.Compaction.BlockRetention), MetricBlockRetention)
ch <- prometheus.MustNewConstMetric(metricLimitsDesc, prometheus.GaugeValue, float64(c.Defaults.Compaction.CompactionWindow), MetricCompactionWindow)
Comment thread
zalegrala marked this conversation as resolved.
Outdated
ch <- prometheus.MustNewConstMetric(metricLimitsDesc, prometheus.GaugeValue, float64(c.Defaults.MetricsGenerator.MaxActiveSeries), MetricMetricsGeneratorMaxActiveSeries)
}
9 changes: 6 additions & 3 deletions modules/overrides/config_legacy.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ func (c *Overrides) toLegacy() LegacyOverrides {
MetricsGeneratorProcessorLocalBlocksCompleteBlockTimeout: c.MetricsGenerator.Processor.LocalBlocks.CompleteBlockTimeout,
MetricsGeneratorIngestionSlack: c.MetricsGenerator.IngestionSlack,

BlockRetention: c.Compaction.BlockRetention,
BlockRetention: c.Compaction.BlockRetention,
CompactionWindow: c.Compaction.CompactionWindow,

MaxBytesPerTagValuesQuery: c.Read.MaxBytesPerTagValuesQuery,
MaxBlocksPerTagValuesQuery: c.Read.MaxBlocksPerTagValuesQuery,
Expand Down Expand Up @@ -103,7 +104,8 @@ type LegacyOverrides struct {
MetricsGeneratorIngestionSlack time.Duration `yaml:"metrics_generator_ingestion_time_range_slack" json:"metrics_generator_ingestion_time_range_slack"`

// Compactor enforced limits.
BlockRetention model.Duration `yaml:"block_retention" json:"block_retention"`
BlockRetention model.Duration `yaml:"block_retention" json:"block_retention"`
CompactionWindow model.Duration `yaml:"compaction_window" json:"compaction_window"`

// Querier and Ingester enforced limits.
MaxBytesPerTagValuesQuery int `yaml:"max_bytes_per_tag_values_query" json:"max_bytes_per_tag_values_query"`
Expand Down Expand Up @@ -135,7 +137,8 @@ func (l *LegacyOverrides) toNewLimits() Overrides {
MaxSearchDuration: l.MaxSearchDuration,
},
Compaction: CompactionOverrides{
BlockRetention: l.BlockRetention,
BlockRetention: l.BlockRetention,
CompactionWindow: l.CompactionWindow,
},
MetricsGenerator: MetricsGeneratorOverrides{
RingSize: l.MetricsGeneratorRingSize,
Expand Down
1 change: 1 addition & 0 deletions modules/overrides/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type Interface interface {
MaxLocalTracesPerUser(userID string) int
MaxGlobalTracesPerUser(userID string) int
MaxBytesPerTrace(userID string) int
MaxCompactionRange(userID string) time.Duration
Forwarders(userID string) []string
MaxBytesPerTagValuesQuery(userID string) int
MaxBlocksPerTagValuesQuery(userID string) int
Expand Down
5 changes: 5 additions & 0 deletions modules/overrides/runtime_config_overrides.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,11 @@ func (o *runtimeConfigOverridesManager) MaxGlobalTracesPerUser(userID string) in
return o.getOverridesForUser(userID).Ingestion.MaxGlobalTracesPerUser
}

// MaxCompactionRange returns the maximum compaction window for this tenant.
func (o *runtimeConfigOverridesManager) MaxCompactionRange(userID string) time.Duration {
return time.Duration(o.getOverridesForUser(userID).Compaction.CompactionWindow)
}

// IngestionRateLimitBytes is the number of spans per second allowed for this tenant.
func (o *runtimeConfigOverridesManager) IngestionRateLimitBytes(userID string) float64 {
return float64(o.getOverridesForUser(userID).Ingestion.RateLimitBytes)
Expand Down
7 changes: 6 additions & 1 deletion tempodb/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,11 @@ func (rw *readerWriter) doCompaction(ctx context.Context) {
// Get the meta file of all non-compacted blocks for the given tenant
blocklist := rw.blocklist.Metas(tenantID)

window := rw.compactorOverrides.MaxCompactionRangeForTenant(tenantID)
if window == 0 {
window = rw.compactorCfg.MaxCompactionRange
}

// Select which blocks to compact.
//
// Blocks are firstly divided by the active compaction window (default: most recent 24h)
Expand All @@ -115,7 +120,7 @@ func (rw *readerWriter) doCompaction(ctx context.Context) {
// 2. If blocks are outside the active window, they're grouped only by windows, ignoring compaction level.
// It picks more recent windows first, and compacting blocks only from the same tenant.
blockSelector := newTimeWindowBlockSelector(blocklist,
rw.compactorCfg.MaxCompactionRange,
window,
rw.compactorCfg.MaxCompactionObjects,
rw.compactorCfg.MaxBlockBytes,
defaultMinInputBlocks,
Expand Down
9 changes: 7 additions & 2 deletions tempodb/compactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,9 @@ type mockJobSharder struct{}
func (m *mockJobSharder) Owns(string) bool { return true }

type mockOverrides struct {
blockRetention time.Duration
maxBytesPerTrace int
blockRetention time.Duration
maxBytesPerTrace int
maxCompactionWindow time.Duration
}

func (m *mockOverrides) BlockRetentionForTenant(_ string) time.Duration {
Expand All @@ -58,6 +59,10 @@ func (m *mockOverrides) MaxBytesPerTraceForTenant(_ string) int {
return m.maxBytesPerTrace
}

func (m *mockOverrides) MaxCompactionRangeForTenant(_ string) time.Duration {
return m.maxCompactionWindow
}

func TestCompactionRoundtrip(t *testing.T) {
for _, enc := range encoding.AllEncodings() {
version := enc.Version()
Expand Down
1 change: 1 addition & 0 deletions tempodb/tempodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ type CompactorSharder interface {
type CompactorOverrides interface {
BlockRetentionForTenant(tenantID string) time.Duration
MaxBytesPerTraceForTenant(tenantID string) int
MaxCompactionRangeForTenant(tenantID string) time.Duration
}

type WriteableBlock interface {
Expand Down