Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ Additionally the `compaction_tenant_backoff_total` metric has been renamed to `c
* [FEATURE] Add histograms `spans_distance_in_future_seconds` / `spans_distance_in_past_seconds` that count spans with end timestamp in the future / past. While spans in the future are accepted, they are invalid and may not be found using the Search API. [#4936](https://github.com/grafana/tempo/pull/4936) (@carles-grafana)
* [FEATURE] Add MCP Server support. [#5212](https://github.com/grafana/tempo/pull/5212) (@joe-elliott)
* [FEATURE] Add counter `query_frontend_bytes_inspected_total`, which shows the total number of bytes read from disk and object storage [#5310](https://github.com/grafana/tempo/pull/5310) (@carles-grafana)
* [FEATURE] Add query hints sample=true and sample=0.xx which can speed up TraceQL metrics queries by sampling a subset of the data to provide an approximate result. [#5469](https://github.com/grafana/tempo/pull/5469) (@mdisibio)
* [ENHANCEMENT] Include backendwork dashboard and include additional alert [#5159](https://github.com/grafana/tempo/pull/5159) (@zalegrala)
* [BUGFIX] fix tempo configuration options that are always overridden with config overrides section [#5202](https://github.com/grafana/tempo/pull/5202) (@KyriosGN0)
* [ENHANCEMENT] Add endpoint for partition downscaling [#4913](https://github.com/grafana/tempo/pull/4913) (@mapno)
Expand Down
51 changes: 51 additions & 0 deletions pkg/parquetquery/iters.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,12 @@ func (t RowNumber) Preceding() RowNumber {
return t
}

// Releaser can be implemented by values stored in IteratorResult.OtherEntries.
// When an iterator discards a buffered (peeked) result during a Seek — a result
// that was collected but will never be returned upstream — it calls Release on
// any entry values implementing this interface so they can free or pool their
// resources.
type Releaser interface {
	Release()
}

// IteratorResult is a row of data with a row number and named columns of data.
// Internally it has an unstructured list for efficient collection. The ToMap()
// function can be used to make inspection easier.
Expand All @@ -189,6 +195,14 @@ func (r *IteratorResult) Reset() {
r.OtherEntries = r.OtherEntries[:0]
}

// Release walks the result's OtherEntries and invokes Release on every value
// that implements the Releaser interface. Entries whose values do not
// implement Releaser are left untouched.
func (r *IteratorResult) Release() {
	for i := range r.OtherEntries {
		rel, ok := r.OtherEntries[i].Value.(Releaser)
		if !ok {
			continue
		}
		rel.Release()
	}
}

func (r *IteratorResult) Append(rr *IteratorResult) {
if len(rr.Entries) > 0 {
r.Entries = append(r.Entries, rr.Entries...)
Expand Down Expand Up @@ -971,6 +985,14 @@ func (j *JoinIterator) seek(iterNum int, t RowNumber, d int) error {
var err error
t = TruncateRowNumber(d, t)
if j.peeks[iterNum] == nil || CompareRowNumbers(d, j.peeks[iterNum].RowNumber, t) == -1 {

// Release peek if present
// These results have been collected but never returned upstream,
// so we know it is safe to release them.
if j.peeks[iterNum] != nil {
j.peeks[iterNum].Release()
}

j.peeks[iterNum], err = j.iters[iterNum].SeekTo(t, d)
if err != nil {
return err
Expand All @@ -984,6 +1006,13 @@ func (j *JoinIterator) seekAll(t RowNumber, d int) error {
t = TruncateRowNumber(d, t)
for iterNum, iter := range j.iters {
if j.peeks[iterNum] == nil || CompareRowNumbers(d, j.peeks[iterNum].RowNumber, t) == -1 {
// Release peek if present
// These results have been collected but never returned upstream,
// so we know it is safe to release them.
if j.peeks[iterNum] != nil {
j.peeks[iterNum].Release()
}

j.peeks[iterNum], err = iter.SeekTo(t, d)
if err != nil {
return err
Expand Down Expand Up @@ -1171,6 +1200,13 @@ func (j *LeftJoinIterator) SeekTo(t RowNumber, d int) (*IteratorResult, error) {

func (j *LeftJoinIterator) seek(iterNum int, t RowNumber, d int) (err error) {
if j.peeksRequired[iterNum] == nil || CompareRowNumbers(d, j.peeksRequired[iterNum].RowNumber, t) == -1 {
// Release peek if present
// These results have been collected but never returned upstream,
// so we know it is safe to release them.
if j.peeksRequired[iterNum] != nil {
Comment thread
mdisibio marked this conversation as resolved.
j.peeksRequired[iterNum].Release()
}

j.peeksRequired[iterNum], err = j.required[iterNum].SeekTo(t, d)
if err != nil {
return
Expand All @@ -1182,6 +1218,14 @@ func (j *LeftJoinIterator) seek(iterNum int, t RowNumber, d int) (err error) {
func (j *LeftJoinIterator) seekAllRequired(t RowNumber, d int) (done bool, err error) {
for iterNum, iter := range j.required {
if j.peeksRequired[iterNum] == nil || CompareRowNumbers(d, j.peeksRequired[iterNum].RowNumber, t) == -1 {

// Release peek if present
// These results have been collected but never returned upstream,
// so we know it is safe to release them.
if j.peeksRequired[iterNum] != nil {
j.peeksRequired[iterNum].Release()
}

j.peeksRequired[iterNum], err = iter.SeekTo(t, d)
if err != nil {
return
Expand Down Expand Up @@ -1363,6 +1407,13 @@ func (u *UnionIterator) SeekTo(t RowNumber, d int) (*IteratorResult, error) {
t = TruncateRowNumber(d, t)
for iterNum, iter := range u.iters {
if p := u.peeks[iterNum]; p == nil || CompareRowNumbers(d, p.RowNumber, t) == -1 {
// Release peek if present
// These results have been collected but never returned upstream,
// so we know it is safe to release them.
if p != nil {
p.Release()
}

u.peeks[iterNum], err = iter.SeekTo(t, d)
if err != nil {
return nil, fmt.Errorf("union iterator seek to failed: %w", err)
Expand Down
35 changes: 35 additions & 0 deletions pkg/traceql/ast.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,41 @@ type RootExpr struct {
Hints *Hints
}

func NeedsFullTrace(e ...Element) bool {
Comment thread
mdisibio marked this conversation as resolved.
for _, e := range e {
switch x := e.(type) {
case Pipeline:
// Sub-pipelines: Example: ({} | count()) = ({} | count())
for _, e := range x.Elements {
if NeedsFullTrace(e) {
return true
}
}
case SpansetOperation:
// Example: {} >> {}
return true
case Aggregate:
// Example: {} | count() > 123
return true
case ScalarFilter:
if NeedsFullTrace(x.lhs, x.rhs) {
return true
}
}
}

return false
}

// NeedsFullTrace reports whether this expression must be evaluated with all
// spans of each trace available. It checks every element of the top-level
// pipeline; see the package-level NeedsFullTrace for the classification rules.
func (r *RootExpr) NeedsFullTrace() bool {
	elements := r.Pipeline.Elements
	for i := range elements {
		if NeedsFullTrace(elements[i]) {
			return true
		}
	}
	return false
}

func newRootExpr(e pipelineElement) *RootExpr {
p, ok := e.(Pipeline)
if !ok {
Expand Down
21 changes: 18 additions & 3 deletions pkg/traceql/ast_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ type firstStageElement interface {
observe(Span) // TODO - batching?
observeExemplar(Span)
observeSeries([]*tempopb.TimeSeries) // Re-entrant metrics on the query-frontend. Using proto version for efficiency
result() SeriesSet
result(multiplier float64) SeriesSet
length() int
}

Expand Down Expand Up @@ -266,9 +266,24 @@ func (a *MetricsAggregate) observeSeries(ss []*tempopb.TimeSeries) {
a.seriesAgg.Combine(ss)
}

func (a *MetricsAggregate) result() SeriesSet {
func (a *MetricsAggregate) result(multiplier float64) SeriesSet {
if a.agg != nil {
return a.agg.Series()
ss := a.agg.Series()

// These operations don't get scaled by the multiplier.
switch a.op {
case metricsAggregateMinOverTime, metricsAggregateMaxOverTime:
Comment thread
joe-elliott marked this conversation as resolved.
return ss
}

if multiplier > 1.0 {
for _, s := range ss {
for i := range s.Values {
s.Values[i] *= multiplier
}
}
}
return ss
}

// In the frontend-version the results come from
Expand Down
30 changes: 30 additions & 0 deletions pkg/traceql/ast_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1070,3 +1070,33 @@ func testName(val any) string {
return fmt.Sprintf("%v", v)
}
}

func TestNeedsFullTrace(t *testing.T) {
	// Table-driven: each query is parsed and classified; want is whether the
	// expression requires all spans of the trace to be present.
	testCases := []struct {
		query string
		want  bool
	}{
		{"{}", false},
		{"{} | rate()", false},
		{"{} >> {}", true},
		{"{} && {}", true},
		{"{} | count() > 100", true},
		{"({} | count()) = ({} | count())", true},
	}

	for _, tc := range testCases {
		t.Run(tc.query, func(t *testing.T) {
			expr, err := Parse(tc.query)
			require.NoError(t, err)
			require.Equal(t, tc.want, expr.NeedsFullTrace())
		})
	}
}
69 changes: 67 additions & 2 deletions pkg/traceql/engine_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -922,6 +922,45 @@ func (e *Engine) CompileMetricsQueryRange(req *tempopb.QueryRangeRequest, exempl
exemplars = v
}

// Debug sampling hints, remove once we settle on approach.
Comment thread
joe-elliott marked this conversation as resolved.
if traceSample, traceSampleOk := expr.Hints.GetFloat(HintTraceSample, allowUnsafeQueryHints); traceSampleOk {
storageReq.TraceSampler = newProbablisticSampler(traceSample)
}
if spanSample, spanSampleOk := expr.Hints.GetFloat(HintSpanSample, allowUnsafeQueryHints); spanSampleOk {
storageReq.SpanSampler = newProbablisticSampler(spanSample)
}

if sample, sampleOk := expr.Hints.GetBool(HintSample, allowUnsafeQueryHints); sampleOk && sample {
// Automatic sampling
// Get other params
s := newAdaptiveSampler()
if debug, ok := expr.Hints.GetBool(HintDebug, allowUnsafeQueryHints); ok {
s.debug = debug
}
if info, ok := expr.Hints.GetBool(HintInfo, allowUnsafeQueryHints); ok {
s.info = info
}

// Classify the query and determine if it needs to be at the trace-level or can be at span-level (better)
if expr.NeedsFullTrace() {
storageReq.TraceSampler = s
} else {
storageReq.SpanSampler = s
}
}

if sampleFraction, ok := expr.Hints.GetFloat(HintSample, allowUnsafeQueryHints); ok && sampleFraction > 0 && sampleFraction < 1 {
// Fixed sampling rate.
s := newProbablisticSampler(sampleFraction)

// Classify the query and determine if it needs to be at the trace-level or can be at span-level (better)
if expr.NeedsFullTrace() {
storageReq.TraceSampler = s
} else {
storageReq.SpanSampler = s
}
}

// This initializes all step buffers, counters, etc
metricsPipeline.init(req, AggregateModeRaw)

Expand Down Expand Up @@ -1148,19 +1187,28 @@ func (e *MetricsEvaluator) Do(ctx context.Context, f SpansetFetcher, fetcherStar

e.mtx.Lock()

if e.storageReq.TraceSampler != nil {
e.storageReq.TraceSampler.Measured()
}

var validSpansCount int
var randomSpanIndex int

needExemplar := e.maxExemplars > 0 && e.sampleExemplar(ss.TraceID)

for i, s := range ss.Spans {

if e.checkTime {
st := s.StartTimeUnixNanos()
if st <= e.start || st > e.end {
continue
}
}

if e.storageReq.SpanSampler != nil {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how does calling Measured() after checking the start/end time impact the final results? does this mean that blocks that barely overlap the time range will be downsampled less? so the edges of the graph will have more accurate results?

should we call measured before we filter due to time range?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch, this was something I tried both ways. Calling Measured() after the check is more accurate, because it's capturing the true rate of data being found. The edges of a block are where this most often occurs. It prevents the false signal of data looking more common than it really is. So this way the sampling probability stays high (starts at 100%), until there are enough spans passing the time check to make a real determination.

e.storageReq.SpanSampler.Measured()
}

validSpansCount++
e.metricsPipeline.observe(s)

Expand Down Expand Up @@ -1209,12 +1257,28 @@ func (e *MetricsEvaluator) Metrics() (uint64, uint64, uint64) {
}

// Results returns the series accumulated so far, scaled up by the final
// sampling factor when span- and/or trace-level sampling was in effect.
func (e *MetricsEvaluator) Results() SeriesSet {
	e.mtx.Lock()
	defer e.mtx.Unlock()

	// Combine the scaling factors of whichever samplers were configured.
	// With no samplers the multiplier stays at 1.0 (no scaling).
	multiplier := 1.0
	if s := e.storageReq.SpanSampler; s != nil {
		multiplier *= s.FinalScalingFactor()
	}
	if s := e.storageReq.TraceSampler; s != nil {
		multiplier *= s.FinalScalingFactor()
	}

	// NOTE: second-stage processing is skipped here because not every
	// first-stage function can be pushed down. For example, a query with
	// avg_over_time() can't be pushed down to the second stage, so the second
	// stage can only be processed on the frontend. Supporting it here would
	// require knowing whether the first-stage functions are push-down-able,
	// so it is deferred for now and handled later.
	return e.metricsPipeline.result(multiplier)
}

func (e *MetricsEvaluator) sampleExemplar(id []byte) bool {
Expand Down Expand Up @@ -1255,7 +1319,8 @@ func (m *MetricsFrontendEvaluator) Results() SeriesSet {
m.mtx.Lock()
defer m.mtx.Unlock()

results := m.metricsPipeline.result()
// Job results are not scaled by sampling, but this is here for the interface.
results := m.metricsPipeline.result(1.0)

if m.metricsSecondStage != nil {
// metrics second stage is only set when query has second stage function and mode = final
Expand Down
27 changes: 25 additions & 2 deletions pkg/traceql/engine_metrics_average.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,31 @@ func (a *averageOverTimeAggregator) observeSeries(ss []*tempopb.TimeSeries) {
a.seriesAgg.Combine(ss)
}

func (a *averageOverTimeAggregator) result() SeriesSet {
func (a *averageOverTimeAggregator) result(multiplier float64) SeriesSet {
if a.agg != nil {
return a.agg.Series()
ss := a.agg.Series()
if multiplier > 1.0 {
countLabel := NewStaticString(internalMetaTypeCount)
for _, s := range ss {
// Skip non-count series.
found := false
for _, l := range s.Labels {
if l.Name == internalLabelMetaType && l.Value.Equals(&countLabel) {
Comment thread
joe-elliott marked this conversation as resolved.
found = true
break
}
}
if !found {
continue
}

// Found a count series, scale the values by the multiplier.
for i := range s.Values {
s.Values[i] *= multiplier
}
}
}
return ss
}

// In the frontend-version the results come from
Expand Down Expand Up @@ -511,6 +533,7 @@ func (g *avgOverTimeSpanAggregator[F, S]) Series() SeriesSet {
s.average.labels = labels
// Average series
averageSeries := s.average.getAvgSeries()

// Count series
countSeries := s.average.getCountSeries()

Expand Down
Loading
Loading