Skip to content
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ configurable via the throughput_bytes_slo field, and it will populate op="traces
* [BUGFIX] Reset `SkipMetricsGeneration` before reuse. [#5117](https://github.com/grafana/tempo/pull/5117) (@flxbk)
* [BUGFIX] Fix metrics generator host info processor overrides config. [#5118](https://github.com/grafana/tempo/pull/5118) (@rlankfo)
* [BUGFIX] Fix metrics generator target_info to skip attributes with no name to prevent downstream errors [#5148](https://github.com/grafana/tempo/pull/5148) (@mdisibio)
* [BUGFIX] TraceQL Metrics: correct exemplars for histogram and quantiles [#5145](https://github.com/grafana/tempo/pull/5145) (@ruslan-mikhailov)
* [BUGFIX] Fix for queried number of exemplars (TraceQL Metrics) [#5115](https://github.com/grafana/tempo/pull/5115) (@ruslan-mikhailov)

# v2.7.2
Expand Down
116 changes: 113 additions & 3 deletions integration/e2e/api/query_range_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package api

import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"
Expand Down Expand Up @@ -54,7 +53,20 @@ sendLoop:
for {
select {
case <-ticker.C:
require.NoError(t, jaegerClient.EmitBatch(context.Background(), util.MakeThriftBatch()))
require.NoError(t, jaegerClient.EmitBatch(context.Background(),
util.MakeThriftBatchWithSpanCountAttributeAndName(
1, "my operation",
"res_val", "span_val",
"res_attr", "span_attr",
),
))
require.NoError(t, jaegerClient.EmitBatch(context.Background(),
util.MakeThriftBatchWithSpanCountAttributeAndName(
1, "my operation",
"res_val2", "span_val2",
"res_attr", "span_attr",
),
))
case <-timer.C:
break sendLoop
}
Expand All @@ -74,6 +86,12 @@ sendLoop:
"{} | sum_over_time(duration)",
"{} | quantile_over_time(duration, .5)",
"{} | quantile_over_time(duration, .5, 0.9, 0.99)",

"{} | count_over_time() by (span.span_attr)",
"{} | count_over_time() by (resource.res_attr)",
"{} | count_over_time() by (.span_attr)",
"{} | count_over_time() by (.res_attr)",

"{} | histogram_over_time(duration)",
"{} | count_over_time() by (status)",
"{status != error} | count_over_time() by (status)",
Expand All @@ -90,6 +108,96 @@ sendLoop:
})
}

// check exemplars in more detail
for _, testCase := range []struct {
query string
targetAttribute string
targetExemplarAttribute string
}{
{
query: "{} | quantile_over_time(duration, .9) by (span.span_attr)",
targetAttribute: "span.span_attr",
targetExemplarAttribute: "span.span_attr",
},
{
query: "{} | quantile_over_time(duration, .9) by (resource.res_attr)",
targetAttribute: "resource.res_attr",
targetExemplarAttribute: "resource.res_attr",
},
{
query: "{} | quantile_over_time(duration, .9) by (.span_attr)",
targetAttribute: ".span_attr",
targetExemplarAttribute: "span.span_attr",
},
{
query: "{} | quantile_over_time(duration, .9) by (.res_attr)",
targetAttribute: ".res_attr",
targetExemplarAttribute: "resource.res_attr",
},
{
query: "{} | rate() by (span.span_attr)",
targetAttribute: "span.span_attr",
targetExemplarAttribute: "span.span_attr",
},
{
query: "{} | count_over_time() by (span.span_attr)",
targetAttribute: "span.span_attr",
targetExemplarAttribute: "span.span_attr",
},
{
query: "{} | min_over_time(duration) by (span.span_attr)",
targetAttribute: "span.span_attr",
targetExemplarAttribute: "span.span_attr",
},
{
query: "{} | max_over_time(duration) by (span.span_attr)",
targetAttribute: "span.span_attr",
targetExemplarAttribute: "span.span_attr",
},
{
query: "{} | avg_over_time(duration) by (span.span_attr)",
targetAttribute: "span.span_attr",
targetExemplarAttribute: "span.span_attr",
},
{
query: "{} | sum_over_time(duration) by (span.span_attr)",
targetAttribute: "span.span_attr",
targetExemplarAttribute: "span.span_attr",
},
} {
t.Run(testCase.query, func(t *testing.T) {
queryRangeRes := callQueryRange(t, tempo.Endpoint(tempoPort), testCase.query, debugMode)
require.NotNil(t, queryRangeRes)
require.Equal(t, len(queryRangeRes.GetSeries()), 2)

// Verify that all exemplars in this series belong to the right series
// by matching attribute values
for _, series := range queryRangeRes.Series {
// search attribute value for the series
var expectedAttrValue string
for _, label := range series.Labels {
if label.Key == testCase.targetAttribute {
expectedAttrValue = label.Value.GetStringValue()
break
}
}
require.NotEmpty(t, expectedAttrValue)

// check attribute value in exemplars
for _, exemplar := range series.Exemplars {
var actualAttrValue string
for _, label := range exemplar.Labels {
if label.Key == testCase.targetExemplarAttribute {
actualAttrValue = label.Value.GetStringValue()
break
}
}
require.Equal(t, expectedAttrValue, actualAttrValue)
}
}
})
}

// invalid query
res := doRequest(t, tempo.Endpoint(tempoPort), "{. a}")
require.Equal(t, 400, res.StatusCode)
Expand Down Expand Up @@ -371,7 +479,9 @@ func callQueryRange(t *testing.T, endpoint, query string, printBody bool) tempop
}

queryRangeRes := tempopb.QueryRangeResponse{}
require.NoError(t, json.Unmarshal(body, &queryRangeRes))
readBody := strings.NewReader(string(body))
err = new(jsonpb.Unmarshaler).Unmarshal(readBody, &queryRangeRes)
require.NoError(t, err)
return queryRangeRes
}

Expand Down
13 changes: 6 additions & 7 deletions pkg/traceql/engine_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -1372,16 +1372,16 @@ func (h *Histogram) Record(bucket float64, count int) {
}

type histSeries struct {
labels Labels
hist []Histogram
labels Labels
hist []Histogram
exemplars []Exemplar
}

type HistogramAggregator struct {
ss map[string]histSeries
qs []float64
len int
start, end, step uint64
exemplars []Exemplar
exemplarBuckets *bucketSet
}

Expand All @@ -1408,7 +1408,6 @@ func (h *HistogramAggregator) Combine(in []*tempopb.TimeSeries) {
var bucket Static
for _, l := range ts.Labels {
if l.Key == internalLabelBucket {
// bucket = int(l.Value.GetIntValue())
bucket = StaticFromAnyValue(l.Value)
continue
}
Expand All @@ -1431,7 +1430,6 @@ func (h *HistogramAggregator) Combine(in []*tempopb.TimeSeries) {
labels: withoutBucket,
hist: make([]Histogram, h.len),
}
h.ss[withoutBucketStr] = existing
}

b := bucket.Float()
Expand Down Expand Up @@ -1462,12 +1460,13 @@ func (h *HistogramAggregator) Combine(in []*tempopb.TimeSeries) {
Value: StaticFromAnyValue(l.Value),
})
}
h.exemplars = append(h.exemplars, Exemplar{
existing.exemplars = append(existing.exemplars, Exemplar{
Labels: labels,
Value: exemplar.Value,
TimestampMs: uint64(exemplar.TimestampMs),
})
}
h.ss[withoutBucketStr] = existing
Comment thread
mdisibio marked this conversation as resolved.
}
}

Expand All @@ -1485,7 +1484,7 @@ func (h *HistogramAggregator) Results() SeriesSet {
ts := TimeSeries{
Labels: labels,
Values: make([]float64, len(in.hist)),
Exemplars: h.exemplars,
Exemplars: in.exemplars,
}
for i := range in.hist {

Expand Down
115 changes: 115 additions & 0 deletions pkg/traceql/engine_metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"time"

"github.com/grafana/tempo/pkg/tempopb"
commonv1proto "github.com/grafana/tempo/pkg/tempopb/common/v1"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -1883,3 +1884,117 @@ func BenchmarkSumOverTime(b *testing.B) {
_, _, _ = runTraceQLMetric(req, in, in2)
}
}

// BenchmarkHistogramAggregator_Combine measures the cost of merging
// pre-generated time series (including exemplars) into a fresh
// HistogramAggregator, at several input sizes.
// nolint:gosec // G115
func BenchmarkHistogramAggregator_Combine(b *testing.B) {
	req := &tempopb.QueryRangeRequest{
		Start:     uint64(time.Now().Add(-1 * time.Hour).UnixNano()),
		End:       uint64(time.Now().UnixNano()),
		Step:      uint64(15 * time.Second.Nanoseconds()),
		Exemplars: maxExemplars,
	}
	const seriesCount = 6

	cases := []struct {
		name      string
		samples   int
		exemplars int
	}{
		{name: "Small", samples: 10, exemplars: 5},
		{name: "Medium", samples: 100, exemplars: 20},
		{name: "Large", samples: 1000, exemplars: 100},
	}

	for _, tc := range cases {
		b.Run(tc.name, func(b *testing.B) {
			// Build the input outside the timed loop; only Combine is measured.
			in := generateTestTimeSeries(seriesCount, tc.samples, tc.exemplars, req.Start, req.End)

			for b.Loop() {
				agg := NewHistogramAggregator(req, []float64{0.5, 0.9, 0.99})
				agg.Combine(in)
			}
		})
	}
}

// generateTestTimeSeries builds seriesCount synthetic time series for
// benchmarking, each carrying samplesCount samples and exemplarCount
// exemplars distributed evenly across [start, end).
// nolint:gosec // G115
func generateTestTimeSeries(seriesCount, samplesCount, exemplarCount int, start, end uint64) []*tempopb.TimeSeries {
	span := end - start

	// tsAt returns the millisecond timestamp for slot k of total evenly
	// spaced slots within the benchmark's time range.
	tsAt := func(k, total int) int64 {
		offset := (uint64(k) * span) / uint64(total)
		return time.Unix(0, int64(start+offset)).UnixMilli()
	}

	// strVal wraps a plain string in the protobuf AnyValue envelope.
	strVal := func(s string) *commonv1proto.AnyValue {
		return &commonv1proto.AnyValue{
			Value: &commonv1proto.AnyValue_StringValue{StringValue: s},
		}
	}

	out := make([]*tempopb.TimeSeries, seriesCount)
	for i := 0; i < seriesCount; i++ {
		// Unique labels per series; bucket value is a power of two.
		labels := []commonv1proto.KeyValue{
			{Key: "service", Value: strVal("service-" + fmt.Sprintf("%d", i))},
			{
				Key: internalLabelBucket,
				Value: &commonv1proto.AnyValue{
					Value: &commonv1proto.AnyValue_DoubleValue{
						DoubleValue: math.Pow(2, float64(i%20)),
					},
				},
			},
		}

		samples := make([]tempopb.Sample, samplesCount)
		for k := range samples {
			samples[k] = tempopb.Sample{
				TimestampMs: tsAt(k, samplesCount),
				Value:       float64(k % 100), // simple repeating pattern
			}
		}

		exemplars := make([]tempopb.Exemplar, exemplarCount)
		for k := range exemplars {
			exemplars[k] = tempopb.Exemplar{
				Labels: []commonv1proto.KeyValue{
					{Key: "trace_id", Value: strVal(fmt.Sprintf("trace-%d", i*1000+k))},
					{Key: "span_id", Value: strVal(fmt.Sprintf("span-%d", k))},
				},
				Value:       float64(k % 100), // simple repeating pattern
				TimestampMs: tsAt(k, exemplarCount),
			}
		}

		out[i] = &tempopb.TimeSeries{
			PromLabels: fmt.Sprintf("{service=\"service-%d\",bucket=\"%d\"}", i, i%20),
			Labels:     labels,
			Samples:    samples,
			Exemplars:  exemplars,
		}
	}

	return out
}
Loading