5 changes: 4 additions & 1 deletion CHANGELOG.md
@@ -1,6 +1,9 @@
## main / unreleased

* [BUGFIX] Fix tempo-vulture ignoring `-tempo-push-tls` flag in normal operating mode. [#6974](https://github.com/grafana/tempo/pull/6974) (@xaque208)
* [BUGFIX] backend-scheduler: fix redaction batch not cleaned up after dead-job timeout, leaving tenant permanently blocked from new redaction submissions and compaction. [#6992](https://github.com/grafana/tempo/pull/6992) (@zalegrala)
* [BUGFIX] backend-scheduler: fix outstanding-blocks metric suppressed to zero during active redaction batch, causing autoscaler to scale down workers mid-redaction. [#6992](https://github.com/grafana/tempo/pull/6992) (@zalegrala)
* [BUGFIX] backend-scheduler: fix O(N) lock contention in GetJobForWorker under concurrent worker load; replace shard scan with O(1) index lookup. [#6992](https://github.com/grafana/tempo/pull/6992) (@zalegrala)
* [BUGFIX] Fix tempo-vulture ignoring `-tempo-push-tls` flag in normal operating mode. [#6974](https://github.com/grafana/tempo/pull/6974) (@xause208)
* [CHANGE] **BREAKING CHANGE** Remove duplicate "compaction" prefix from CompactorConfig CLI flags. Affected flags: `compaction.block-retention`, `compaction.max-objects-per-block`, `compaction.max-block-bytes`, `compaction.compaction-window`. [#6909](https://github.com/grafana/tempo/pull/6909) (@electron0zero)
* [ENHANCEMENT] Support OR conditions for tag name and tag value autocomplete (search tags v2) [#6827](https://github.com/grafana/tempo/pull/6827) (@ie-pham)
* [ENHANCEMENT] Expose MinIO retry settings via S3 config [#6561](https://github.com/grafana/tempo/pull/6561) (@rwhitty)
11 changes: 11 additions & 0 deletions modules/backendscheduler/backendscheduler.go
@@ -197,6 +197,7 @@ func (s *BackendScheduler) running(ctx context.Context) error {
case <-maintenanceTicker.C:
s.work.Prune(ctx)
s.checkPendingRescans(ctx)
s.cleanupOrphanedBatches(ctx)
case <-backendFlushTicker.C:
err = s.flushWorkCacheToBackend(ctx)
metricWorkFlushes.Inc()
@@ -551,6 +552,16 @@ func (s *BackendScheduler) SubmitRedaction(ctx context.Context, req *tempopb.Sub
}, nil
}

// cleanupOrphanedBatches sweeps all active batches and removes any whose redaction
// jobs have all finished. Called after each Prune tick because Prune transitions
// timed-out running jobs to FAILED by calling j.Fail() directly, bypassing the
// UpdateJob path that normally invokes cleanupBatchIfDone.
func (s *BackendScheduler) cleanupOrphanedBatches(ctx context.Context) {
for _, batch := range s.work.ListBatches() {
s.cleanupBatchIfDone(ctx, batch.TenantId)
}
}

// cleanupBatchIfDone removes the batch manifest for a tenant once all of its redaction
// jobs have completed or failed (no pending, no in-flight, no running) and no rescan
// is pending. "In-flight" means a job has been popped from the pending queue and is
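
The cleanup flow above can be summarized as: each maintenance tick first prunes (failing timed-out jobs in place) and then sweeps every active batch, dropping batches whose redaction jobs have all reached a terminal state. Below is a minimal, self-contained Go sketch of that pattern; every name in it (`scheduler`, `maintenanceTick`, `jobState`, and so on) is invented for illustration and is not Tempo's actual type or API.

```go
package main

import "fmt"

type jobState int

const (
	running jobState = iota
	failed
	succeeded
)

// scheduler holds redaction batches keyed by batch ID; each batch is just the
// states of its jobs in this simplified model.
type scheduler struct {
	batches map[string][]jobState
}

// maintenanceTick mirrors the ticker case above: prune first, then sweep.
func (s *scheduler) maintenanceTick() {
	s.prune()
	s.cleanupOrphanedBatches()
}

// prune marks running jobs as failed; real timeout detection is elided and
// every running job is treated as timed out for the sake of the example.
func (s *scheduler) prune() {
	for _, states := range s.batches {
		for i, st := range states {
			if st == running {
				states[i] = failed
			}
		}
	}
}

// cleanupOrphanedBatches removes any batch whose jobs are all terminal.
func (s *scheduler) cleanupOrphanedBatches() {
	for id, states := range s.batches {
		done := true
		for _, st := range states {
			if st == running {
				done = false
				break
			}
		}
		if done {
			delete(s.batches, id)
		}
	}
}

func main() {
	s := &scheduler{batches: map[string][]jobState{"batch-1": {succeeded, running}}}
	s.maintenanceTick()
	fmt.Println(len(s.batches)) // 0: the batch is removed once all jobs are terminal
}
```

The separate sweep matters because the prune path fails jobs directly, bypassing the per-job update path that would normally trigger batch cleanup; without it a batch could linger indefinitely and keep blocking the tenant.
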
41 changes: 40 additions & 1 deletion modules/backendscheduler/provider/compaction.go
@@ -394,11 +394,50 @@ func (p *CompactionProvider) measureTenants() {

var blockSelector blockselector.CompactionBlockSelector
for _, tenant := range p.store.Tenants() {
blockSelector, _ = p.newBlockSelector(tenant)
// Use the measurement selector, which ignores TenantPending, so that the
// outstanding-blocks metric reflects real work even during an active
// redaction batch. Autoscaling must not see zero blocks just because
// compaction is gated for the tenant.
blockSelector, _ = p.newBlockSelectorForMeasurement(tenant)
tempodb.MeasureOutstandingBlocks(tenant, blockSelector, owns)
}
}

// newBlockSelectorForMeasurement builds a block selector from the full tenant
// blocklist without the TenantPending guard that newBlockSelector applies.
// Used exclusively by measureTenants so that the outstanding-blocks metric
// continues to reflect real work even while a redaction batch is active.
// The selector produced here is never used to create compaction jobs.
func (p *CompactionProvider) newBlockSelectorForMeasurement(tenantID string) (blockselector.CompactionBlockSelector, int) {
var (
fullBlocklist = p.store.BlockMetas(tenantID)
window = p.overrides.MaxCompactionRange(tenantID)
blocklist = make([]*backend.BlockMeta, 0, len(fullBlocklist))
)

busyBlocks := p.sched.BusyBlocksForTenant(tenantID)
for _, block := range fullBlocklist {
if _, ok := busyBlocks[block.BlockID.String()]; ok {
continue
}
blocklist = append(blocklist, block)
}

if window == 0 {
window = p.cfg.Compactor.MaxCompactionRange
}

return blockselector.NewTimeWindowBlockSelector(
blocklist,
window,
p.cfg.Compactor.MaxCompactionObjects,
p.cfg.Compactor.MaxBlockBytes,
p.cfg.MinInputBlocks,
p.cfg.MaxInputBlocks,
p.cfg.MaxCompactionLevel,
), len(blocklist)
}

func (p *CompactionProvider) newBlockSelector(tenantID string) (blockselector.CompactionBlockSelector, int) {
// Do not start new compaction jobs for a tenant with an active redaction batch.
// Compacting blocks during a live batch would produce output blocks not yet
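
As a rough illustration of the split introduced above, here is a minimal sketch with invented types (`provider`, `selectorForJobs`, and `selectorForMeasurement` are not Tempo's real names): the job-selection path is gated on a pending redaction batch, while the measurement path always sees the full blocklist so the outstanding-blocks metric keeps reporting the real backlog.

```go
package main

import "fmt"

type block struct{ id string }

type provider struct {
	blocks        map[string][]block // tenant -> blocklist
	tenantPending map[string]bool    // tenant has an active redaction batch
}

// selectorForJobs gates on a pending redaction: no compaction work is offered.
func (p *provider) selectorForJobs(tenant string) []block {
	if p.tenantPending[tenant] {
		return nil
	}
	return p.blocks[tenant]
}

// selectorForMeasurement ignores the pending flag so metrics (and therefore
// autoscaling) still see the real amount of outstanding work.
func (p *provider) selectorForMeasurement(tenant string) []block {
	return p.blocks[tenant]
}

func main() {
	p := &provider{
		blocks:        map[string][]block{"tenant-a": {{"b1"}, {"b2"}}},
		tenantPending: map[string]bool{"tenant-a": true},
	}
	fmt.Println(len(p.selectorForJobs("tenant-a")))        // 0: compaction is gated
	fmt.Println(len(p.selectorForMeasurement("tenant-a"))) // 2: metric sees the backlog
}
```
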
42 changes: 42 additions & 0 deletions modules/backendscheduler/provider/compaction_test.go
@@ -268,6 +268,48 @@ func TestCompactionProvider_SkipsAllCompactionDuringRedaction(t *testing.T) {
require.Empty(t, collectAllMetas(selector), "no blocks should be offered for compaction during redaction")
}

// TestCompactionProvider_MeasureTenantsIgnoresTenantPending verifies that
// newBlockSelectorForMeasurement returns the real block count even when
// TenantPending is true. This ensures the outstanding-blocks metric (and
// therefore autoscaling) is not disrupted by an active redaction batch.
func TestCompactionProvider_MeasureTenantsIgnoresTenantPending(t *testing.T) {
const testTenant = "test-tenant"
cfg := CompactionConfig{}
cfg.RegisterFlagsAndApplyDefaults("", &flag.FlagSet{})

tmpDir := t.TempDir()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

store, _, ww := newStore(ctx, t, tmpDir)
defer store.Shutdown()

writeTenantBlocks(ctx, t, backend.NewWriter(ww), testTenant, 5)
time.Sleep(150 * time.Millisecond)

w := work.New(work.Config{})
// Activate a redaction batch so TenantPending returns true.
require.NoError(t, w.AddBatch(&tempopb.RedactionBatch{
BatchId: "batch-1",
TenantId: testTenant,
}))
require.True(t, w.TenantPending(testTenant))

limits, err := overrides.NewOverrides(overrides.Config{Defaults: overrides.Overrides{}}, nil, prometheus.DefaultRegisterer)
require.NoError(t, err)

p := NewCompactionProvider(cfg, test.NewTestingLogger(t), store, limits, w)

// newBlockSelector must return 0 blocks (compaction gated during redaction).
_, blocklistLen := p.newBlockSelector(testTenant)
require.Equal(t, 0, blocklistLen, "newBlockSelector should return 0 blocks while TenantPending")

// newBlockSelectorForMeasurement must return the actual block count so that
// the outstanding-blocks metric is not suppressed during redaction.
_, measureLen := p.newBlockSelectorForMeasurement(testTenant)
require.Greater(t, measureLen, 0, "newBlockSelectorForMeasurement should return blocks even while TenantPending")
}

func TestCompactionProvider_InFlightJobsPreventDuplicates(t *testing.T) {
const tenant = "test-tenant"
cfg := CompactionConfig{}
93 changes: 64 additions & 29 deletions modules/backendscheduler/work/work.go
@@ -61,6 +61,13 @@ type Work struct {
// Guarded by pendingMtx.
registeredJobs map[string]*Job

// workerJobs indexes workerID -> jobID for every job currently assigned to a
// worker that has not yet completed or failed. Populated in AddJob (the caller
// must call SetWorkerID before AddJob), cleared by CompleteJob, FailJob, and the
// dead-job timeout path in Prune. Not persisted; rebuilt on LoadFromLocal and
// Unmarshal from the active job map. Guarded by pendingMtx.
workerJobs map[string]string

// runningBlocks indexes (tenantID, blockID) -> *Job for every block referenced
// by a registered or active job. Populated by RegisterJob; entries persist until
// CompleteJob or FailJob. Guarded by pendingMtx. Not persisted; rebuilt by
@@ -87,6 +94,7 @@ func New(cfg Config) Interface {
sw.pendingByTenant = make(map[string]map[tempopb.JobType][]string)
sw.redactionInFlight = make(map[string]int)
sw.registeredJobs = make(map[string]*Job)
sw.workerJobs = make(map[string]string)
sw.runningBlocks = make(map[string]*Job)
sw.batches = newBatchStore()

@@ -122,6 +130,10 @@ func (w *Work) AddJob(j *Job) error {
w.redactionInFlight[j.Tenant()]--
}
}
// Index the worker -> job assignment so GetJobForWorker is O(1).
if wid := j.GetWorkerID(); wid != "" {
w.workerJobs[wid] = j.ID
}
w.pendingMtx.Unlock()

return nil
@@ -245,12 +257,17 @@ func (w *Work) ListJobs() []*Job {
return allJobs
}

// Prune removes old completed/failed jobs from all shards
// Prune removes old completed/failed jobs from all shards and transitions
// timed-out running jobs to FAILED. Index cleanup (runningBlocks, workerJobs)
// for timed-out jobs is performed after all shards are processed.
func (w *Work) Prune(ctx context.Context) {
_, span := tracer.Start(ctx, "ShardedPrune")
defer span.End()

// Prune each shard independently for better concurrency
// Pre-allocate one slot per shard so goroutines can collect timed-out jobs
// without a shared mutex — each goroutine writes only to its own slice.
timedOut := make([][]*Job, ShardCount)

var wg sync.WaitGroup
for i := range ShardCount {
wg.Add(1)
@@ -270,48 +287,52 @@
case tempopb.JobStatus_JOB_STATUS_RUNNING:
if time.Since(j.GetStartTime()) > w.cfg.DeadJobTimeout {
j.Fail()
timedOut[shardIndex] = append(timedOut[shardIndex], j)
}
}
}
}(i)
}
wg.Wait()

// Clean up indexes for all timed-out jobs. CompleteJob/FailJob normally handle
// this, but Prune calls j.Fail() directly to avoid re-acquiring the shard lock.
w.pendingMtx.Lock()
for _, shardJobs := range timedOut {
for _, j := range shardJobs {
for _, key := range runningBlockKeys(j) {
delete(w.runningBlocks, key)
}
if wid := j.GetWorkerID(); wid != "" {
delete(w.workerJobs, wid)
}
}
}
w.pendingMtx.Unlock()
}

// GetJobForWorker finds a job for a specific worker across all shards
// GetJobForWorker returns the active job assigned to the given worker, or nil
// if none exists. Uses the O(1) workerJobs index rather than scanning all shards.
func (w *Work) GetJobForWorker(ctx context.Context, workerID string) *Job {
_, span := tracer.Start(ctx, "ShardedGetJobForWorker")
defer span.End()

var jj *Job

// Search across all shards for this worker's jobs
for i := range ShardCount {
shard := w.Shards[i]

jj = func() *Job {
shard.mtx.Lock()
defer shard.mtx.Unlock()

for _, j := range shard.Jobs {
if j.GetWorkerID() != workerID {
continue
}

switch j.GetStatus() {
case tempopb.JobStatus_JOB_STATUS_UNSPECIFIED, tempopb.JobStatus_JOB_STATUS_RUNNING:
return j
}
}

return nil
}()
w.pendingMtx.Lock()
jobID, ok := w.workerJobs[workerID]
w.pendingMtx.Unlock()

if jj != nil {
return jj
}
if !ok {
return nil
}

j := w.GetJob(jobID)
if j == nil {
return nil
}
switch j.GetStatus() {
case tempopb.JobStatus_JOB_STATUS_UNSPECIFIED, tempopb.JobStatus_JOB_STATUS_RUNNING:
return j
}
return nil
}

@@ -331,6 +352,9 @@ func (w *Work) CompleteJob(id string) {
for _, key := range runningBlockKeys(j) {
delete(w.runningBlocks, key)
}
if wid := j.GetWorkerID(); wid != "" {
delete(w.workerJobs, wid)
}
w.pendingMtx.Unlock()
}
}
@@ -351,6 +375,9 @@ func (w *Work) FailJob(id string) {
for _, key := range runningBlockKeys(j) {
delete(w.runningBlocks, key)
}
if wid := j.GetWorkerID(); wid != "" {
delete(w.workerJobs, wid)
}
w.pendingMtx.Unlock()
}
}
@@ -455,6 +482,7 @@ func (w *Work) Unmarshal(data []byte) error {
}
}

w.workerJobs = make(map[string]string)
w.runningBlocks = make(map[string]*Job)
for i := range ShardCount {
for _, j := range w.Shards[i].Jobs {
@@ -464,6 +492,9 @@
for _, key := range runningBlockKeys(j) {
w.runningBlocks[key] = j
}
if wid := j.GetWorkerID(); wid != "" {
w.workerJobs[wid] = j.ID
}
}
}
}
@@ -622,6 +653,7 @@ func (w *Work) rebuildPendingIndexes() {
shard.mtx.Unlock()
}

w.workerJobs = make(map[string]string)
w.runningBlocks = make(map[string]*Job)
for i := range ShardCount {
shard := w.Shards[i]
@@ -633,6 +665,9 @@
for _, key := range runningBlockKeys(j) {
w.runningBlocks[key] = j
}
if wid := j.GetWorkerID(); wid != "" {
w.workerJobs[wid] = j.ID
}
}
}
shard.mtx.Unlock()
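
The worker-index change amounts to maintaining a secondary map alongside the primary job store. Here is a simplified, self-contained sketch of the idea with invented types rather than Tempo's actual `Work`/`Job` API: the index is written on insert, cleared on completion, consulted for an O(1) lookup, and rebuilt from the primary store after deserialization.

```go
package main

import "fmt"

type job struct {
	ID       string
	WorkerID string
	Done     bool
}

type work struct {
	jobs       map[string]*job   // jobID -> job (primary store)
	workerJobs map[string]string // workerID -> jobID (secondary index)
}

func newWork() *work {
	return &work{jobs: map[string]*job{}, workerJobs: map[string]string{}}
}

func (w *work) addJob(j *job) {
	w.jobs[j.ID] = j
	if j.WorkerID != "" {
		w.workerJobs[j.WorkerID] = j.ID // keep the index in sync on insert
	}
}

func (w *work) completeJob(id string) {
	if j, ok := w.jobs[id]; ok {
		j.Done = true
		delete(w.workerJobs, j.WorkerID) // clear the index on completion
	}
}

// jobForWorker is O(1): one index lookup plus one primary-store lookup,
// instead of scanning every job in every shard.
func (w *work) jobForWorker(workerID string) *job {
	id, ok := w.workerJobs[workerID]
	if !ok {
		return nil
	}
	return w.jobs[id]
}

// rebuildIndex recreates the index from the primary store, e.g. after loading
// persisted state that intentionally does not include the index itself.
func (w *work) rebuildIndex() {
	w.workerJobs = map[string]string{}
	for _, j := range w.jobs {
		if !j.Done && j.WorkerID != "" {
			w.workerJobs[j.WorkerID] = j.ID
		}
	}
}

func main() {
	w := newWork()
	w.addJob(&job{ID: "job-1", WorkerID: "worker-a"})
	fmt.Println(w.jobForWorker("worker-a").ID) // job-1
	w.completeJob("job-1")
	fmt.Println(w.jobForWorker("worker-a") == nil) // true
}
```

The real implementation additionally clears the index in FailJob and in the dead-job timeout path of Prune, and guards all index access with the same mutex that protects the other pending-job indexes.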