Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
* [BUGFIX] fix: skip per-label limiter and sanitizer for target_info and host_info metrics in metrics-generator [#6660](https://github.com/grafana/tempo/pull/6660) (@electron0zero)
* [BUGFIX] fix(traceql): err on division by zero [#6580](https://github.com/grafana/tempo/pull/6580) (@Proximyst)
* [BUGFIX] Return 400 instead of 500 when query_range or query_instant requests have unparseable start/end parameters [#6694](https://github.com/grafana/tempo/pull/6694) (@ruslan-mikhailov)
* [BUGFIX] fix: correct block-builder fetch metrics to use counters instead of gauges.

### 3.0 Cleanup

Expand Down
2 changes: 2 additions & 0 deletions integration/operations/operational_metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ func TestWriteMetrics(t *testing.T) {
blockbuilder := h.Services[util.ServiceBlockBuilder]
assertMetricEquals(t, blockbuilder, "tempo_block_builder_flushed_blocks", float64(1), nil)
assertMetricEquals(t, blockbuilder, "tempo_block_builder_owned_partitions", float64(1), nil)
assertMetricGreater(t, blockbuilder, "tempo_block_builder_fetch_records_total", float64(0), nil)
assertMetricGreater(t, blockbuilder, "tempo_block_builder_fetch_bytes_total", float64(0), nil)
})
}

Expand Down
4 changes: 2 additions & 2 deletions modules/blockbuilder/blockbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,13 @@ var (
Help: "Time spent fetching from Kafka.",
NativeHistogramBucketFactor: 1.1,
}, []string{"partition"})
metricFetchBytesTotal = promauto.NewGaugeVec(prometheus.GaugeOpts{
metricFetchBytesTotal = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "tempo",
Subsystem: "block_builder",
Name: "fetch_bytes_total",
Help: "Total number of bytes fetched from Kafka",
}, []string{"partition"})
metricFetchRecordsTotal = promauto.NewGaugeVec(prometheus.GaugeOpts{
metricFetchRecordsTotal = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "tempo",
Subsystem: "block_builder",
Name: "fetch_records_total",
Expand Down
42 changes: 42 additions & 0 deletions modules/blockbuilder/blockbuilder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,48 @@ func TestBlockbuilder_getAssignedPartitions(t *testing.T) {
assert.Equal(t, []int32{0, 2}, partitions)
}

func TestBlockbuilder_fetchMetricsIncrement(t *testing.T) {
ctx, cancel := context.WithCancelCause(context.Background())
t.Cleanup(func() { cancel(errors.New("test done")) })

k, address := testkafka.CreateCluster(t, 1, testTopic)
kafkaCommits := atomic.NewInt32(0)
k.ControlKey(kmsg.OffsetCommit, func(kmsg.Request) (kmsg.Response, error, bool) {
kafkaCommits.Inc()
return nil, nil, false
})

store := newStore(ctx, t)
cfg := blockbuilderConfig(t, address, []int32{0})

recordsStart, err := test.GetCounterVecValue(metricFetchRecordsTotal, "0")
require.NoError(t, err)
bytesStart, err := test.GetCounterVecValue(metricFetchBytesTotal, "0")
require.NoError(t, err)

b, err := New(cfg, testLogger(t), newPartitionRingReader(), &mockOverrides{}, store)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(ctx, b))
t.Cleanup(func() {
require.NoError(t, services.StopAndAwaitTerminated(ctx, b))
})

client := testkafka.NewKafkaClient(t, cfg.IngestStorageConfig.Kafka.Address, cfg.IngestStorageConfig.Kafka.Topic)
testkafka.SendReq(ctx, t, client, ingest.Encode, util.FakeTenantID)

require.Eventually(t, func() bool {
return kafkaCommits.Load() > 0
}, time.Minute, time.Second)

recordsEnd, err := test.GetCounterVecValue(metricFetchRecordsTotal, "0")
require.NoError(t, err)
require.Greater(t, recordsEnd-recordsStart, float64(0))

bytesEnd, err := test.GetCounterVecValue(metricFetchBytesTotal, "0")
require.NoError(t, err)
require.Greater(t, bytesEnd-bytesStart, float64(0))
}

// Starting with a pre-existing commit,
// the block-builder resumes from the last known position, consuming new records,
// and ensures all of them are properly committed and flushed into blocks.
Expand Down
Loading