Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
* [BUGFIX] Force live-store to rehydrate from Kafka lookback period when local data is missing (e.g. PVC wipe, new node) instead of resuming from the committed consumer group offset [#6428](https://github.com/grafana/tempo/pull/6428) (@oleg-kozlyuk-grafana)
* [ENHANCEMENT] Add new metric for generator ring size: `tempo_distributor_metrics_generator_tenant_ring_size` [#5686](https://github.com/grafana/tempo/pull/5686) (@zalegrala)
* [BUGFIX] fix: reload span_name_sanitization overrides during runtime [#6435](https://github.com/grafana/tempo/pull/6435) (@electron0zero)
* [BUGFIX] fix: live store honor the config options for block and WAL versions [#6509](https://github.com/grafana/tempo/pull/6509) (@mdisibio)
* [BUGFIX] fix: normalize allowlist headers when building the allowlist map [#6481](https://github.com/grafana/tempo/pull/6481) (@javiermolinar)

### 3.0 Cleanup
Expand Down
2 changes: 2 additions & 0 deletions cmd/tempo/app/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -709,8 +709,10 @@
// Always use partition 0. This is for small installs or local/debugging setups.
singlePartition := IsSingleBinary(t.cfg.Target)

// Inject config from other locations.

Check notice on line 712 in cmd/tempo/app/modules.go

View workflow job for this annotation

GitHub Actions / Coverage Annotations

Uncovered line

Line 712 is not covered by tests
t.cfg.LiveStore.IngestConfig = t.cfg.Ingest
t.cfg.LiveStore.Ring.ListenPort = t.cfg.Server.GRPCListenPort
t.cfg.LiveStore.GlobalBlockConfig = t.cfg.StorageConfig.Trace.Block

Check notice on line 715 in cmd/tempo/app/modules.go

View workflow job for this annotation

GitHub Actions / Coverage Annotations

Uncovered line

Line 715 is not covered by tests

var err error
t.liveStore, err = livestore.New(t.cfg.LiveStore, t.Overrides, log.Logger, prometheus.DefaultRegisterer, singlePartition)
Expand Down
1 change: 0 additions & 1 deletion docs/sources/tempo/configuration/manifest.md
Original file line number Diff line number Diff line change
Expand Up @@ -1345,7 +1345,6 @@ live_store:
wal:
path: /var/tempo/live-store/traces
ingestion_time_range_slack: 2m0s
version: vParquet4
query_block_concurrency: 10
complete_block_timeout: 1h0m0s
complete_block_concurrency: 2
Expand Down
57 changes: 54 additions & 3 deletions modules/livestore/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,16 @@
// Block configuration
BlockConfig common.BlockConfig `yaml:"block_config"`

// GlobalBlockConfig is the main storage trace block config (storage.trace.block). Used as fallback
// when block_config.version and wal.version are not set. This config is injected by the application when creating the LiveStore.
GlobalBlockConfig *common.BlockConfig `yaml:"-"`

// ReadinessTargetLag is the target consumer lag threshold before the live-store
// is considered ready to serve queries. The live-store will wait until lag drops
// below this value. Set to 0 to disable readiness waiting (default, backward compatible).
ReadinessTargetLag time.Duration `yaml:"readiness_target_lag"`

// ReadinessMaxWait is the maximum time to wait for catching up at startup.
// ReadinessMaxWait is the maximum time to wait for catching up a˛t startup.
// If this timeout is exceeded, the live-store becomes ready anyway.
// Only used if ReadinessTargetLag > 0. Default: 30m.
ReadinessMaxWait time.Duration `yaml:"readiness_max_wait"`
Expand Down Expand Up @@ -127,7 +131,6 @@
f.BoolVar(&cfg.RemoveOwnerOnShutdown, prefix+".remove-owner-on-shutdown", cfg.RemoveOwnerOnShutdown, "Remove partition owner from the ring on shutdown.")

cfg.WAL.RegisterFlags(f) // WAL config has no flags, only defaults
cfg.WAL.Version = encoding.DefaultEncoding().Version()
f.StringVar(&cfg.WAL.Filepath, prefix+".wal.path", "/var/tempo/live-store/traces", "Path at which store WAL blocks.")
f.StringVar(&cfg.ShutdownMarkerDir, prefix+".shutdown_marker_dir", "/var/tempo/live-store/shutdown-marker", "Path to the shutdown marker directory.")
}
Expand Down Expand Up @@ -173,9 +176,57 @@
return fmt.Errorf("max_trace_idle (%s) cannot be greater than max_trace_live (%s)", cfg.MaxTraceIdle, cfg.MaxTraceLive)
}

if _, _, err := coalesceBlockVersions(cfg); err != nil {
return err
}

if err := common.ValidateConfig(&cfg.BlockConfig); err != nil {
return fmt.Errorf("block_config validation failed: %w", err)
}

return cfg.WAL.Validate()
if err := cfg.WAL.Validate(); err != nil {
return err
}

Check notice on line 189 in modules/livestore/config.go

View workflow job for this annotation

GitHub Actions / Coverage Annotations

Uncovered lines

Lines 188-189 are not covered by tests

return nil
}

// coalesceBlockVersions resolves complete block and WAL encodings from configs.
// Starts with the default version and overrides as each layer is checked (global block config, then block_config, then for WAL also wal.version).
// Returns an error if any resolved version isn't writable.
func coalesceBlockVersions(cfg *Config) (completeBlockEncoding, walEncoding encoding.VersionedEncoding, err error) {
var (
blockVer = encoding.DefaultEncoding().Version()
walVer = encoding.DefaultEncoding().Version()
)

if cfg.GlobalBlockConfig != nil && cfg.GlobalBlockConfig.Version != "" {
blockVer = cfg.GlobalBlockConfig.Version
walVer = cfg.GlobalBlockConfig.Version
}

if cfg.BlockConfig.Version != "" {
blockVer = cfg.BlockConfig.Version
walVer = cfg.BlockConfig.Version
Comment thread
mapno marked this conversation as resolved.
}

if cfg.WAL.Version != "" {
walVer = cfg.WAL.Version
}

completeBlockEncoding, err = encoding.FromVersionForWrites(blockVer)
if err != nil {
return nil, nil, fmt.Errorf("complete block version %q: %w", blockVer, err)
}

walEncoding, err = encoding.FromVersionForWrites(walVer)
if err != nil {
return nil, nil, fmt.Errorf("wal version %q: %w", walVer, err)
}

// Inject final coalesced versions back into the configs.
cfg.BlockConfig.Version = blockVer
cfg.WAL.Version = walVer

return completeBlockEncoding, walEncoding, nil
}
101 changes: 101 additions & 0 deletions modules/livestore/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/grafana/tempo/tempodb/encoding"
"github.com/grafana/tempo/tempodb/encoding/common"
"github.com/grafana/tempo/tempodb/encoding/vparquet5"
)

func TestConfigValidate(t *testing.T) {
Expand Down Expand Up @@ -128,6 +132,20 @@ func TestConfigValidate(t *testing.T) {
},
expectedErr: "max_trace_idle (20s) cannot be greater than max_trace_live (10s)",
},
{
name: "unsupported wal version fails",
modifyConfig: func(cfg *Config) {
cfg.WAL.Version = "preview"
},
expectedErr: "preview",
},
{
name: "unsupported block version fails",
modifyConfig: func(cfg *Config) {
cfg.BlockConfig.Version = "preview"
},
expectedErr: "preview",
},
}

for _, tt := range tests {
Expand All @@ -148,3 +166,86 @@ func TestConfigValidate(t *testing.T) {
})
}
}

func TestCoalesceBlockVersions(t *testing.T) {
defaultVer := encoding.DefaultEncoding().Version()

tests := []struct {
name string
modifyConfig func(*Config)
expectedCompleteVersion string
expectedWalVersion string
expectedErr string
}{
{
name: "uses specific versions when set",
modifyConfig: func(cfg *Config) {
cfg.BlockConfig.Version = vparquet5.VersionString
cfg.WAL.Version = vparquet5.VersionString
},
expectedCompleteVersion: vparquet5.VersionString,
expectedWalVersion: vparquet5.VersionString,
},
{
name: "fallback to GlobalBlockConfig when empty",
modifyConfig: func(cfg *Config) {
cfg.BlockConfig.Version = ""
cfg.WAL.Version = ""
cfg.GlobalBlockConfig = &common.BlockConfig{Version: vparquet5.VersionString}
},
expectedCompleteVersion: vparquet5.VersionString,
expectedWalVersion: vparquet5.VersionString,
},
{
name: "use default when all version fields empty",
modifyConfig: func(cfg *Config) {
cfg.BlockConfig.Version = ""
cfg.WAL.Version = ""
},
expectedCompleteVersion: defaultVer,
expectedWalVersion: defaultVer,
},
{
name: "wal fallback to block version when empty",
modifyConfig: func(cfg *Config) {
cfg.BlockConfig.Version = vparquet5.VersionString
cfg.WAL.Version = ""
},
expectedCompleteVersion: vparquet5.VersionString,
expectedWalVersion: vparquet5.VersionString,
},
{
name: "unsupported wal version returns error",
modifyConfig: func(cfg *Config) {
cfg.WAL.Version = "preview"
},
expectedErr: "preview",
},
{
name: "unsupported block version returns error",
modifyConfig: func(cfg *Config) {
cfg.BlockConfig.Version = "preview"
},
expectedErr: "preview",
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
cfg := Config{}
cfg.RegisterFlagsAndApplyDefaults("", flag.NewFlagSet("", flag.PanicOnError))
tt.modifyConfig(&cfg)

completeEnc, walEnc, err := coalesceBlockVersions(&cfg)

if tt.expectedErr != "" {
require.Error(t, err)
assert.Contains(t, err.Error(), tt.expectedErr)
return
}
require.NoError(t, err)
assert.Equal(t, tt.expectedCompleteVersion, completeEnc.Version())
assert.Equal(t, tt.expectedWalVersion, walEnc.Version())
})
}
}
37 changes: 18 additions & 19 deletions modules/livestore/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,8 @@ type instance struct {
Cfg Config

// WAL and encoding
wal *wal.WAL
enc encoding.VersionedEncoding
wal *wal.WAL
completeBlockEncoding encoding.VersionedEncoding

// Block management
blocksMtx sync.RWMutex
Expand All @@ -111,24 +111,23 @@ type instance struct {
overrides overrides.Interface
}

func newInstance(instanceID string, cfg Config, wal *wal.WAL, overrides overrides.Interface, logger log.Logger) (*instance, error) {
enc := encoding.DefaultEncoding()
func newInstance(instanceID string, cfg Config, wal *wal.WAL, completeBlockEncoding encoding.VersionedEncoding, overrides overrides.Interface, logger log.Logger) (*instance, error) {
logger = log.With(logger, "tenant", instanceID)

i := &instance{
tenantID: instanceID,
logger: logger,
Cfg: cfg,
wal: wal,
enc: enc,
walBlocks: map[uuid.UUID]common.WALBlock{},
completeBlocks: map[uuid.UUID]*ingester.LocalBlock{},
liveTraces: livetraces.New[*v1.ResourceSpans](func(rs *v1.ResourceSpans) uint64 { return uint64(rs.Size()) }, cfg.MaxTraceIdle, cfg.MaxTraceLive, instanceID),
traceSizes: tracesizes.New(),
maxTraceLogger: util_log.NewRateLimitedLogger(maxTraceLogLinesPerSecond, level.Warn(logger)),
overrides: overrides,
tracesCreatedTotal: metricTracesCreatedTotal.WithLabelValues(instanceID),
bytesReceivedTotal: metricBytesReceivedTotal,
tenantID: instanceID,
logger: logger,
Cfg: cfg,
wal: wal,
completeBlockEncoding: completeBlockEncoding,
walBlocks: map[uuid.UUID]common.WALBlock{},
completeBlocks: map[uuid.UUID]*ingester.LocalBlock{},
liveTraces: livetraces.New[*v1.ResourceSpans](func(rs *v1.ResourceSpans) uint64 { return uint64(rs.Size()) }, cfg.MaxTraceIdle, cfg.MaxTraceLive, instanceID),
traceSizes: tracesizes.New(),
maxTraceLogger: util_log.NewRateLimitedLogger(maxTraceLogLinesPerSecond, level.Warn(logger)),
overrides: overrides,
tracesCreatedTotal: metricTracesCreatedTotal.WithLabelValues(instanceID),
bytesReceivedTotal: metricBytesReceivedTotal,
// blockOffsetMeta: make(map[uuid.UUID]offsetMetadata),
}

Expand Down Expand Up @@ -443,14 +442,14 @@ func (i *instance) completeBlock(ctx context.Context, id uuid.UUID) error {
}
defer iter.Close()

newMeta, err := i.enc.CreateBlock(ctx, &i.Cfg.BlockConfig, walBlock.BlockMeta(), iter, reader, writer)
newMeta, err := i.completeBlockEncoding.CreateBlock(ctx, &i.Cfg.BlockConfig, walBlock.BlockMeta(), iter, reader, writer)
if err != nil {
level.Error(i.logger).Log("msg", "failed to create complete block", "id", id, "err", err)
span.RecordError(err)
return err
}

newBlock, err := i.enc.OpenBlock(newMeta, reader)
newBlock, err := i.completeBlockEncoding.OpenBlock(newMeta, reader)
if err != nil {
level.Error(i.logger).Log("msg", "failed to open complete block", "id", id, "err", err)
span.RecordError(err)
Expand Down
2 changes: 1 addition & 1 deletion modules/livestore/instance_search_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1075,7 +1075,7 @@ func TestLiveStoreQueryRange(t *testing.T) {
mover, err := overrides.NewOverrides(overrides.Config{}, nil, prometheus.DefaultRegisterer)
require.NoError(t, err)
// Create instance
inst, err := newInstance(tenant, cfg, w, mover, log.NewNopLogger())
inst, err := newInstance(tenant, cfg, w, encoding.DefaultEncoding(), mover, log.NewNopLogger())
require.NoError(t, err)

// Create test spans
Expand Down
39 changes: 24 additions & 15 deletions modules/livestore/live_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
"github.com/grafana/tempo/pkg/tempopb"
"github.com/grafana/tempo/pkg/util/shutdownmarker"
"github.com/grafana/tempo/pkg/validation"
"github.com/grafana/tempo/tempodb/encoding"
"github.com/grafana/tempo/tempodb/wal"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
Expand Down Expand Up @@ -131,10 +132,11 @@
reader *PartitionReader

// Multi-tenant instances
instancesMtx sync.RWMutex
instances map[string]*instance
wal *wal.WAL
overrides overrides.Interface
instancesMtx sync.RWMutex
instances map[string]*instance
wal *wal.WAL
completeBlockEncoding encoding.VersionedEncoding
overrides overrides.Interface

// Background processing
ctx context.Context // context for the service. all background processes should exit if this is cancelled
Expand All @@ -148,19 +150,26 @@
}

func New(cfg Config, overridesService overrides.Interface, logger log.Logger, reg prometheus.Registerer, singlePartition bool) (*LiveStore, error) {
completeBlockEncoding, walEncoding, encErr := coalesceBlockVersions(&cfg)
if encErr != nil {
return nil, encErr
}

Check notice on line 156 in modules/livestore/live_store.go

View workflow job for this annotation

GitHub Actions / Coverage Annotations

Uncovered lines

Lines 155-156 are not covered by tests
cfg.WAL.Version = walEncoding.Version()

ctx, cancel := context.WithCancel(context.Background())

s := &LiveStore{
cfg: cfg,
logger: logger,
reg: reg,
decoder: ingest.NewDecoder(),
ctx: ctx,
cancel: cancel,
instances: make(map[string]*instance),
overrides: overridesService,
completeQueues: flushqueues.New[*completeOp](cfg.CompleteBlockConcurrency, metricCompleteQueueLength),
startupComplete: make(chan struct{}),
cfg: cfg,
logger: logger,
reg: reg,
decoder: ingest.NewDecoder(),
completeBlockEncoding: completeBlockEncoding,
ctx: ctx,
cancel: cancel,
instances: make(map[string]*instance),
overrides: overridesService,
completeQueues: flushqueues.New[*completeOp](cfg.CompleteBlockConcurrency, metricCompleteQueueLength),
startupComplete: make(chan struct{}),
}

// Initialize ready state to starting
Expand Down Expand Up @@ -610,7 +619,7 @@
}

// Create new instance
inst, err := newInstance(tenantID, s.cfg, s.wal, s.overrides, s.logger)
inst, err := newInstance(tenantID, s.cfg, s.wal, s.completeBlockEncoding, s.overrides, s.logger)
if err != nil {
return nil, fmt.Errorf("failed to create instance for tenant %s: %w", tenantID, err)
}
Expand Down
6 changes: 3 additions & 3 deletions modules/livestore/live_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -541,11 +541,11 @@ func TestRequeueOnError(t *testing.T) {
inst, err := liveStore.getOrCreateInstance(testTenantID)
require.NoError(t, err)
enc := erroredEnc{
VersionedEncoding: inst.enc,
VersionedEncoding: inst.completeBlockEncoding,
mx: sync.Mutex{},
}
enc.SetError(errors.New("forced error"))
inst.enc = &enc
inst.completeBlockEncoding = &enc

// push data
expectedID, expectedTrace := pushToLiveStore(t, liveStore)
Expand All @@ -559,7 +559,7 @@ func TestRequeueOnError(t *testing.T) {
// wait for the first backoff that should not be successful
time.Sleep(initialBackoff * 2)
requireInstanceState(t, inst, instanceState{liveTraces: 0, walBlocks: 1, completeBlocks: 0})
// now enc does not error and block should be flushed successfully
// now completeBlockEncoding does not error and block should be flushed successfully
enc.SetError(nil)
time.Sleep(initialBackoff * 8)
requireInstanceState(t, inst, instanceState{liveTraces: 0, walBlocks: 0, completeBlocks: 1})
Expand Down
Loading
Loading