Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
* [BUGFIX] livestore: check readiness before lag for SearchRecent and QueryRange queries [#6911](https://github.com/grafana/tempo/pull/6911) (@zhxiaogg)
* [BUGFIX] Fix integer overflow in query parameters by using `strconv.ParseUint` instead of `strconv.Atoi`/`strconv.ParseInt` for unsigned integer fields. [#6612](https://github.com/grafana/tempo/pull/6612) (@bejaratommy)
* [BUGFIX] Fix live-store SearchTagValuesV2 disk cache never being populated on complete blocks [#6858](https://github.com/grafana/tempo/pull/6858) (@mapno)
* [ENHANCEMENT] Livestore: skip WAL complete ops during shutdown; uncompleted WAL blocks are replayed on restart [#6839](https://github.com/grafana/tempo/pull/6839) (@zhxiaogg)
* [BUGFIX] Fix dedicated columns fallback in `block_builder` and `live_store` to use `storage.trace.block.parquet_dedicated_columns` when not set via overrides. [#6647](https://github.com/grafana/tempo/pull/6647) (@stoewer)
Comment thread
zhxiaogg marked this conversation as resolved.
* [BUGFIX] Force live-store to rehydrate from Kafka lookback period when local data is missing (e.g. PVC wipe, new node) instead of resuming from the committed consumer group offset [#6428](https://github.com/grafana/tempo/pull/6428) (@oleg-kozlyuk-grafana)
* [BUGFIX] fix: reload span_name_sanitization overrides during runtime [#6435](https://github.com/grafana/tempo/pull/6435) (@electron0zero)
Expand Down
7 changes: 6 additions & 1 deletion modules/livestore/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ const (
reasonWaitingForLiveTraces = "waiting_for_live_traces"
reasonWaitingForWAL = "waiting_for_wal"
maxTraceLogLinesPerSecond = 10
// walBackpressureLimit is the maximum number of outstanding WAL blocks before
// backpressure is applied. In the ideal case, shutdown can leave up to 2
// uncompleted WAL blocks on disk, and after restart ingestion may outpace WAL
// completion, so we use 4 to avoid unnecessary backpressure during catch-up.
walBackpressureLimit = 4
)

var (
Expand Down Expand Up @@ -200,7 +205,7 @@ func (i *instance) backpressure(ctx context.Context) bool {
count := len(i.walBlocks)
i.blocksMtx.RUnlock()

if count > 1 {
if count > walBackpressureLimit {
// There are multiple outstanding WAL blocks that need completion
// so wait a bit.
select {
Expand Down
28 changes: 28 additions & 0 deletions modules/livestore/instance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,3 +248,31 @@ func TestInstanceBackpressure(t *testing.T) {

require.NoError(t, services.StopAndAwaitTerminated(t.Context(), ls))
}

func TestInstanceWALBackpressure(t *testing.T) {
inst, ls := defaultInstance(t)
// Disable live traces backpressure so we only test WAL backpressure.
inst.Cfg.MaxLiveTracesBytes = 0

// Build up WAL blocks: push a trace, flush to head, cut to WAL.
createWALBlock := func() {
id := test.ValidTraceID(nil)
pushTrace(t.Context(), t, inst, test.MakeTrace(1, id), id)
require.NoError(t, inst.cutIdleTraces(t.Context(), true))
walID, err := inst.cutBlocks(t.Context(), true)
require.NoError(t, err)
require.NotEqual(t, walID, [16]byte{})
}
Comment thread
zhxiaogg marked this conversation as resolved.

// At the limit, no backpressure.
for range walBackpressureLimit {
createWALBlock()
}
require.False(t, inst.backpressure(t.Context()), "expected no backpressure at %d WAL blocks", walBackpressureLimit)

// One more WAL block should trigger backpressure.
createWALBlock()
require.True(t, inst.backpressure(t.Context()), "expected backpressure at %d WAL blocks", walBackpressureLimit+1)

require.NoError(t, services.StopAndAwaitTerminated(t.Context(), ls))
}
11 changes: 10 additions & 1 deletion modules/livestore/live_store_background.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,13 @@ func (s *LiveStore) processCompleteOp(op *completeOp) error {
return err
}

// If the context is cancelled (shutdown), abandon the completion. The WAL block remains on
// disk and will be re-enqueued by reloadBlocks() on next startup.
if ctx.Err() != nil {
level.Info(s.logger).Log("msg", "abandoning WAL block completion on shutdown, will replay on restart", "tenant", op.tenantID, "block", op.blockID)
Comment thread
zhxiaogg marked this conversation as resolved.
return nil
}

err = inst.completeBlock(ctx, op.blockID)
metricCompletionDuration.Observe(time.Since(start).Seconds())

Expand Down Expand Up @@ -255,7 +262,7 @@ func (s *LiveStore) reloadBlocks() error {
level.Info(s.logger).Log("msg", "reloaded wal block", "block", meta.BlockID.String())
inst.walBlocks[(uuid.UUID)(meta.BlockID)] = blk

level.Info(s.logger).Log("msg", "queueing replayed wal block for completion", "block", meta.BlockID.String())
level.Info(s.logger).Log("msg", "queueing replayed wal block for completion", "block", meta.BlockID.String(), "size", blk.DataLength())
if err := s.enqueueCompleteOp(meta.TenantID, uuid.UUID(meta.BlockID), true); err != nil {
return fmt.Errorf("failed to enqueue wal block for completion for tenant %s: %w", meta.TenantID, err)
}
Expand All @@ -269,6 +276,8 @@ func (s *LiveStore) reloadBlocks() error {
}
}

level.Info(s.logger).Log("msg", "wal blocks to complete at startup", "count", len(walBlocks))

// ------------------------------------
// Complete blocks
// ------------------------------------
Expand Down
59 changes: 59 additions & 0 deletions modules/livestore/live_store_background_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package livestore

import (
"context"
"testing"
"time"

"github.com/google/uuid"
"github.com/grafana/dskit/services"
"github.com/stretchr/testify/require"
)

// TestProcessCompleteOpAbandonOnCancelledContext verifies that processCompleteOp
// skips WAL block completion when the service context is already cancelled
// (i.e. during shutdown), rather than attempting the work and scheduling a retry.
// The WAL block must remain on disk so that reloadBlocks() can re-enqueue it on
// the next startup.
func TestProcessCompleteOpAbandonOnCancelledContext(t *testing.T) {
tmpDir := t.TempDir()

liveStore, err := defaultLiveStore(t, tmpDir)
require.NoError(t, err)
t.Cleanup(func() { _ = services.StopAndAwaitTerminated(context.Background(), liveStore) })

inst, err := liveStore.getOrCreateInstance(testTenantID)
require.NoError(t, err)

Comment thread
zhxiaogg marked this conversation as resolved.
// Push a trace, flush live traces to head block, then cut to WAL.
pushTracesToInstance(t, inst, 1)
err = inst.cutIdleTraces(t.Context(), true)
require.NoError(t, err)
walID, err := inst.cutBlocks(t.Context(), true)
require.NoError(t, err)
require.NotEqual(t, uuid.Nil, walID)
requireInstanceState(t, inst, instanceState{liveTraces: 0, walBlocks: 1, completeBlocks: 0})

// Simulate shutdown by cancelling the service context.
liveStore.cancel()

op := &completeOp{
tenantID: testTenantID,
blockID: walID,
at: time.Now(),
bo: liveStore.cfg.initialBackoff,
Comment thread
zhxiaogg marked this conversation as resolved.
maxBackoff: liveStore.cfg.maxBackoff,
attempts: 1,
}

// processCompleteOp must return nil (not exit the worker loop) and must NOT
// schedule a retry — the WAL block stays on disk for reloadBlocks() on restart.
err = liveStore.processCompleteOp(op)
require.NoError(t, err)

// WAL block is still present: abandoned, not completed or removed.
requireInstanceState(t, inst, instanceState{liveTraces: 0, walBlocks: 1, completeBlocks: 0})

// No retry was enqueued.
require.True(t, liveStore.completeQueues.IsEmpty())
Comment thread
zhxiaogg marked this conversation as resolved.
}
Comment thread
zhxiaogg marked this conversation as resolved.
3 changes: 3 additions & 0 deletions tempodb/encoding/vparquet4/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,9 @@ func CreateBlock(ctx context.Context, cfg *common.BlockConfig, meta *backend.Blo
}

for {
if err := ctx.Err(); err != nil {
Comment thread
mdisibio marked this conversation as resolved.
Outdated
return nil, err
}
err := next(ctx)
if errors.Is(err, io.EOF) {
break
Expand Down
29 changes: 29 additions & 0 deletions tempodb/encoding/vparquet4/create_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,35 @@ func TestCreateBlockFilterDedicatedColumns(t *testing.T) {
require.Equal(t, original, meta.DedicatedColumns) // the original meta is not changed
}

// TestCreateBlockCancelledContext verifies that CreateBlock surfaces
// context.Canceled when invoked with an already-cancelled context.
func TestCreateBlockCancelledContext(t *testing.T) {
	// Cancel before CreateBlock ever runs.
	ctx, cancel := context.WithCancel(context.Background())
	cancel()

	rawR, rawW, _, err := local.New(&local.Config{Path: t.TempDir()})
	require.NoError(t, err)

	reader := backend.NewReader(rawR)
	writer := backend.NewWriter(rawW)

	// Provide enough traces that the iteration loop would do real work if the
	// cancellation check were missing.
	iter := newTestIterator()
	for range 100 {
		iter.Add(test.MakeTrace(10, nil), 0, 0)
	}

	cfg := &common.BlockConfig{
		BloomFP:             0.01,
		BloomShardSizeBytes: 100 * 1024,
	}

	meta := backend.NewBlockMeta("fake", uuid.New(), VersionString)
	meta.TotalObjects = 1

	_, err = CreateBlock(ctx, cfg, meta, iter, reader, writer)
	require.ErrorIs(t, err, context.Canceled)
}

// func TestEstimateTraceSize(t *testing.T) {
// f := "<put data.parquet file here>"
// file, err := os.OpenFile(f, os.O_RDONLY, 0644)
Expand Down
3 changes: 3 additions & 0 deletions tempodb/encoding/vparquet5/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,9 @@ func CreateBlock(ctx context.Context, cfg *common.BlockConfig, meta *backend.Blo
}

for {
if err := ctx.Err(); err != nil {
return nil, err
Comment thread
zhxiaogg marked this conversation as resolved.
Outdated
}
err := next(ctx)
if errors.Is(err, io.EOF) {
break
Expand Down
29 changes: 29 additions & 0 deletions tempodb/encoding/vparquet5/create_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,35 @@ func TestCreateBlockHonorsTraceStartEndTimesFromWalMeta(t *testing.T) {
require.Equal(t, 305, int(outMeta.EndTime.Unix()))
}

// TestCreateBlockCancelledContext verifies that CreateBlock surfaces
// context.Canceled when invoked with an already-cancelled context.
func TestCreateBlockCancelledContext(t *testing.T) {
	// Cancel before CreateBlock ever runs.
	ctx, cancel := context.WithCancel(context.Background())
	cancel()

	rawR, rawW, _, err := local.New(&local.Config{Path: t.TempDir()})
	require.NoError(t, err)

	reader := backend.NewReader(rawR)
	writer := backend.NewWriter(rawW)

	// Provide enough traces that the iteration loop would do real work if the
	// cancellation check were missing.
	iter := newTestIterator()
	for range 100 {
		iter.Add(test.MakeTrace(10, nil), 0, 0)
	}

	cfg := &common.BlockConfig{
		BloomFP:             0.01,
		BloomShardSizeBytes: 100 * 1024,
	}

	meta := backend.NewBlockMeta("fake", uuid.New(), VersionString)
	meta.TotalObjects = 1

	_, err = CreateBlock(ctx, cfg, meta, iter, reader, writer)
	require.ErrorIs(t, err, context.Canceled)
}

type testIterator struct {
traces []*tempopb.Trace
}
Expand Down
Loading