Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
* [ENHANCEMENT] Validate metrics-generator histogram buckets [#5991](https://github.com/grafana/tempo/pull/5991) (@carles-grafana)
* [ENHANCEMENT] Removed MustNewConstMetric to prevent panic and added validation for usage tracker config. Added `tempo_distributor_usage_tracker_errors_total` to surface errors in usage tracker. [#5981](https://github.com/grafana/tempo/pull/5981) (@electron0zero)
* [BUGFIX] Prevent slice panic when truncating series after topk() by adding bounds check in metrics query-range combiner [#6010](https://github.com/grafana/tempo/pull/6010) (@Syedowais312)
* [BUGFIX] Fix block-builder to more precisely validate block encoding on startup [#6037](https://github.com/grafana/tempo/pull/6037) (@mdisibio)
* [BUGFIX] Fix compactor to properly consider SSE-KMS information during metadata copy [#5774](https://github.com/grafana/tempo/pull/5774) (@steffsas)
* [BUGFIX] Fix `spss=0` parameter to properly mean unlimited spans instead of being rejected, and respect `max_spans_per_span_set=0` configuration [#5858](https://github.com/grafana/tempo/pull/5858) (@iamrajiv)
* [BUGFIX] Fix incorrect results in TraceQL compare() caused by potential hash collision of string array attributes [#5835](https://github.com/grafana/tempo/pull/5835) (@mdisibio)
Expand Down
23 changes: 17 additions & 6 deletions modules/blockbuilder/blockbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,12 +177,10 @@
func (b *BlockBuilder) starting(ctx context.Context) (err error) {
level.Info(b.logger).Log("msg", "block builder starting")
topic := b.cfg.IngestStorageConfig.Kafka.Topic
b.enc = encoding.DefaultEncoding()
if version := b.cfg.BlockConfig.BlockCfg.Version; version != "" {
b.enc, err = encoding.FromVersion(version)
if err != nil {
return fmt.Errorf("failed to create encoding: %w", err)
}

b.enc, err = encoding.FromVersionForWrites(b.cfg.BlockConfig.BlockCfg.Version)
if err != nil {
return fmt.Errorf("failed to create encoding: %w", err)

Check notice on line 183 in modules/blockbuilder/blockbuilder.go

View workflow job for this annotation

GitHub Actions / Coverage Annotations

Uncovered line

Line 183 is not covered by tests
}

b.wal, err = wal.New(&b.cfg.WAL)
Expand Down Expand Up @@ -222,6 +220,9 @@
defer close(b.consumeStopped)
for {
// Create a detached context for consume
// This is so that when the parent context is canceled and the block builder is stopping,
// we still finish the current consumption and flush of blocks. That is preferred than
// to starting over after a restart.
consumeCtx, cancel := context.WithCancel(context.Background())

waitTime, err := b.consume(consumeCtx)
Expand All @@ -231,6 +232,16 @@
level.Error(b.logger).Log("msg", "consumeCycle failed", "err", err)
}

// Always check for cancellation before going to next cycle.
// There are cases like when the queue is lagged, that waitTime could be zero.
// In this case it's non-deterministic which select statement will be executed below,
// so we do a specific check here first.
if ctx.Err() != nil {
// Parent context canceled, return
return nil
}

// Now wait for next cycle or cancellation.
select {
case <-time.After(waitTime): // Continue with next cycle
case <-ctx.Done():
Expand Down
46 changes: 25 additions & 21 deletions modules/blockbuilder/blockbuilder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,9 @@ const (
testPartition = int32(0)
)

func TestMain(*testing.M) {
func TestMain(m *testing.M) {
pollTimeout = 2 * time.Second // speed up the tests
m.Run()
}

// When the partition starts with no existing commit,
Expand All @@ -60,7 +61,7 @@ func TestBlockbuilder_lookbackOnNoCommit(t *testing.T) {
store := newStore(ctx, t)
cfg := blockbuilderConfig(t, address, []int32{0})

b, err := New(cfg, test.NewTestingLogger(t), newPartitionRingReader(), &mockOverrides{}, store)
b, err := New(cfg, testLogger(t), newPartitionRingReader(), &mockOverrides{}, store)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(ctx, b))
t.Cleanup(func() {
Expand Down Expand Up @@ -99,7 +100,7 @@ func TestBlockbuilder_without_partitions_assigned_returns_an_error(t *testing.T)
store := newStore(ctx, t)
cfg := blockbuilderConfig(t, address, []int32{})

b, err := New(cfg, test.NewTestingLogger(t), newPartitionRingReader(), &mockOverrides{}, store)
b, err := New(cfg, testLogger(t), newPartitionRingReader(), &mockOverrides{}, store)
require.NoError(t, err)
_, err = b.consume(ctx)
require.ErrorIs(t, err, errNoPartitionsAssigned)
Expand All @@ -117,7 +118,7 @@ func TestBlockbuilder_getAssignedPartitions(t *testing.T) {
20: {Id: 20, State: ring.PartitionActive},
})

b, err := New(cfg, test.NewTestingLogger(t), partitionRing, &mockOverrides{}, nil)
b, err := New(cfg, testLogger(t), partitionRing, &mockOverrides{}, nil)
require.NoError(t, err)
partitions := b.getAssignedPartitions()
assert.Equal(t, []int32{0, 2}, partitions)
Expand Down Expand Up @@ -155,7 +156,7 @@ func TestBlockbuilder_startWithCommit(t *testing.T) {
admClient := kadm.NewClient(client)
require.NoError(t, admClient.CommitAllOffsets(ctx, cfg.IngestStorageConfig.Kafka.ConsumerGroup, offsets))

b, err := New(cfg, test.NewTestingLogger(t), newPartitionRingReader(), &mockOverrides{}, store)
b, err := New(cfg, testLogger(t), newPartitionRingReader(), &mockOverrides{}, store)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(ctx, b))
t.Cleanup(func() {
Expand Down Expand Up @@ -201,12 +202,11 @@ func TestBlockbuilder_flushingFails(t *testing.T) {
return store.WriteBlock(ctx, block)
})
cfg := blockbuilderConfig(t, address, []int32{0})
logger := test.NewTestingLogger(t)

client := testkafka.NewKafkaClient(t, cfg.IngestStorageConfig.Kafka.Address, cfg.IngestStorageConfig.Kafka.Topic)
producedRecords := testkafka.SendTracesFor(t, ctx, client, time.Second, 100*time.Millisecond, ingest.Encode) // Send for 1 second, <1 consumption cycles

b, err := New(cfg, logger, newPartitionRingReader(), &mockOverrides{}, store)
b, err := New(cfg, testLogger(t), newPartitionRingReader(), &mockOverrides{}, store)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(ctx, b))
t.Cleanup(func() {
Expand Down Expand Up @@ -242,7 +242,7 @@ func TestBlockbuilder_receivesOldRecords(t *testing.T) {
store := newStore(ctx, t)
cfg := blockbuilderConfig(t, address, []int32{0})

b, err := New(cfg, test.NewTestingLogger(t), newPartitionRingReader(), &mockOverrides{}, store)
b, err := New(cfg, testLogger(t), newPartitionRingReader(), &mockOverrides{}, store)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(ctx, b))
t.Cleanup(func() {
Expand Down Expand Up @@ -328,12 +328,11 @@ func TestBlockbuilder_committingFails(t *testing.T) {

store := newStore(ctx, t)
cfg := blockbuilderConfig(t, address, []int32{0})
logger := test.NewTestingLogger(t)

client := testkafka.NewKafkaClient(t, cfg.IngestStorageConfig.Kafka.Address, cfg.IngestStorageConfig.Kafka.Topic)
producedRecords := testkafka.SendTracesFor(t, ctx, client, time.Second, 100*time.Millisecond, ingest.Encode) // Send for 1 second, <1 consumption cycle

b, err := New(cfg, logger, newPartitionRingReader(), &mockOverrides{}, store)
b, err := New(cfg, testLogger(t), newPartitionRingReader(), &mockOverrides{}, store)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(ctx, b))
t.Cleanup(func() {
Expand Down Expand Up @@ -386,7 +385,7 @@ func TestBlockbuilder_retries_on_retriable_commit_error(t *testing.T) {

store := newStore(ctx, t)
cfg := blockbuilderConfig(t, address, []int32{0})
logger := test.NewTestingLogger(t)
logger := testLogger(t)

client := testkafka.NewKafkaClient(t, cfg.IngestStorageConfig.Kafka.Address, cfg.IngestStorageConfig.Kafka.Topic)
producedRecords := testkafka.SendReq(ctx, t, client, ingest.Encode, util.FakeTenantID)
Expand Down Expand Up @@ -444,13 +443,12 @@ func TestBlockbuilder_retries_on_commit_error(t *testing.T) {

store := newStore(ctx, t)
cfg := blockbuilderConfig(t, address, []int32{0})
logger := test.NewTestingLogger(t)

client := testkafka.NewKafkaClient(t, cfg.IngestStorageConfig.Kafka.Address, cfg.IngestStorageConfig.Kafka.Topic)
producedRecords := testkafka.SendReq(ctx, t, client, ingest.Encode, util.FakeTenantID)
lastRecordOffset := producedRecords[len(producedRecords)-1].Offset

b, err := New(cfg, logger, newPartitionRingReader(), &mockOverrides{}, store)
b, err := New(cfg, testLogger(t), newPartitionRingReader(), &mockOverrides{}, store)
require.NoError(t, err)

require.NoError(t, services.StartAndAwaitRunning(ctx, b))
Expand Down Expand Up @@ -498,7 +496,7 @@ func TestBlockbuilder_noDoubleConsumption(t *testing.T) {
lastRecordOffset := producedRecords[len(producedRecords)-1].Offset

// Create the block builder
b, err := New(cfg, test.NewTestingLogger(t), newPartitionRingReader(), &mockOverrides{}, store)
b, err := New(cfg, testLogger(t), newPartitionRingReader(), &mockOverrides{}, store)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(ctx, b))
t.Cleanup(func() {
Expand Down Expand Up @@ -582,7 +580,7 @@ func TestBlockBuilder_honor_maxBytesPerCycle(t *testing.T) {
cfg := blockbuilderConfig(t, address, []int32{0})
cfg.MaxBytesPerCycle = uint64(tc.maxBytesPerCycle)

b, err := New(cfg, test.NewTestingLogger(t), newPartitionRingReader(), &mockOverrides{}, store)
b, err := New(cfg, testLogger(t), newPartitionRingReader(), &mockOverrides{}, store)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(ctx, b))
t.Cleanup(func() {
Expand Down Expand Up @@ -684,7 +682,7 @@ func TestBlockbuilder_usesRecordTimestampForBlockStartAndEnd(t *testing.T) {
res := client.ProduceSync(ctx, records...)
require.NoError(t, res.FirstErr())

b, err := New(cfg, test.NewTestingLogger(t), newPartitionRingReader(), &mockOverrides{}, store)
b, err := New(cfg, testLogger(t), newPartitionRingReader(), &mockOverrides{}, store)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(ctx, b))
t.Cleanup(func() {
Expand Down Expand Up @@ -766,7 +764,7 @@ func TestBlockbuilder_marksOldBlocksCompacted(t *testing.T) {
t.Cleanup(func() { require.NoError(t, services.StopAndAwaitTerminated(context.Background(), store)) })

// Create the block builder
b, err := New(cfg, test.NewTestingLogger(t), newPartitionRingReader(), &mockOverrides{}, store)
b, err := New(cfg, testLogger(t), newPartitionRingReader(), &mockOverrides{}, store)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(ctx, b))
t.Cleanup(func() {
Expand Down Expand Up @@ -838,7 +836,7 @@ func TestBlockbuilder_gracefulShutdown(t *testing.T) {
testkafka.SendTracesFor(t, ctx, client, 60*time.Second, time.Second, ingest.Encode)
}()

b, err := New(cfg, test.NewTestingLogger(t), newPartitionRingReader(), &mockOverrides{}, store)
b, err := New(cfg, testLogger(t), newPartitionRingReader(), &mockOverrides{}, store)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(ctx, b))

Expand Down Expand Up @@ -901,7 +899,7 @@ type ownEverythingSharder struct{}
func (o *ownEverythingSharder) Owns(string) bool { return true }

func newStore(ctx context.Context, t testing.TB) storage.Store {
store := newStoreWithLogger(ctx, t, test.NewTestingLogger(t), false)
store := newStoreWithLogger(ctx, t, testLogger(t), false)
t.Cleanup(func() {
require.NoError(t, services.StopAndAwaitTerminated(context.Background(), store))
})
Expand Down Expand Up @@ -1138,7 +1136,7 @@ func TestBlockbuilder_twoPartitions_secondEmpty(t *testing.T) {
testkafka.SendReqWithOpts(ctx, t, client, ingest.Encode, testkafka.ReqOpts{Partition: 0, Time: reqTime, TenantID: util.FakeTenantID})

// And only then create block builder
b, err := New(cfg, test.NewTestingLogger(t), partitionRing, &mockOverrides{}, store)
b, err := New(cfg, testLogger(t), partitionRing, &mockOverrides{}, store)
require.NoError(t, err)

// Verify builder is listening to both partitions
Expand Down Expand Up @@ -1232,7 +1230,7 @@ func TestBlockbuilder_PartitionWithNoLag(t *testing.T) {
require.NoError(t, admClient.CommitAllOffsets(ctx, cfg.IngestStorageConfig.Kafka.ConsumerGroup, offsets))

// Create and start the block builder
b, err := New(cfg, test.NewTestingLogger(t), partitionRing, &mockOverrides{}, store)
b, err := New(cfg, testLogger(t), partitionRing, &mockOverrides{}, store)
require.NoError(t, err)

// Verify builder is listening to all partitions
Expand Down Expand Up @@ -1261,3 +1259,9 @@ func TestBlockbuilder_PartitionWithNoLag(t *testing.T) {
}
assert.Equal(t, int32(2), storageWrites.Load(), "unexpected number of storage writes")
}

// testLogger returns the logger used throughout the block-builder tests.
// It is a no-op logger so that test output stays quiet by default; the
// testing.TB parameter is deliberately kept (though unused) so every call
// site can switch back to the verbose per-test logger without edits.
func testLogger(_ testing.TB) log.Logger {
	// Uncomment when we need full detail.
	// return test.NewTestingLogger(t)
	return log.NewNopLogger()
}
4 changes: 4 additions & 0 deletions modules/blockbuilder/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ type Config struct {
}

func (c *Config) Validate() error {
if _, err := encoding.FromVersionForWrites(c.BlockConfig.BlockCfg.Version); err != nil {
return fmt.Errorf("block version validation failed: %w", err)
}

if c.BlockConfig.BlockCfg.Version != c.WAL.Version {
c.WAL.Version = c.BlockConfig.BlockCfg.Version
}
Expand Down
37 changes: 32 additions & 5 deletions modules/blockbuilder/config_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package blockbuilder

import (
"errors"
"flag"
"testing"

"github.com/grafana/tempo/tempodb/backend"
Expand All @@ -9,15 +11,24 @@ import (
v2 "github.com/grafana/tempo/tempodb/encoding/v2"
"github.com/grafana/tempo/tempodb/encoding/vparquet4"
"github.com/grafana/tempo/tempodb/wal"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestConfig_validate(t *testing.T) {
tests := []struct {
name string
cfg Config
expectedErr bool
expectedErr error
}{
{
name: "Default",
cfg: func() Config {
cfg := Config{}
cfg.RegisterFlagsAndApplyDefaults("", flag.NewFlagSet("", flag.ContinueOnError))
return cfg
}(),
expectedErr: nil,
},
{
name: "ValidConfig",
cfg: Config{
Expand All @@ -37,7 +48,7 @@ func TestConfig_validate(t *testing.T) {
Version: encoding.LatestEncoding().Version(),
},
},
expectedErr: false,
expectedErr: nil,
},
{
name: "InvalidBlockConfig",
Expand All @@ -52,13 +63,29 @@ func TestConfig_validate(t *testing.T) {
Version: v2.VersionString,
},
},
expectedErr: true,
expectedErr: errors.New("block config validation failed: positive index downsample required"),
},
{
name: "InvalidBlockVersion",
cfg: Config{
BlockConfig: BlockConfig{
BlockCfg: common.BlockConfig{
// This parses for reads but not for writes
Version: "vParquet5-preview1",
},
},
},
expectedErr: errors.New("block version validation failed: vParquet5-preview1 is not a valid block version for creating blocks"),
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
err := tc.cfg.Validate()
assert.Equal(t, tc.expectedErr, err != nil, "unexpected error: %v", err)
if tc.expectedErr == nil {
require.NoError(t, err)
} else {
require.Equal(t, tc.expectedErr.Error(), err.Error())
}
})
}
}
17 changes: 17 additions & 0 deletions tempodb/encoding/versioned.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,23 @@
}
}

// FromVersionForWrites returns a versioned encoding for the provided string, but only for
// encodings that are supported for creating new blocks. Deprecated or readonly encodings will return an error.
func FromVersionForWrites(v string) (VersionedEncoding, error) {
switch v {
case v2.VersionString:
return v2.Encoding{}, nil
case vparquet3.VersionString:
return vparquet3.Encoding{}, nil
case vparquet4.VersionString:
return vparquet4.Encoding{}, nil
case vparquet5.VersionString:
return vparquet5.Encoding{}, nil
default:
return nil, fmt.Errorf("%s is not a valid block version for creating blocks", v)

Check notice on line 103 in tempodb/encoding/versioned.go

View workflow job for this annotation

GitHub Actions / Coverage Annotations

Uncovered lines

Lines 92-103 are not covered by tests
}
}

// DefaultEncoding for newly written blocks.
func DefaultEncoding() VersionedEncoding {
return vparquet4.Encoding{}
Expand Down
Loading