Skip to content

Commit 47427ae

Browse files
mdisibio authored and oleg-kozlyuk-grafana committed
Tighten up block builder encoding validation, fix tests to actually run (grafana#6037)
* Tighten up block builder encoding validation, fix tests to actually run

* changelog

* Better shutdown checks?
1 parent 8dd986c commit 47427ae

6 files changed

Lines changed: 96 additions & 32 deletions

File tree

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
* [ENHANCEMENT] Validate metrics-generator histogram buckets [#5991](https://github.com/grafana/tempo/pull/5991) (@carles-grafana)
5050
* [ENHANCEMENT] Removed MustNewConstMetric to prevent panic and added validation for usage tracker config. Added `tempo_distributor_usage_tracker_errors_total` to surface errors in usage tracker. [#5981](https://github.com/grafana/tempo/pull/5981) (@electron0zero)
5151
* [BUGFIX] Prevent slice panic when truncating series after topk() by adding bounds check in metrics query-range combiner [#6010](https://github.com/grafana/tempo/pull/6010) (@Syedowais312)
52+
* [BUGFIX] Fix block-builder to more precisely validate block encoding on startup [#6037](https://github.com/grafana/tempo/pull/6037) (@mdisibio)
5253
* [BUGFIX] Fix compactor to properly consider SSE-KMS information during metadata copy [#5774](https://github.com/grafana/tempo/pull/5774) (@steffsas)
5354
* [BUGFIX] Fix `spss=0` parameter to properly mean unlimited spans instead of being rejected, and respect `max_spans_per_span_set=0` configuration [#5858](https://github.com/grafana/tempo/pull/5858) (@iamrajiv)
5455
* [BUGFIX] Fix incorrect results in TraceQL compare() caused by potential hash collision of string array attributes [#5835](https://github.com/grafana/tempo/pull/5835) (@mdisibio)

modules/blockbuilder/blockbuilder.go

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -177,12 +177,10 @@ func New(
177177
func (b *BlockBuilder) starting(ctx context.Context) (err error) {
178178
level.Info(b.logger).Log("msg", "block builder starting")
179179
topic := b.cfg.IngestStorageConfig.Kafka.Topic
180-
b.enc = encoding.DefaultEncoding()
181-
if version := b.cfg.BlockConfig.BlockCfg.Version; version != "" {
182-
b.enc, err = encoding.FromVersion(version)
183-
if err != nil {
184-
return fmt.Errorf("failed to create encoding: %w", err)
185-
}
180+
181+
b.enc, err = encoding.FromVersionForWrites(b.cfg.BlockConfig.BlockCfg.Version)
182+
if err != nil {
183+
return fmt.Errorf("failed to create encoding: %w", err)
186184
}
187185

188186
b.wal, err = wal.New(&b.cfg.WAL)
@@ -222,6 +220,9 @@ func (b *BlockBuilder) running(ctx context.Context) error {
222220
defer close(b.consumeStopped)
223221
for {
224222
// Create a detached context for consume
223+
// This is so that when the parent context is canceled and the block builder is stopping,
224+
// we still finish the current consumption and flush of blocks. That is preferred than
225+
// to starting over after a restart.
225226
consumeCtx, cancel := context.WithCancel(context.Background())
226227

227228
waitTime, err := b.consume(consumeCtx)
@@ -231,6 +232,16 @@ func (b *BlockBuilder) running(ctx context.Context) error {
231232
level.Error(b.logger).Log("msg", "consumeCycle failed", "err", err)
232233
}
233234

235+
// Always check for cancellation before going to next cycle.
236+
// There are cases like when the queue is lagged, that waitTime could be zero.
237+
// In this case it's non-deterministic which select statement will be executed below,
238+
// so we do a specific check here first.
239+
if ctx.Err() != nil {
240+
// Parent context canceled, return
241+
return nil
242+
}
243+
244+
// Now wait for next cycle or cancellation.
234245
select {
235246
case <-time.After(waitTime): // Continue with next cycle
236247
case <-ctx.Done():

modules/blockbuilder/blockbuilder_test.go

Lines changed: 25 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,9 @@ const (
3939
testPartition = int32(0)
4040
)
4141

42-
func TestMain(*testing.M) {
42+
func TestMain(m *testing.M) {
4343
pollTimeout = 2 * time.Second // speed up the tests
44+
m.Run()
4445
}
4546

4647
// When the partition starts with no existing commit,
@@ -60,7 +61,7 @@ func TestBlockbuilder_lookbackOnNoCommit(t *testing.T) {
6061
store := newStore(ctx, t)
6162
cfg := blockbuilderConfig(t, address, []int32{0})
6263

63-
b, err := New(cfg, test.NewTestingLogger(t), newPartitionRingReader(), &mockOverrides{}, store)
64+
b, err := New(cfg, testLogger(t), newPartitionRingReader(), &mockOverrides{}, store)
6465
require.NoError(t, err)
6566
require.NoError(t, services.StartAndAwaitRunning(ctx, b))
6667
t.Cleanup(func() {
@@ -99,7 +100,7 @@ func TestBlockbuilder_without_partitions_assigned_returns_an_error(t *testing.T)
99100
store := newStore(ctx, t)
100101
cfg := blockbuilderConfig(t, address, []int32{})
101102

102-
b, err := New(cfg, test.NewTestingLogger(t), newPartitionRingReader(), &mockOverrides{}, store)
103+
b, err := New(cfg, testLogger(t), newPartitionRingReader(), &mockOverrides{}, store)
103104
require.NoError(t, err)
104105
_, err = b.consume(ctx)
105106
require.ErrorIs(t, err, errNoPartitionsAssigned)
@@ -117,7 +118,7 @@ func TestBlockbuilder_getAssignedPartitions(t *testing.T) {
117118
20: {Id: 20, State: ring.PartitionActive},
118119
})
119120

120-
b, err := New(cfg, test.NewTestingLogger(t), partitionRing, &mockOverrides{}, nil)
121+
b, err := New(cfg, testLogger(t), partitionRing, &mockOverrides{}, nil)
121122
require.NoError(t, err)
122123
partitions := b.getAssignedPartitions()
123124
assert.Equal(t, []int32{0, 2}, partitions)
@@ -155,7 +156,7 @@ func TestBlockbuilder_startWithCommit(t *testing.T) {
155156
admClient := kadm.NewClient(client)
156157
require.NoError(t, admClient.CommitAllOffsets(ctx, cfg.IngestStorageConfig.Kafka.ConsumerGroup, offsets))
157158

158-
b, err := New(cfg, test.NewTestingLogger(t), newPartitionRingReader(), &mockOverrides{}, store)
159+
b, err := New(cfg, testLogger(t), newPartitionRingReader(), &mockOverrides{}, store)
159160
require.NoError(t, err)
160161
require.NoError(t, services.StartAndAwaitRunning(ctx, b))
161162
t.Cleanup(func() {
@@ -201,12 +202,11 @@ func TestBlockbuilder_flushingFails(t *testing.T) {
201202
return store.WriteBlock(ctx, block)
202203
})
203204
cfg := blockbuilderConfig(t, address, []int32{0})
204-
logger := test.NewTestingLogger(t)
205205

206206
client := testkafka.NewKafkaClient(t, cfg.IngestStorageConfig.Kafka.Address, cfg.IngestStorageConfig.Kafka.Topic)
207207
producedRecords := testkafka.SendTracesFor(t, ctx, client, time.Second, 100*time.Millisecond, ingest.Encode) // Send for 1 second, <1 consumption cycles
208208

209-
b, err := New(cfg, logger, newPartitionRingReader(), &mockOverrides{}, store)
209+
b, err := New(cfg, testLogger(t), newPartitionRingReader(), &mockOverrides{}, store)
210210
require.NoError(t, err)
211211
require.NoError(t, services.StartAndAwaitRunning(ctx, b))
212212
t.Cleanup(func() {
@@ -242,7 +242,7 @@ func TestBlockbuilder_receivesOldRecords(t *testing.T) {
242242
store := newStore(ctx, t)
243243
cfg := blockbuilderConfig(t, address, []int32{0})
244244

245-
b, err := New(cfg, test.NewTestingLogger(t), newPartitionRingReader(), &mockOverrides{}, store)
245+
b, err := New(cfg, testLogger(t), newPartitionRingReader(), &mockOverrides{}, store)
246246
require.NoError(t, err)
247247
require.NoError(t, services.StartAndAwaitRunning(ctx, b))
248248
t.Cleanup(func() {
@@ -328,12 +328,11 @@ func TestBlockbuilder_committingFails(t *testing.T) {
328328

329329
store := newStore(ctx, t)
330330
cfg := blockbuilderConfig(t, address, []int32{0})
331-
logger := test.NewTestingLogger(t)
332331

333332
client := testkafka.NewKafkaClient(t, cfg.IngestStorageConfig.Kafka.Address, cfg.IngestStorageConfig.Kafka.Topic)
334333
producedRecords := testkafka.SendTracesFor(t, ctx, client, time.Second, 100*time.Millisecond, ingest.Encode) // Send for 1 second, <1 consumption cycle
335334

336-
b, err := New(cfg, logger, newPartitionRingReader(), &mockOverrides{}, store)
335+
b, err := New(cfg, testLogger(t), newPartitionRingReader(), &mockOverrides{}, store)
337336
require.NoError(t, err)
338337
require.NoError(t, services.StartAndAwaitRunning(ctx, b))
339338
t.Cleanup(func() {
@@ -386,7 +385,7 @@ func TestBlockbuilder_retries_on_retriable_commit_error(t *testing.T) {
386385

387386
store := newStore(ctx, t)
388387
cfg := blockbuilderConfig(t, address, []int32{0})
389-
logger := test.NewTestingLogger(t)
388+
logger := testLogger(t)
390389

391390
client := testkafka.NewKafkaClient(t, cfg.IngestStorageConfig.Kafka.Address, cfg.IngestStorageConfig.Kafka.Topic)
392391
producedRecords := testkafka.SendReq(ctx, t, client, ingest.Encode, util.FakeTenantID)
@@ -444,13 +443,12 @@ func TestBlockbuilder_retries_on_commit_error(t *testing.T) {
444443

445444
store := newStore(ctx, t)
446445
cfg := blockbuilderConfig(t, address, []int32{0})
447-
logger := test.NewTestingLogger(t)
448446

449447
client := testkafka.NewKafkaClient(t, cfg.IngestStorageConfig.Kafka.Address, cfg.IngestStorageConfig.Kafka.Topic)
450448
producedRecords := testkafka.SendReq(ctx, t, client, ingest.Encode, util.FakeTenantID)
451449
lastRecordOffset := producedRecords[len(producedRecords)-1].Offset
452450

453-
b, err := New(cfg, logger, newPartitionRingReader(), &mockOverrides{}, store)
451+
b, err := New(cfg, testLogger(t), newPartitionRingReader(), &mockOverrides{}, store)
454452
require.NoError(t, err)
455453

456454
require.NoError(t, services.StartAndAwaitRunning(ctx, b))
@@ -498,7 +496,7 @@ func TestBlockbuilder_noDoubleConsumption(t *testing.T) {
498496
lastRecordOffset := producedRecords[len(producedRecords)-1].Offset
499497

500498
// Create the block builder
501-
b, err := New(cfg, test.NewTestingLogger(t), newPartitionRingReader(), &mockOverrides{}, store)
499+
b, err := New(cfg, testLogger(t), newPartitionRingReader(), &mockOverrides{}, store)
502500
require.NoError(t, err)
503501
require.NoError(t, services.StartAndAwaitRunning(ctx, b))
504502
t.Cleanup(func() {
@@ -582,7 +580,7 @@ func TestBlockBuilder_honor_maxBytesPerCycle(t *testing.T) {
582580
cfg := blockbuilderConfig(t, address, []int32{0})
583581
cfg.MaxBytesPerCycle = uint64(tc.maxBytesPerCycle)
584582

585-
b, err := New(cfg, test.NewTestingLogger(t), newPartitionRingReader(), &mockOverrides{}, store)
583+
b, err := New(cfg, testLogger(t), newPartitionRingReader(), &mockOverrides{}, store)
586584
require.NoError(t, err)
587585
require.NoError(t, services.StartAndAwaitRunning(ctx, b))
588586
t.Cleanup(func() {
@@ -684,7 +682,7 @@ func TestBlockbuilder_usesRecordTimestampForBlockStartAndEnd(t *testing.T) {
684682
res := client.ProduceSync(ctx, records...)
685683
require.NoError(t, res.FirstErr())
686684

687-
b, err := New(cfg, test.NewTestingLogger(t), newPartitionRingReader(), &mockOverrides{}, store)
685+
b, err := New(cfg, testLogger(t), newPartitionRingReader(), &mockOverrides{}, store)
688686
require.NoError(t, err)
689687
require.NoError(t, services.StartAndAwaitRunning(ctx, b))
690688
t.Cleanup(func() {
@@ -766,7 +764,7 @@ func TestBlockbuilder_marksOldBlocksCompacted(t *testing.T) {
766764
t.Cleanup(func() { require.NoError(t, services.StopAndAwaitTerminated(context.Background(), store)) })
767765

768766
// Create the block builder
769-
b, err := New(cfg, test.NewTestingLogger(t), newPartitionRingReader(), &mockOverrides{}, store)
767+
b, err := New(cfg, testLogger(t), newPartitionRingReader(), &mockOverrides{}, store)
770768
require.NoError(t, err)
771769
require.NoError(t, services.StartAndAwaitRunning(ctx, b))
772770
t.Cleanup(func() {
@@ -838,7 +836,7 @@ func TestBlockbuilder_gracefulShutdown(t *testing.T) {
838836
testkafka.SendTracesFor(t, ctx, client, 60*time.Second, time.Second, ingest.Encode)
839837
}()
840838

841-
b, err := New(cfg, test.NewTestingLogger(t), newPartitionRingReader(), &mockOverrides{}, store)
839+
b, err := New(cfg, testLogger(t), newPartitionRingReader(), &mockOverrides{}, store)
842840
require.NoError(t, err)
843841
require.NoError(t, services.StartAndAwaitRunning(ctx, b))
844842

@@ -901,7 +899,7 @@ type ownEverythingSharder struct{}
901899
func (o *ownEverythingSharder) Owns(string) bool { return true }
902900

903901
func newStore(ctx context.Context, t testing.TB) storage.Store {
904-
store := newStoreWithLogger(ctx, t, test.NewTestingLogger(t), false)
902+
store := newStoreWithLogger(ctx, t, testLogger(t), false)
905903
t.Cleanup(func() {
906904
require.NoError(t, services.StopAndAwaitTerminated(context.Background(), store))
907905
})
@@ -1138,7 +1136,7 @@ func TestBlockbuilder_twoPartitions_secondEmpty(t *testing.T) {
11381136
testkafka.SendReqWithOpts(ctx, t, client, ingest.Encode, testkafka.ReqOpts{Partition: 0, Time: reqTime, TenantID: util.FakeTenantID})
11391137

11401138
// And only then create block builder
1141-
b, err := New(cfg, test.NewTestingLogger(t), partitionRing, &mockOverrides{}, store)
1139+
b, err := New(cfg, testLogger(t), partitionRing, &mockOverrides{}, store)
11421140
require.NoError(t, err)
11431141

11441142
// Verify builder is listening to both partitions
@@ -1232,7 +1230,7 @@ func TestBlockbuilder_PartitionWithNoLag(t *testing.T) {
12321230
require.NoError(t, admClient.CommitAllOffsets(ctx, cfg.IngestStorageConfig.Kafka.ConsumerGroup, offsets))
12331231

12341232
// Create and start the block builder
1235-
b, err := New(cfg, test.NewTestingLogger(t), partitionRing, &mockOverrides{}, store)
1233+
b, err := New(cfg, testLogger(t), partitionRing, &mockOverrides{}, store)
12361234
require.NoError(t, err)
12371235

12381236
// Verify builder is listening to all partitions
@@ -1261,3 +1259,9 @@ func TestBlockbuilder_PartitionWithNoLag(t *testing.T) {
12611259
}
12621260
assert.Equal(t, int32(2), storageWrites.Load(), "unexpected number of storage writes")
12631261
}
1262+
1263+
func testLogger(_ testing.TB) log.Logger {
1264+
// Uncomment when we need full detail.
1265+
// return test.NewTestingLogger(t)
1266+
return log.NewNopLogger()
1267+
}

modules/blockbuilder/config.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,10 @@ type Config struct {
4646
}
4747

4848
func (c *Config) Validate() error {
49+
if _, err := encoding.FromVersionForWrites(c.BlockConfig.BlockCfg.Version); err != nil {
50+
return fmt.Errorf("block version validation failed: %w", err)
51+
}
52+
4953
if c.BlockConfig.BlockCfg.Version != c.WAL.Version {
5054
c.WAL.Version = c.BlockConfig.BlockCfg.Version
5155
}

modules/blockbuilder/config_test.go

Lines changed: 32 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package blockbuilder
22

33
import (
4+
"errors"
5+
"flag"
46
"testing"
57

68
"github.com/grafana/tempo/tempodb/backend"
@@ -9,15 +11,24 @@ import (
911
v2 "github.com/grafana/tempo/tempodb/encoding/v2"
1012
"github.com/grafana/tempo/tempodb/encoding/vparquet4"
1113
"github.com/grafana/tempo/tempodb/wal"
12-
"github.com/stretchr/testify/assert"
14+
"github.com/stretchr/testify/require"
1315
)
1416

1517
func TestConfig_validate(t *testing.T) {
1618
tests := []struct {
1719
name string
1820
cfg Config
19-
expectedErr bool
21+
expectedErr error
2022
}{
23+
{
24+
name: "Default",
25+
cfg: func() Config {
26+
cfg := Config{}
27+
cfg.RegisterFlagsAndApplyDefaults("", flag.NewFlagSet("", flag.ContinueOnError))
28+
return cfg
29+
}(),
30+
expectedErr: nil,
31+
},
2132
{
2233
name: "ValidConfig",
2334
cfg: Config{
@@ -37,7 +48,7 @@ func TestConfig_validate(t *testing.T) {
3748
Version: encoding.LatestEncoding().Version(),
3849
},
3950
},
40-
expectedErr: false,
51+
expectedErr: nil,
4152
},
4253
{
4354
name: "InvalidBlockConfig",
@@ -52,13 +63,29 @@ func TestConfig_validate(t *testing.T) {
5263
Version: v2.VersionString,
5364
},
5465
},
55-
expectedErr: true,
66+
expectedErr: errors.New("block config validation failed: positive index downsample required"),
67+
},
68+
{
69+
name: "InvalidBlockVersion",
70+
cfg: Config{
71+
BlockConfig: BlockConfig{
72+
BlockCfg: common.BlockConfig{
73+
// This parses for reads but not for writes
74+
Version: "vParquet5-preview1",
75+
},
76+
},
77+
},
78+
expectedErr: errors.New("block version validation failed: vParquet5-preview1 is not a valid block version for creating blocks"),
5679
},
5780
}
5881
for _, tc := range tests {
5982
t.Run(tc.name, func(t *testing.T) {
6083
err := tc.cfg.Validate()
61-
assert.Equal(t, tc.expectedErr, err != nil, "unexpected error: %v", err)
84+
if tc.expectedErr == nil {
85+
require.NoError(t, err)
86+
} else {
87+
require.Equal(t, tc.expectedErr.Error(), err.Error())
88+
}
6289
})
6390
}
6491
}

tempodb/encoding/versioned.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,23 @@ func FromVersion(v string) (VersionedEncoding, error) {
8787
}
8888
}
8989

90+
// FromVersionForWrites returns a versioned encoding for the provided string, but only for
91+
// encodings that are supported for creating new blocks. Deprecated or readonly encodings will return an error.
92+
func FromVersionForWrites(v string) (VersionedEncoding, error) {
93+
switch v {
94+
case v2.VersionString:
95+
return v2.Encoding{}, nil
96+
case vparquet3.VersionString:
97+
return vparquet3.Encoding{}, nil
98+
case vparquet4.VersionString:
99+
return vparquet4.Encoding{}, nil
100+
case vparquet5.VersionString:
101+
return vparquet5.Encoding{}, nil
102+
default:
103+
return nil, fmt.Errorf("%s is not a valid block version for creating blocks", v)
104+
}
105+
}
106+
90107
// DefaultEncoding for newly written blocks.
91108
func DefaultEncoding() VersionedEncoding {
92109
return vparquet4.Encoding{}

0 commit comments

Comments (0)