Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ configurable via the throughput_bytes_slo field, and it will populate op="traces
* [BUGFIX] Reset `SkipMetricsGeneration` before reuse. [#5117](https://github.com/grafana/tempo/pull/5117) (@flxbk)
* [BUGFIX] Fix metrics generator host info processor overrides config. [#5118](https://github.com/grafana/tempo/pull/5118) (@rlankfo)
* [BUGFIX] Fix metrics generator target_info to skip attributes with no name to prevent downstream errors [#5148](https://github.com/grafana/tempo/pull/5148) (@mdisibio)
* [BUGFIX] TraceQL Metrics: correct exemplars for histograms and quantiles [#5145](https://github.com/grafana/tempo/pull/5145) (@ruslan-mikhailov)
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't have a well-defined process with regard to the changelog. Ideally we'd show this as being in rc.1, but I see that main already has this merged in 2.8.0-rc.0.

for now it's fine to leave this here. it will make my life easier when i fix the changelog in main and in the release branch.

* [BUGFIX] Fix for queried number of exemplars (TraceQL Metrics) [#5115](https://github.com/grafana/tempo/pull/5115) (@ruslan-mikhailov)

# v2.7.2
Expand Down
116 changes: 113 additions & 3 deletions integration/e2e/api/query_range_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package api

import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"
Expand Down Expand Up @@ -54,7 +53,20 @@ sendLoop:
for {
select {
case <-ticker.C:
require.NoError(t, jaegerClient.EmitBatch(context.Background(), util.MakeThriftBatch()))
require.NoError(t, jaegerClient.EmitBatch(context.Background(),
util.MakeThriftBatchWithSpanCountAttributeAndName(
1, "my operation",
"res_val", "span_val",
"res_attr", "span_attr",
),
))
require.NoError(t, jaegerClient.EmitBatch(context.Background(),
util.MakeThriftBatchWithSpanCountAttributeAndName(
1, "my operation",
"res_val2", "span_val2",
"res_attr", "span_attr",
),
))
case <-timer.C:
break sendLoop
}
Expand All @@ -74,6 +86,12 @@ sendLoop:
"{} | sum_over_time(duration)",
"{} | quantile_over_time(duration, .5)",
"{} | quantile_over_time(duration, .5, 0.9, 0.99)",

"{} | count_over_time() by (span.span_attr)",
"{} | count_over_time() by (resource.res_attr)",
"{} | count_over_time() by (.span_attr)",
"{} | count_over_time() by (.res_attr)",

"{} | histogram_over_time(duration)",
"{} | count_over_time() by (status)",
"{status != error} | count_over_time() by (status)",
Expand All @@ -90,6 +108,96 @@ sendLoop:
})
}

// check exemplars in more detail
for _, testCase := range []struct {
query string
targetAttribute string
targetExemplarAttribute string
}{
{
query: "{} | quantile_over_time(duration, .9) by (span.span_attr)",
targetAttribute: "span.span_attr",
targetExemplarAttribute: "span.span_attr",
},
{
query: "{} | quantile_over_time(duration, .9) by (resource.res_attr)",
targetAttribute: "resource.res_attr",
targetExemplarAttribute: "resource.res_attr",
},
{
query: "{} | quantile_over_time(duration, .9) by (.span_attr)",
targetAttribute: ".span_attr",
targetExemplarAttribute: "span.span_attr",
},
{
query: "{} | quantile_over_time(duration, .9) by (.res_attr)",
targetAttribute: ".res_attr",
targetExemplarAttribute: "resource.res_attr",
},
{
query: "{} | rate() by (span.span_attr)",
targetAttribute: "span.span_attr",
targetExemplarAttribute: "span.span_attr",
},
{
query: "{} | count_over_time() by (span.span_attr)",
targetAttribute: "span.span_attr",
targetExemplarAttribute: "span.span_attr",
},
{
query: "{} | min_over_time(duration) by (span.span_attr)",
targetAttribute: "span.span_attr",
targetExemplarAttribute: "span.span_attr",
},
{
query: "{} | max_over_time(duration) by (span.span_attr)",
targetAttribute: "span.span_attr",
targetExemplarAttribute: "span.span_attr",
},
{
query: "{} | avg_over_time(duration) by (span.span_attr)",
targetAttribute: "span.span_attr",
targetExemplarAttribute: "span.span_attr",
},
{
query: "{} | sum_over_time(duration) by (span.span_attr)",
targetAttribute: "span.span_attr",
targetExemplarAttribute: "span.span_attr",
},
} {
t.Run(testCase.query, func(t *testing.T) {
queryRangeRes := callQueryRange(t, tempo.Endpoint(tempoPort), testCase.query, debugMode)
require.NotNil(t, queryRangeRes)
require.Equal(t, len(queryRangeRes.GetSeries()), 2)

// Verify that all exemplars in this series belongs to the right series
// by matching attribute values
for _, series := range queryRangeRes.Series {
// search attribute value for the series
var expectedAttrValue string
for _, label := range series.Labels {
if label.Key == testCase.targetAttribute {
expectedAttrValue = label.Value.GetStringValue()
break
}
}
require.NotEmpty(t, expectedAttrValue)

// check attribute value in exemplars
for _, exemplar := range series.Exemplars {
var actualAttrValue string
for _, label := range exemplar.Labels {
if label.Key == testCase.targetExemplarAttribute {
actualAttrValue = label.Value.GetStringValue()
break
}
}
require.Equal(t, expectedAttrValue, actualAttrValue)
}
}
})
}

// invalid query
res := doRequest(t, tempo.Endpoint(tempoPort), "{. a}")
require.Equal(t, 400, res.StatusCode)
Expand Down Expand Up @@ -371,7 +479,9 @@ func callQueryRange(t *testing.T, endpoint, query string, printBody bool) tempop
}

queryRangeRes := tempopb.QueryRangeResponse{}
require.NoError(t, json.Unmarshal(body, &queryRangeRes))
readBody := strings.NewReader(string(body))
err = new(jsonpb.Unmarshaler).Unmarshal(readBody, &queryRangeRes)
require.NoError(t, err)
return queryRangeRes
}

Expand Down
13 changes: 6 additions & 7 deletions pkg/traceql/engine_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -1372,16 +1372,16 @@ func (h *Histogram) Record(bucket float64, count int) {
}

type histSeries struct {
labels Labels
hist []Histogram
labels Labels
hist []Histogram
exemplars []Exemplar
}

type HistogramAggregator struct {
ss map[string]histSeries
qs []float64
len int
start, end, step uint64
exemplars []Exemplar
exemplarBuckets *bucketSet
}

Expand All @@ -1408,7 +1408,6 @@ func (h *HistogramAggregator) Combine(in []*tempopb.TimeSeries) {
var bucket Static
for _, l := range ts.Labels {
if l.Key == internalLabelBucket {
// bucket = int(l.Value.GetIntValue())
bucket = StaticFromAnyValue(l.Value)
continue
}
Expand All @@ -1431,7 +1430,6 @@ func (h *HistogramAggregator) Combine(in []*tempopb.TimeSeries) {
labels: withoutBucket,
hist: make([]Histogram, h.len),
}
h.ss[withoutBucketStr] = existing
}

b := bucket.Float()
Expand Down Expand Up @@ -1462,12 +1460,13 @@ func (h *HistogramAggregator) Combine(in []*tempopb.TimeSeries) {
Value: StaticFromAnyValue(l.Value),
})
}
h.exemplars = append(h.exemplars, Exemplar{
existing.exemplars = append(existing.exemplars, Exemplar{
Labels: labels,
Value: exemplar.Value,
TimestampMs: uint64(exemplar.TimestampMs),
})
}
h.ss[withoutBucketStr] = existing
}
}

Expand All @@ -1485,7 +1484,7 @@ func (h *HistogramAggregator) Results() SeriesSet {
ts := TimeSeries{
Labels: labels,
Values: make([]float64, len(in.hist)),
Exemplars: h.exemplars,
Exemplars: in.exemplars,
}
for i := range in.hist {

Expand Down
115 changes: 115 additions & 0 deletions pkg/traceql/engine_metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"time"

"github.com/grafana/tempo/pkg/tempopb"
commonv1proto "github.com/grafana/tempo/pkg/tempopb/common/v1"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -1883,3 +1884,117 @@ func BenchmarkSumOverTime(b *testing.B) {
_, _, _ = runTraceQLMetric(req, in, in2)
}
}

// BenchmarkHistogramAggregator_Combine measures the cost of merging
// pre-generated time series (including exemplars) into a fresh
// HistogramAggregator at several input sizes.
func BenchmarkHistogramAggregator_Combine(b *testing.B) {
	const seriesCount = 6

	// nolint:gosec // G115
	req := &tempopb.QueryRangeRequest{
		Start:     uint64(time.Now().Add(-1 * time.Hour).UnixNano()),
		End:       uint64(time.Now().UnixNano()),
		Step:      uint64(15 * time.Second.Nanoseconds()),
		Exemplars: maxExemplars,
	}

	cases := []struct {
		name      string
		samples   int
		exemplars int
	}{
		{name: "Small", samples: 10, exemplars: 5},
		{name: "Medium", samples: 100, exemplars: 20},
		{name: "Large", samples: 1000, exemplars: 100},
	}

	for _, tc := range cases {
		b.Run(tc.name, func(b *testing.B) {
			// Input generation stays outside the timed loop.
			in := generateTestTimeSeries(seriesCount, tc.samples, tc.exemplars, req.Start, req.End)

			for b.Loop() {
				// A fresh aggregator per iteration so Combine always
				// starts from an empty state.
				NewHistogramAggregator(req, []float64{0.5, 0.9, 0.99}).Combine(in)
			}
		})
	}
}

// generateTestTimeSeries creates deterministic test time series data for
// benchmarking. Each of the seriesCount series carries a unique "service"
// label plus a power-of-two bucket label (what the histogram aggregator
// expects), with samplesCount samples and exemplarCount exemplars spread
// evenly across [start, end) — both unix-nanosecond timestamps.
// nolint:gosec // G115
func generateTestTimeSeries(seriesCount, samplesCount, exemplarCount int, start, end uint64) []*tempopb.TimeSeries {
	result := make([]*tempopb.TimeSeries, seriesCount)

	timeRange := end - start

	for i := 0; i < seriesCount; i++ {
		// Bucket boundaries are powers of two, mirroring the real
		// aggregator's bucketing scheme.
		bucket := math.Pow(2, float64(i%20))

		// Unique labels per series so each one lands in its own output series.
		labels := []commonv1proto.KeyValue{
			{
				Key: "service",
				Value: &commonv1proto.AnyValue{
					Value: &commonv1proto.AnyValue_StringValue{
						StringValue: fmt.Sprintf("service-%d", i),
					},
				},
			},
			{
				Key: internalLabelBucket,
				Value: &commonv1proto.AnyValue{
					Value: &commonv1proto.AnyValue_DoubleValue{
						DoubleValue: bucket,
					},
				},
			},
		}

		samples := make([]tempopb.Sample, samplesCount)
		for j := 0; j < samplesCount; j++ {
			// Distribute samples evenly across the time range.
			offset := (uint64(j) * timeRange) / uint64(samplesCount)
			samples[j] = tempopb.Sample{
				TimestampMs: time.Unix(0, int64(start+offset)).UnixMilli(),
				Value:       float64(j % 100), // simple repeating pattern for test data
			}
		}

		// Exemplars are likewise distributed evenly across the time range,
		// each with unique trace/span identifying labels.
		exemplars := make([]tempopb.Exemplar, exemplarCount)
		for j := 0; j < exemplarCount; j++ {
			offset := (uint64(j) * timeRange) / uint64(exemplarCount)
			exemplars[j] = tempopb.Exemplar{
				Labels: []commonv1proto.KeyValue{
					{
						Key: "trace_id",
						Value: &commonv1proto.AnyValue{
							Value: &commonv1proto.AnyValue_StringValue{
								StringValue: fmt.Sprintf("trace-%d", i*1000+j),
							},
						},
					},
					{
						Key: "span_id",
						Value: &commonv1proto.AnyValue{
							Value: &commonv1proto.AnyValue_StringValue{
								StringValue: fmt.Sprintf("span-%d", j),
							},
						},
					},
				},
				Value:       float64(j % 100), // simple repeating pattern for test data
				TimestampMs: time.Unix(0, int64(start+offset)).UnixMilli(),
			}
		}

		result[i] = &tempopb.TimeSeries{
			// Keep PromLabels consistent with the actual bucket label value;
			// the previous version printed i%20 here while the bucket label
			// held 2^(i%20).
			PromLabels: fmt.Sprintf("{service=\"service-%d\",bucket=\"%g\"}", i, bucket),
			Labels:     labels,
			Samples:    samples,
			Exemplars:  exemplars,
		}
	}

	return result
}
Loading