Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
* [ENHANCEMENT] Remove explicit `runtime.GC()` calls in vParquet5 compactor/block creation and CLI [#6603](https://github.com/grafana/tempo/pull/6603) (@oleg-kozlyuk-grafana)
* [ENHANCEMENT] Reduce allocations in `extendReuseSlice` growth path during WAL writes and block creation [#6863](https://github.com/grafana/tempo/pull/6863) (@mapno)
* [ENHANCEMENT] Implemented anti-affinity for pods in same livestore zone [#6757](https://github.com/grafana/tempo/pull/6757) (@zhxiaogg)
* [ENHANCEMENT] Livestore: skip WAL complete op during shutdown [#6839](https://github.com/grafana/tempo/pull/6839) (@zhxiaogg)
* [BUGFIX] livestore: check readiness before lag for SearchRecent and QueryRange queries [#6911](https://github.com/grafana/tempo/pull/6911) (@zhxiaogg)
* [BUGFIX] Fix integer overflow in query parameters by using `strconv.ParseUint` instead of `strconv.Atoi`/`strconv.ParseInt` for unsigned integer fields. [#6612](https://github.com/grafana/tempo/pull/6612) (@bejaratommy)
* [BUGFIX] Fix live-store SearchTagValuesV2 disk cache never being populated on complete blocks [#6858](https://github.com/grafana/tempo/pull/6858) (@mapno)
Expand Down
7 changes: 6 additions & 1 deletion modules/livestore/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ const (
reasonWaitingForLiveTraces = "waiting_for_live_traces"
reasonWaitingForWAL = "waiting_for_wal"
maxTraceLogLinesPerSecond = 10
// walBackpressureLimit is the maximum number of outstanding WAL blocks before
// backpressure is applied. In the ideal case, shutdown can leave up to 2
// uncompleted WAL blocks on disk, and after restart ingestion may outpace WAL
// completion, so we use 4 to avoid unnecessary backpressure during catch-up.
walBackpressureLimit = 4
)

var (
Expand Down Expand Up @@ -200,7 +205,7 @@ func (i *instance) backpressure(ctx context.Context) bool {
count := len(i.walBlocks)
i.blocksMtx.RUnlock()

if count > 1 {
if count > walBackpressureLimit {
// There are multiple outstanding WAL blocks that need completion
// so wait a bit.
select {
Expand Down
28 changes: 28 additions & 0 deletions modules/livestore/instance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,3 +248,31 @@ func TestInstanceBackpressure(t *testing.T) {

require.NoError(t, services.StopAndAwaitTerminated(t.Context(), ls))
}

func TestInstanceWALBackpressure(t *testing.T) {
inst, ls := defaultInstance(t)
// Disable live traces backpressure so we only test WAL backpressure.
inst.Cfg.MaxLiveTracesBytes = 0

// Build up WAL blocks: push a trace, flush to head, cut to WAL.
createWALBlock := func() {
id := test.ValidTraceID(nil)
pushTrace(t.Context(), t, inst, test.MakeTrace(1, id), id)
require.NoError(t, inst.cutIdleTraces(t.Context(), true))
walID, err := inst.cutBlocks(t.Context(), true)
require.NoError(t, err)
require.NotEqual(t, walID, [16]byte{})
}
Comment thread
zhxiaogg marked this conversation as resolved.

// At the limit, no backpressure.
for range walBackpressureLimit {
createWALBlock()
}
require.False(t, inst.backpressure(t.Context()), "expected no backpressure at %d WAL blocks", walBackpressureLimit)

// One more WAL block should trigger backpressure.
createWALBlock()
require.True(t, inst.backpressure(t.Context()), "expected backpressure at %d WAL blocks", walBackpressureLimit+1)

require.NoError(t, services.StopAndAwaitTerminated(t.Context(), ls))
}
12 changes: 11 additions & 1 deletion modules/livestore/live_store_background.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,14 @@ func (s *LiveStore) processCompleteOp(op *completeOp) error {
return err
}

// If the context is cancelled (shutdown), abandon the completion. The WAL block remains on
// disk and will be re-enqueued by reloadBlocks() on next startup.
if ctx.Err() != nil {
level.Info(s.logger).Log("msg", "abandoning WAL block completion on shutdown, will replay on restart", "tenant", op.tenantID, "block", op.blockID)
Comment thread
zhxiaogg marked this conversation as resolved.
s.completeQueues.Clear(op)
return nil
}

err = inst.completeBlock(ctx, op.blockID)
metricCompletionDuration.Observe(time.Since(start).Seconds())

Expand Down Expand Up @@ -255,7 +263,7 @@ func (s *LiveStore) reloadBlocks() error {
level.Info(s.logger).Log("msg", "reloaded wal block", "block", meta.BlockID.String())
inst.walBlocks[(uuid.UUID)(meta.BlockID)] = blk

level.Info(s.logger).Log("msg", "queueing replayed wal block for completion", "block", meta.BlockID.String())
level.Info(s.logger).Log("msg", "queueing replayed wal block for completion", "block", meta.BlockID.String(), "size", blk.DataLength())
if err := s.enqueueCompleteOp(meta.TenantID, uuid.UUID(meta.BlockID), true); err != nil {
return fmt.Errorf("failed to enqueue wal block for completion for tenant %s: %w", meta.TenantID, err)
}
Expand All @@ -269,6 +277,8 @@ func (s *LiveStore) reloadBlocks() error {
}
}

level.Info(s.logger).Log("msg", "wal blocks to complete at startup", "count", len(walBlocks))

// ------------------------------------
// Complete blocks
// ------------------------------------
Expand Down
59 changes: 59 additions & 0 deletions modules/livestore/live_store_background_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package livestore

import (
"context"
"testing"
"time"

"github.com/google/uuid"
"github.com/grafana/dskit/services"
"github.com/stretchr/testify/require"
)

// TestProcessCompleteOpAbandonOnCancelledContext verifies that processCompleteOp
// skips WAL block completion when the service context is already cancelled
// (i.e. during shutdown), rather than attempting the work and scheduling a retry.
// The WAL block must remain on disk so that reloadBlocks() can re-enqueue it on
// the next startup.
func TestProcessCompleteOpAbandonOnCancelledContext(t *testing.T) {
tmpDir := t.TempDir()

liveStore, err := defaultLiveStore(t, tmpDir)
require.NoError(t, err)
t.Cleanup(func() { _ = services.StopAndAwaitTerminated(context.Background(), liveStore) })

inst, err := liveStore.getOrCreateInstance(testTenantID)
require.NoError(t, err)

Comment thread
zhxiaogg marked this conversation as resolved.
// Push a trace, flush live traces to head block, then cut to WAL.
pushTracesToInstance(t, inst, 1)
err = inst.cutIdleTraces(t.Context(), true)
require.NoError(t, err)
walID, err := inst.cutBlocks(t.Context(), true)
require.NoError(t, err)
require.NotEqual(t, uuid.Nil, walID)
requireInstanceState(t, inst, instanceState{liveTraces: 0, walBlocks: 1, completeBlocks: 0})

// Simulate shutdown by cancelling the service context.
liveStore.cancel()

op := &completeOp{
tenantID: testTenantID,
blockID: walID,
at: time.Now(),
bo: liveStore.cfg.initialBackoff,
Comment thread
zhxiaogg marked this conversation as resolved.
maxBackoff: liveStore.cfg.maxBackoff,
attempts: 1,
}

// processCompleteOp must return nil (not exit the worker loop) and must NOT
// schedule a retry — the WAL block stays on disk for reloadBlocks() on restart.
err = liveStore.processCompleteOp(op)
require.NoError(t, err)

// WAL block is still present: abandoned, not completed or removed.
requireInstanceState(t, inst, instanceState{liveTraces: 0, walBlocks: 1, completeBlocks: 0})

// No retry was enqueued.
require.True(t, liveStore.completeQueues.IsEmpty())
Comment thread
zhxiaogg marked this conversation as resolved.
}
Comment thread
zhxiaogg marked this conversation as resolved.
Loading