diff --git a/CHANGELOG.md b/CHANGELOG.md index 854e8fc1451..e2948eadc7b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -49,6 +49,7 @@ * [ENHANCEMENT] Validate metrics-generator histogram buckets [#5991](https://github.com/grafana/tempo/pull/5991) (@carles-grafana) * [ENHANCEMENT] Removed MustNewConstMetric to prevent panic and added validation for usage tracker config. Added `tempo_distributor_usage_tracker_errors_total` to surface errors in usage tracker. [#5981](https://github.com/grafana/tempo/pull/5981) (@electron0zero) * [BUGFIX] Prevent slice panic when truncating series after topk() by adding bounds check in metrics query-range combiner [#6010](https://github.com/grafana/tempo/pull/6010) (@Syedowais312) +* [BUGFIX] Fix block-builder to more precisely validate block encoding on startup [#6037](https://github.com/grafana/tempo/pull/6037) (@mdisibio) * [BUGFIX] Fix compactor to properly consider SSE-KMS information during metadata copy [#5774](https://github.com/grafana/tempo/pull/5774) (@steffsas) * [BUGFIX] Fix `spss=0` parameter to properly mean unlimited spans instead of being rejected, and respect `max_spans_per_span_set=0` configuration [#5858](https://github.com/grafana/tempo/pull/5858) (@iamrajiv) * [BUGFIX] Fix incorrect results in TraceQL compare() caused by potential hash collision of string array attributes [#5835](https://github.com/grafana/tempo/pull/5835) (@mdisibio) diff --git a/modules/blockbuilder/blockbuilder.go b/modules/blockbuilder/blockbuilder.go index df3d346ecac..78838300e48 100644 --- a/modules/blockbuilder/blockbuilder.go +++ b/modules/blockbuilder/blockbuilder.go @@ -177,12 +177,10 @@ func New( func (b *BlockBuilder) starting(ctx context.Context) (err error) { level.Info(b.logger).Log("msg", "block builder starting") topic := b.cfg.IngestStorageConfig.Kafka.Topic - b.enc = encoding.DefaultEncoding() - if version := b.cfg.BlockConfig.BlockCfg.Version; version != "" { - b.enc, err = encoding.FromVersion(version) - if err != nil { - return fmt.Errorf("failed to create encoding: %w", err) - } + + b.enc, err = encoding.FromVersionForWrites(b.cfg.BlockConfig.BlockCfg.Version) + if err != nil { + return fmt.Errorf("failed to create encoding: %w", err) } b.wal, err = wal.New(&b.cfg.WAL) @@ -222,6 +220,9 @@ func (b *BlockBuilder) running(ctx context.Context) error { defer close(b.consumeStopped) for { // Create a detached context for consume + // This is so that when the parent context is canceled and the block builder is stopping, + // we still finish the current consumption and flush of blocks. That is preferred than + // to starting over after a restart. consumeCtx, cancel := context.WithCancel(context.Background()) waitTime, err := b.consume(consumeCtx) @@ -231,6 +232,16 @@ func (b *BlockBuilder) running(ctx context.Context) error { level.Error(b.logger).Log("msg", "consumeCycle failed", "err", err) } + // Always check for cancellation before going to next cycle. + // There are cases like when the queue is lagged, that waitTime could be zero. + // In this case it's non-deterministic which select statement will be executed below, + // so we do a specific check here first. + if ctx.Err() != nil { + // Parent context canceled, return + return nil + } + + // Now wait for next cycle or cancellation. select { case <-time.After(waitTime): // Continue with next cycle case <-ctx.Done(): diff --git a/modules/blockbuilder/blockbuilder_test.go b/modules/blockbuilder/blockbuilder_test.go index 549fc70bda7..bd6b652298e 100644 --- a/modules/blockbuilder/blockbuilder_test.go +++ b/modules/blockbuilder/blockbuilder_test.go @@ -39,8 +39,9 @@ const ( testPartition = int32(0) ) -func TestMain(*testing.M) { +func TestMain(m *testing.M) { pollTimeout = 2 * time.Second // speed up the tests + m.Run() } // When the partition starts with no existing commit, @@ -60,7 +61,7 @@ func TestBlockbuilder_lookbackOnNoCommit(t *testing.T) { store := newStore(ctx, t) cfg := blockbuilderConfig(t, address, []int32{0}) - b, err := New(cfg, test.NewTestingLogger(t), newPartitionRingReader(), &mockOverrides{}, store) + b, err := New(cfg, testLogger(t), newPartitionRingReader(), &mockOverrides{}, store) require.NoError(t, err) require.NoError(t, services.StartAndAwaitRunning(ctx, b)) t.Cleanup(func() { @@ -99,7 +100,7 @@ func TestBlockbuilder_without_partitions_assigned_returns_an_error(t *testing.T) store := newStore(ctx, t) cfg := blockbuilderConfig(t, address, []int32{}) - b, err := New(cfg, test.NewTestingLogger(t), newPartitionRingReader(), &mockOverrides{}, store) + b, err := New(cfg, testLogger(t), newPartitionRingReader(), &mockOverrides{}, store) require.NoError(t, err) _, err = b.consume(ctx) require.ErrorIs(t, err, errNoPartitionsAssigned) @@ -117,7 +118,7 @@ func TestBlockbuilder_getAssignedPartitions(t *testing.T) { 20: {Id: 20, State: ring.PartitionActive}, }) - b, err := New(cfg, test.NewTestingLogger(t), partitionRing, &mockOverrides{}, nil) + b, err := New(cfg, testLogger(t), partitionRing, &mockOverrides{}, nil) require.NoError(t, err) partitions := b.getAssignedPartitions() assert.Equal(t, []int32{0, 2}, partitions) @@ -155,7 +156,7 @@ func TestBlockbuilder_startWithCommit(t *testing.T) { admClient := kadm.NewClient(client) require.NoError(t, admClient.CommitAllOffsets(ctx, cfg.IngestStorageConfig.Kafka.ConsumerGroup, offsets)) - b, err := New(cfg, test.NewTestingLogger(t), newPartitionRingReader(), &mockOverrides{}, store) + b, err := New(cfg, testLogger(t), newPartitionRingReader(), &mockOverrides{}, store) require.NoError(t, err) require.NoError(t, services.StartAndAwaitRunning(ctx, b)) t.Cleanup(func() { @@ -201,12 +202,11 @@ func TestBlockbuilder_flushingFails(t *testing.T) { return store.WriteBlock(ctx, block) }) cfg := blockbuilderConfig(t, address, []int32{0}) - logger := test.NewTestingLogger(t) client := testkafka.NewKafkaClient(t, cfg.IngestStorageConfig.Kafka.Address, cfg.IngestStorageConfig.Kafka.Topic) producedRecords := testkafka.SendTracesFor(t, ctx, client, time.Second, 100*time.Millisecond, ingest.Encode) // Send for 1 second, <1 consumption cycles - b, err := New(cfg, logger, newPartitionRingReader(), &mockOverrides{}, store) + b, err := New(cfg, testLogger(t), newPartitionRingReader(), &mockOverrides{}, store) require.NoError(t, err) require.NoError(t, services.StartAndAwaitRunning(ctx, b)) t.Cleanup(func() { @@ -242,7 +242,7 @@ func TestBlockbuilder_receivesOldRecords(t *testing.T) { store := newStore(ctx, t) cfg := blockbuilderConfig(t, address, []int32{0}) - b, err := New(cfg, test.NewTestingLogger(t), newPartitionRingReader(), &mockOverrides{}, store) + b, err := New(cfg, testLogger(t), newPartitionRingReader(), &mockOverrides{}, store) require.NoError(t, err) require.NoError(t, services.StartAndAwaitRunning(ctx, b)) t.Cleanup(func() { @@ -328,12 +328,11 @@ func TestBlockbuilder_committingFails(t *testing.T) { store := newStore(ctx, t) cfg := blockbuilderConfig(t, address, []int32{0}) - logger := test.NewTestingLogger(t) client := testkafka.NewKafkaClient(t, cfg.IngestStorageConfig.Kafka.Address, cfg.IngestStorageConfig.Kafka.Topic) producedRecords := testkafka.SendTracesFor(t, ctx, client, time.Second, 100*time.Millisecond, ingest.Encode) // Send for 1 second, <1 consumption cycle - b, err := New(cfg, logger, newPartitionRingReader(), &mockOverrides{}, store) + b, err := New(cfg, testLogger(t), newPartitionRingReader(), &mockOverrides{}, store) require.NoError(t, err) require.NoError(t, services.StartAndAwaitRunning(ctx, b)) t.Cleanup(func() { @@ -386,7 +385,7 @@ func TestBlockbuilder_retries_on_retriable_commit_error(t *testing.T) { store := newStore(ctx, t) cfg := blockbuilderConfig(t, address, []int32{0}) - logger := test.NewTestingLogger(t) + logger := testLogger(t) client := testkafka.NewKafkaClient(t, cfg.IngestStorageConfig.Kafka.Address, cfg.IngestStorageConfig.Kafka.Topic) producedRecords := testkafka.SendReq(ctx, t, client, ingest.Encode, util.FakeTenantID) @@ -444,13 +443,12 @@ func TestBlockbuilder_retries_on_commit_error(t *testing.T) { store := newStore(ctx, t) cfg := blockbuilderConfig(t, address, []int32{0}) - logger := test.NewTestingLogger(t) client := testkafka.NewKafkaClient(t, cfg.IngestStorageConfig.Kafka.Address, cfg.IngestStorageConfig.Kafka.Topic) producedRecords := testkafka.SendReq(ctx, t, client, ingest.Encode, util.FakeTenantID) lastRecordOffset := producedRecords[len(producedRecords)-1].Offset - b, err := New(cfg, logger, newPartitionRingReader(), &mockOverrides{}, store) + b, err := New(cfg, testLogger(t), newPartitionRingReader(), &mockOverrides{}, store) require.NoError(t, err) require.NoError(t, services.StartAndAwaitRunning(ctx, b)) @@ -498,7 +496,7 @@ func TestBlockbuilder_noDoubleConsumption(t *testing.T) { lastRecordOffset := producedRecords[len(producedRecords)-1].Offset // Create the block builder - b, err := New(cfg, test.NewTestingLogger(t), newPartitionRingReader(), &mockOverrides{}, store) + b, err := New(cfg, testLogger(t), newPartitionRingReader(), &mockOverrides{}, store) require.NoError(t, err) require.NoError(t, services.StartAndAwaitRunning(ctx, b)) t.Cleanup(func() { @@ -582,7 +580,7 @@ func TestBlockBuilder_honor_maxBytesPerCycle(t *testing.T) { cfg := blockbuilderConfig(t, address, []int32{0}) cfg.MaxBytesPerCycle = uint64(tc.maxBytesPerCycle) - b, err := New(cfg, test.NewTestingLogger(t), newPartitionRingReader(), &mockOverrides{}, store) + b, err := New(cfg, testLogger(t), newPartitionRingReader(), &mockOverrides{}, store) require.NoError(t, err) require.NoError(t, services.StartAndAwaitRunning(ctx, b)) t.Cleanup(func() { @@ -684,7 +682,7 @@ func TestBlockbuilder_usesRecordTimestampForBlockStartAndEnd(t *testing.T) { res := client.ProduceSync(ctx, records...) require.NoError(t, res.FirstErr()) - b, err := New(cfg, test.NewTestingLogger(t), newPartitionRingReader(), &mockOverrides{}, store) + b, err := New(cfg, testLogger(t), newPartitionRingReader(), &mockOverrides{}, store) require.NoError(t, err) require.NoError(t, services.StartAndAwaitRunning(ctx, b)) t.Cleanup(func() { @@ -766,7 +764,7 @@ func TestBlockbuilder_marksOldBlocksCompacted(t *testing.T) { t.Cleanup(func() { require.NoError(t, services.StopAndAwaitTerminated(context.Background(), store)) }) // Create the block builder - b, err := New(cfg, test.NewTestingLogger(t), newPartitionRingReader(), &mockOverrides{}, store) + b, err := New(cfg, testLogger(t), newPartitionRingReader(), &mockOverrides{}, store) require.NoError(t, err) require.NoError(t, services.StartAndAwaitRunning(ctx, b)) t.Cleanup(func() { @@ -838,7 +836,7 @@ func TestBlockbuilder_gracefulShutdown(t *testing.T) { testkafka.SendTracesFor(t, ctx, client, 60*time.Second, time.Second, ingest.Encode) }() - b, err := New(cfg, test.NewTestingLogger(t), newPartitionRingReader(), &mockOverrides{}, store) + b, err := New(cfg, testLogger(t), newPartitionRingReader(), &mockOverrides{}, store) require.NoError(t, err) require.NoError(t, services.StartAndAwaitRunning(ctx, b)) @@ -901,7 +899,7 @@ type ownEverythingSharder struct{} func (o *ownEverythingSharder) Owns(string) bool { return true } func newStore(ctx context.Context, t testing.TB) storage.Store { - store := newStoreWithLogger(ctx, t, test.NewTestingLogger(t), false) + store := newStoreWithLogger(ctx, t, testLogger(t), false) t.Cleanup(func() { require.NoError(t, services.StopAndAwaitTerminated(context.Background(), store)) }) @@ -1138,7 +1136,7 @@ func TestBlockbuilder_twoPartitions_secondEmpty(t *testing.T) { testkafka.SendReqWithOpts(ctx, t, client, ingest.Encode, testkafka.ReqOpts{Partition: 0, Time: reqTime, TenantID: util.FakeTenantID}) // And only then create block builder - b, err := New(cfg, test.NewTestingLogger(t), partitionRing, &mockOverrides{}, store) + b, err := New(cfg, testLogger(t), partitionRing, &mockOverrides{}, store) require.NoError(t, err) // Verify builder is listening to both partitions @@ -1232,7 +1230,7 @@ func TestBlockbuilder_PartitionWithNoLag(t *testing.T) { require.NoError(t, admClient.CommitAllOffsets(ctx, cfg.IngestStorageConfig.Kafka.ConsumerGroup, offsets)) // Create and start the block builder - b, err := New(cfg, test.NewTestingLogger(t), partitionRing, &mockOverrides{}, store) + b, err := New(cfg, testLogger(t), partitionRing, &mockOverrides{}, store) require.NoError(t, err) // Verify builder is listening to all partitions @@ -1261,3 +1259,9 @@ func TestBlockbuilder_PartitionWithNoLag(t *testing.T) { } assert.Equal(t, int32(2), storageWrites.Load(), "unexpected number of storage writes") } + +func testLogger(_ testing.TB) log.Logger { + // Uncomment when we need full detail. + // return test.NewTestingLogger(t) + return log.NewNopLogger() +} diff --git a/modules/blockbuilder/config.go b/modules/blockbuilder/config.go index be405d7609c..dc0212e54cd 100644 --- a/modules/blockbuilder/config.go +++ b/modules/blockbuilder/config.go @@ -46,6 +46,10 @@ type Config struct { } func (c *Config) Validate() error { + if _, err := encoding.FromVersionForWrites(c.BlockConfig.BlockCfg.Version); err != nil { + return fmt.Errorf("block version validation failed: %w", err) + } + if c.BlockConfig.BlockCfg.Version != c.WAL.Version { c.WAL.Version = c.BlockConfig.BlockCfg.Version } diff --git a/modules/blockbuilder/config_test.go b/modules/blockbuilder/config_test.go index c37a88b5c5e..b2cbd25d35e 100644 --- a/modules/blockbuilder/config_test.go +++ b/modules/blockbuilder/config_test.go @@ -1,6 +1,8 @@ package blockbuilder import ( + "errors" + "flag" "testing" "github.com/grafana/tempo/tempodb/backend" @@ -9,15 +11,24 @@ import ( v2 "github.com/grafana/tempo/tempodb/encoding/v2" "github.com/grafana/tempo/tempodb/encoding/vparquet4" "github.com/grafana/tempo/tempodb/wal" - "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func TestConfig_validate(t *testing.T) { tests := []struct { name string cfg Config - expectedErr bool + expectedErr error }{ + { + name: "Default", + cfg: func() Config { + cfg := Config{} + cfg.RegisterFlagsAndApplyDefaults("", flag.NewFlagSet("", flag.ContinueOnError)) + return cfg + }(), + expectedErr: nil, + }, { name: "ValidConfig", cfg: Config{ @@ -37,7 +48,7 @@ func TestConfig_validate(t *testing.T) { Version: encoding.LatestEncoding().Version(), }, }, - expectedErr: false, + expectedErr: nil, }, { name: "InvalidBlockConfig", @@ -52,13 +63,29 @@ func TestConfig_validate(t *testing.T) { Version: v2.VersionString, }, }, - expectedErr: true, + expectedErr: errors.New("block config validation failed: positive index downsample required"), + }, + { + name: "InvalidBlockVersion", + cfg: Config{ + BlockConfig: BlockConfig{ + BlockCfg: common.BlockConfig{ + // This parses for reads but not for writes + Version: "vParquet5-preview1", + }, + }, + }, + expectedErr: errors.New("block version validation failed: vParquet5-preview1 is not a valid block version for creating blocks"), }, } for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { err := tc.cfg.Validate() - assert.Equal(t, tc.expectedErr, err != nil, "unexpected error: %v", err) + if tc.expectedErr == nil { + require.NoError(t, err) + } else { + require.Equal(t, tc.expectedErr.Error(), err.Error()) + } }) } } diff --git a/tempodb/encoding/versioned.go b/tempodb/encoding/versioned.go index bb6898dae0d..ba370c43cf7 100644 --- a/tempodb/encoding/versioned.go +++ b/tempodb/encoding/versioned.go @@ -87,6 +87,23 @@ func FromVersion(v string) (VersionedEncoding, error) { } } +// FromVersionForWrites returns a versioned encoding for the provided string, but only for +// encodings that are supported for creating new blocks. Deprecated or readonly encodings will return an error. +func FromVersionForWrites(v string) (VersionedEncoding, error) { + switch v { + case v2.VersionString: + return v2.Encoding{}, nil + case vparquet3.VersionString: + return vparquet3.Encoding{}, nil + case vparquet4.VersionString: + return vparquet4.Encoding{}, nil + case vparquet5.VersionString: + return vparquet5.Encoding{}, nil + default: + return nil, fmt.Errorf("%s is not a valid block version for creating blocks", v) + } +} + // DefaultEncoding for newly written blocks. func DefaultEncoding() VersionedEncoding { return vparquet4.Encoding{}