Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
* [BUGFIX] Pushes a 0 to classic histogram's counter when the series is new to allow Prometheus to start from a non-null value. [#4140](https://github.com/grafana/tempo/pull/4140) (@mapno)
* [BUGFIX] Fix counter samples being downsampled by backdating the initial sample to the previous minute when the series is new [#4236](https://github.com/grafana/tempo/pull/4236) (@javiermolinar)
* [BUGFIX] Fix traceql metrics time range handling at the cutoff between recent and backend data [#4257](https://github.com/grafana/tempo/issues/4257) (@mdisibio)
* [BUGFIX] Fix several issues with exemplar values for traceql metrics [#4366](https://github.com/grafana/tempo/pull/4366) (@mdisibio)
* [BUGFIX] Skip computing exemplars for instant queries. [#4204](https://github.com/grafana/tempo/pull/4204) (@javiermolinar)
* [BUGFIX] Gave context to orphaned spans related to various maintenance processes. [#4260](https://github.com/grafana/tempo/pull/4260) (@joe-elliott)
* [BUGFIX] Utilize S3Pass and S3User parameters in tempo-cli options, which were previously unused in the code. [#4259](https://github.com/grafana/tempo/pull/4259) (@faridtmammadov)
Expand Down
62 changes: 37 additions & 25 deletions pkg/traceql/ast.go
Original file line number Diff line number Diff line change
Expand Up @@ -1106,33 +1106,23 @@ func (a *MetricsAggregate) init(q *tempopb.QueryRangeRequest, mode AggregateMode
var innerAgg func() VectorAggregator
var byFunc func(Span) (Static, bool)
var byFuncLabel string
var exemplarFn getExemplar

switch a.op {
case metricsAggregateCountOverTime:
innerAgg = func() VectorAggregator { return NewCountOverTimeAggregator() }
a.simpleAggregationOp = sumAggregation
exemplarFn = func(s Span) (float64, uint64) {
return math.NaN(), a.spanStartTimeMs(s)
}

case metricsAggregateMinOverTime:
innerAgg = func() VectorAggregator { return NewOverTimeAggregator(a.attr, minAggregation) }
a.simpleAggregationOp = minAggregation
exemplarFn = func(s Span) (float64, uint64) {
return math.NaN(), a.spanStartTimeMs(s)
}

case metricsAggregateMaxOverTime:
innerAgg = func() VectorAggregator { return NewOverTimeAggregator(a.attr, maxAggregation) }
a.simpleAggregationOp = maxAggregation
exemplarFn = func(s Span) (float64, uint64) {
return math.NaN(), a.spanStartTimeMs(s)
}

case metricsAggregateRate:
innerAgg = func() VectorAggregator { return NewRateAggregator(1.0 / time.Duration(q.Step).Seconds()) }
a.simpleAggregationOp = sumAggregation
exemplarFn = func(s Span) (float64, uint64) {
return math.NaN(), a.spanStartTimeMs(s)
}

case metricsAggregateHistogramOverTime, metricsAggregateQuantileOverTime:
// Histograms and quantiles are implemented as count_over_time() by(2^log2(attr)) for now
Expand All @@ -1144,16 +1134,9 @@ func (a *MetricsAggregate) init(q *tempopb.QueryRangeRequest, mode AggregateMode
case IntrinsicDurationAttribute:
// Optimal implementation for duration attribute
byFunc = a.bucketizeSpanDuration
exemplarFn = func(s Span) (float64, uint64) {
return float64(s.DurationNanos()), a.spanStartTimeMs(s)
}
default:
// Basic implementation for all other attributes
byFunc = a.bucketizeAttribute
exemplarFn = func(s Span) (float64, uint64) {
v, _ := FloatizeAttribute(s, a.attr)
return v, a.spanStartTimeMs(s)
}
}
}

Expand All @@ -1170,11 +1153,7 @@ func (a *MetricsAggregate) init(q *tempopb.QueryRangeRequest, mode AggregateMode
a.agg = NewGroupingAggregator(a.op.String(), func() RangeAggregator {
return NewStepAggregator(q.Start, q.End, q.Step, innerAgg)
}, a.by, byFunc, byFuncLabel)
a.exemplarFn = exemplarFn
}

func (a *MetricsAggregate) spanStartTimeMs(s Span) uint64 {
return s.StartTimeUnixNanos() / uint64(time.Millisecond)
a.exemplarFn = exemplarFnFor(a.attr)
}

func (a *MetricsAggregate) bucketizeSpanDuration(s Span) (Static, bool) {
Expand Down Expand Up @@ -1209,6 +1188,39 @@ func (a *MetricsAggregate) bucketizeAttribute(s Span) (Static, bool) {
}
}

// exemplarFnFor selects the exemplar extraction function matching the
// aggregated attribute. Duration gets a specialized fast path, an unset
// attribute records a value-less (NaN) exemplar that is filled in from the
// series later, and anything else is floatized per span.
func exemplarFnFor(a Attribute) func(Span) (float64, uint64) {
	if a == IntrinsicDurationAttribute {
		return exemplarDuration
	}
	if a == (Attribute{}) {
		// No attribute set: record exemplars without a value; they
		// are attached to the series at the end.
		return exemplarNaN
	}
	return exemplarAttribute(a)
}

// exemplarNaN records a placeholder exemplar: no value (NaN) plus the span's
// start time in Unix milliseconds. The series value is substituted later.
func exemplarNaN(s Span) (float64, uint64) {
	ts := s.StartTimeUnixNanos() / uint64(time.Millisecond)
	return math.NaN(), ts
}

// exemplarDuration records the span's duration in seconds as the exemplar
// value, timestamped with the span's start time in Unix milliseconds.
func exemplarDuration(s Span) (float64, uint64) {
	secs := float64(s.DurationNanos()) / float64(time.Second)
	startMs := s.StartTimeUnixNanos() / uint64(time.Millisecond)
	return secs, startMs
}

// exemplarAttribute builds an exemplar function closed over the attribute,
// avoiding passing the attribute along with every span; this should be more
// efficient. The exemplar value is the floatized attribute and the timestamp
// is the span start time in Unix milliseconds.
func exemplarAttribute(a Attribute) func(Span) (float64, uint64) {
	return func(s Span) (float64, uint64) {
		// The ok flag from FloatizeAttribute is deliberately ignored;
		// presumably a missing value floatizes to a sentinel — confirm.
		val, _ := FloatizeAttribute(s, a)
		startMs := s.StartTimeUnixNanos() / uint64(time.Millisecond)
		return val, startMs
	}
}

func (a *MetricsAggregate) initSum(q *tempopb.QueryRangeRequest) {
// Currently all metrics are summed by job to produce
// intermediate results. This will change when adding min/max/topk/etc
Expand Down
19 changes: 13 additions & 6 deletions pkg/traceql/engine_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"math"
"math/rand"
"sort"
"sync"
"time"
Expand Down Expand Up @@ -1053,7 +1054,7 @@ func (e *MetricsEvalulator) Do(ctx context.Context, f SpansetFetcher, fetcherSta
}

if len(ss.Spans) > 0 && e.sampleExemplar(ss.TraceID) {
e.metricsPipeline.observeExemplar(ss.Spans[0]) // Randomly sample the first span
e.metricsPipeline.observeExemplar(ss.Spans[rand.Intn(len(ss.Spans))])
}

e.mtx.Unlock()
Expand Down Expand Up @@ -1232,19 +1233,25 @@ func (b *SimpleAggregator) aggregateExemplars(ts *tempopb.TimeSeries, existing *
Value: StaticFromAnyValue(l.Value),
})
}
value := exemplar.Value
if math.IsNaN(value) {
value = 0 // TODO: Use the value of the series at the same timestamp
}
existing.Exemplars = append(existing.Exemplars, Exemplar{
Labels: labels,
Value: value,
Value: exemplar.Value,
TimestampMs: uint64(exemplar.TimestampMs),
})
}
}

// Results returns the aggregated series set. Exemplars that were recorded
// without a value (NaN placeholders) are backfilled here with the series
// value at the step interval containing the exemplar's timestamp.
func (b *SimpleAggregator) Results() SeriesSet {
	for _, series := range b.ss {
		for i := range series.Exemplars {
			ex := &series.Exemplars[i]
			if !math.IsNaN(ex.Value) {
				continue
			}
			idx := IntervalOfMs(int64(ex.TimestampMs), b.start, b.end, b.step)
			ex.Value = series.Values[idx]
		}
	}

	return b.ss
}

Expand Down
19 changes: 6 additions & 13 deletions pkg/traceql/engine_metrics_average.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,8 @@ func newAverageOverTimeMetricsAggregator(attr Attribute, by []Attribute) *averag
}

func (a *averageOverTimeAggregator) init(q *tempopb.QueryRangeRequest, mode AggregateMode) {
exemplarFn := func(s Span) (float64, uint64) {
return math.NaN(), a.spanStartTimeMs(s)
}

a.seriesAgg = &averageOverTimeSeriesAggregator{
weightedAverageSeries: make(map[string]averageSeries),
weightedAverageSeries: make(map[string]*averageSeries),
len: IntervalCount(q.Start, q.End, q.Step),
start: q.Start,
end: q.End,
Expand All @@ -50,8 +46,8 @@ func (a *averageOverTimeAggregator) init(q *tempopb.QueryRangeRequest, mode Aggr
a.agg = newAvgOverTimeSpanAggregator(a.attr, a.by, q.Start, q.End, q.Step)
}

a.exemplarFn = exemplarFn
a.mode = mode
a.exemplarFn = exemplarFnFor(a.attr)
}

func (a *averageOverTimeAggregator) observe(span Span) {
Expand Down Expand Up @@ -110,10 +106,6 @@ func (a *averageOverTimeAggregator) validate() error {
return nil
}

func (a *averageOverTimeAggregator) spanStartTimeMs(s Span) uint64 {
return s.StartTimeUnixNanos() / uint64(time.Millisecond)
}

func (a *averageOverTimeAggregator) String() string {
s := strings.Builder{}

Expand All @@ -138,7 +130,7 @@ func (a *averageOverTimeAggregator) String() string {
}

type averageOverTimeSeriesAggregator struct {
weightedAverageSeries map[string]averageSeries
weightedAverageSeries map[string]*averageSeries
len int
start, end, step uint64
exemplarBuckets *bucketSet
Expand Down Expand Up @@ -279,7 +271,8 @@ func (b *averageOverTimeSeriesAggregator) Combine(in []*tempopb.TimeSeries) {
countPosMapper[avgSeriesPromLabel] = i
} else if !ok {
promLabels := getLabels(ts.Labels, "")
b.weightedAverageSeries[ts.PromLabels] = newAverageSeries(b.len, len(ts.Exemplars), promLabels)
s := newAverageSeries(b.len, len(ts.Exemplars), promLabels)
b.weightedAverageSeries[ts.PromLabels] = &s
}
}
for _, ts := range in {
Expand All @@ -302,7 +295,7 @@ func (b *averageOverTimeSeriesAggregator) Combine(in []*tempopb.TimeSeries) {
}
}

func (b *averageOverTimeSeriesAggregator) aggregateExemplars(ts *tempopb.TimeSeries, existing averageSeries) {
func (b *averageOverTimeSeriesAggregator) aggregateExemplars(ts *tempopb.TimeSeries, existing *averageSeries) {
for _, exemplar := range ts.Exemplars {
if b.exemplarBuckets.testTotal() {
break
Expand Down