Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
* [ENHANCEMENT] Add local disk caching of metrics queries in local-blocks processor [#3799](https://github.com/grafana/tempo/pull/3799) (@mdisibio)
* [ENHANCEMENT] Improve use of OTEL semantic conventions on the service graph [#3711](https://github.com/grafana/tempo/pull/3711) (@zalegrala)
* [ENHANCEMENT] Performance improvement for `rate() by ()` queries [#3719](https://github.com/grafana/tempo/pull/3719) (@mapno)
* [ENHANCEMENT] Performance improvement for queries using trace-level intrinsics [#3920](https://github.com/grafana/tempo/pull/3920) (@mdisibio)
* [ENHANCEMENT] Use multiple goroutines to unmarshal responses in parallel in the query frontend. [#3713](https://github.com/grafana/tempo/pull/3713) (@joe-elliott)
* [ENHANCEMENT] Protect ingesters from panics by adding defer/recover to all read path methods. [#3790](https://github.com/grafana/tempo/pull/3790) (@joe-elliott)
* [ENHANCEMENT] Added a boolean flag to enable or disable dualstack mode on Storage block config for S3 [#3721](https://github.com/grafana/tempo/pull/3721) (@sid-jar, @mapno)
Expand Down
4 changes: 2 additions & 2 deletions pkg/traceql/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,11 @@ func (e *Engine) ExecuteSearch(ctx context.Context, searchReq *tempopb.SearchReq
span.SetTag("fetchSpansRequest", fetchSpansRequest)

// calculate search meta conditions.
metaConditions := SearchMetaConditionsWithout(fetchSpansRequest.Conditions)
meta := SearchMetaConditionsWithout(fetchSpansRequest.Conditions, fetchSpansRequest.AllConditions)
fetchSpansRequest.SecondPassConditions = append(fetchSpansRequest.SecondPassConditions, meta...)

spansetsEvaluated := 0
// set up the expression evaluation as a filter to reduce data pulled
fetchSpansRequest.SecondPassConditions = append(fetchSpansRequest.SecondPassConditions, metaConditions...)
fetchSpansRequest.SecondPass = func(inSS *Spanset) ([]*Spanset, error) {
if len(inSS.Spans) == 0 {
return nil, nil
Expand Down
17 changes: 12 additions & 5 deletions pkg/traceql/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,23 @@ func SearchMetaConditions() []Condition {
}
}

func SearchMetaConditionsWithout(remove []Condition) []Condition {
func SearchMetaConditionsWithout(remove []Condition, allConditions bool) []Condition {
metaConds := SearchMetaConditions()
retConds := make([]Condition, 0, len(metaConds))
for _, c := range metaConds {
// if we can't find c in the remove conditions then add it to retConds
found := false
for _, e := range remove {
if e.Attribute == c.Attribute {
found = true
break
for _, r := range remove {
if r.Attribute == c.Attribute {
// We can reuse the existing condition of a metadata field in two cases:
// (1) OpNone, since it has no filtering it will always return a value, there is no need to read it again.
// (2) AllConditions - No matter the operation, it will return a value for all results.
// If neither of those apply then we have to select
// the metadata field again without filtering.
if r.Op == OpNone || allConditions {
found = true
break
}
}
}
if !found {
Expand Down
31 changes: 17 additions & 14 deletions pkg/traceql/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,37 +48,34 @@ func TestSpansetClone(t *testing.T) {
}

func TestMetaConditionsWithout(t *testing.T) {
conditionsFor := func(q string) []Condition {
req, err := ExtractFetchSpansRequest(q)
require.NoError(t, err)

return req.Conditions
}

tcs := []struct {
remove []Condition
query string
expect []Condition
}{
{
remove: []Condition{},
// No meta fields present in query, all are selected.
query: "{ status=error}",
expect: SearchMetaConditions(),
},
{
remove: conditionsFor("{ duration > 1s}"),
// Service name, span name are able to be reused
query: "{ rootServiceName = `foo` && rootName = `bar`}",
expect: []Condition{
{NewIntrinsic(IntrinsicTraceRootService), OpNone, nil},
{NewIntrinsic(IntrinsicTraceRootSpan), OpNone, nil},
{NewIntrinsic(IntrinsicTraceDuration), OpNone, nil},
{NewIntrinsic(IntrinsicTraceID), OpNone, nil},
{NewIntrinsic(IntrinsicTraceStartTime), OpNone, nil},
{NewIntrinsic(IntrinsicSpanID), OpNone, nil},
{NewIntrinsic(IntrinsicSpanStartTime), OpNone, nil},
{NewIntrinsic(IntrinsicDuration), OpNone, nil},
{NewIntrinsic(IntrinsicServiceStats), OpNone, nil},
},
},
{
remove: conditionsFor("{ rootServiceName = `foo` && rootName = `bar`} | avg(duration) > 1s"),
// Duration is the only one able to be reused because it has no filtering
query: "{ rootServiceName = `foo` && rootName = `bar`} | avg(duration) > 1s",
expect: []Condition{
{NewIntrinsic(IntrinsicTraceRootService), OpNone, nil},
{NewIntrinsic(IntrinsicTraceRootSpan), OpNone, nil},
{NewIntrinsic(IntrinsicTraceDuration), OpNone, nil},
{NewIntrinsic(IntrinsicTraceID), OpNone, nil},
{NewIntrinsic(IntrinsicTraceStartTime), OpNone, nil},
Expand All @@ -87,9 +84,15 @@ func TestMetaConditionsWithout(t *testing.T) {
{NewIntrinsic(IntrinsicServiceStats), OpNone, nil},
},
},
{
// None are reused because the values are filtered and allConditions=false
query: "{ rootServiceName = `foo` || rootName = `bar`}",
expect: SearchMetaConditions(),
},
}

for _, tc := range tcs {
require.Equal(t, tc.expect, SearchMetaConditionsWithout(tc.remove))
req, _ := ExtractFetchSpansRequest(tc.query)
require.Equal(t, tc.expect, SearchMetaConditionsWithout(req.Conditions, req.AllConditions))
}
}
68 changes: 31 additions & 37 deletions tempodb/encoding/vparquet2/block_traceql.go
Original file line number Diff line number Diff line change
Expand Up @@ -1561,61 +1561,56 @@ func createResourceIterator(makeIter makeIterFn, spanIterator parquetquery.Itera
}

func createTraceIterator(makeIter makeIterFn, resourceIter parquetquery.Iterator, conds []traceql.Condition, start, end uint64, shardID, shardCount uint32, allConditions bool) (parquetquery.Iterator, error) {
traceIters := make([]parquetquery.Iterator, 0, 3)

var err error
iters := make([]parquetquery.Iterator, 0, 3)

// add conditional iterators first. this way if someone searches for { traceDuration > 1s && span.foo = "bar"} the query will
// be sped up by searching for traceDuration first. note that we can only set the predicates if all conditions is true.
// otherwise we just pass the info up to the engine to make a choice
for _, cond := range conds {
switch cond.Attribute.Intrinsic {
case traceql.IntrinsicTraceID:
var pred parquetquery.Predicate
if allConditions {
pred, err = createBytesPredicate(cond.Op, cond.Operands, false)
if err != nil {
return nil, err
}
pred, err := createBytesPredicate(cond.Op, cond.Operands, false)
if err != nil {
return nil, err
}
traceIters = append(traceIters, makeIter(columnPathTraceID, pred, columnPathTraceID))
iters = append(iters, makeIter(columnPathTraceID, pred, columnPathTraceID))
case traceql.IntrinsicTraceDuration:
var pred parquetquery.Predicate
if allConditions {
pred, err = createIntPredicate(cond.Op, cond.Operands)
if err != nil {
return nil, err
}
pred, err := createIntPredicate(cond.Op, cond.Operands)
if err != nil {
return nil, err
}
traceIters = append(traceIters, makeIter(columnPathDurationNanos, pred, columnPathDurationNanos))
iters = append(iters, makeIter(columnPathDurationNanos, pred, columnPathDurationNanos))
case traceql.IntrinsicTraceStartTime:
if start == 0 && end == 0 {
traceIters = append(traceIters, makeIter(columnPathStartTimeUnixNano, nil, columnPathStartTimeUnixNano))
iters = append(iters, makeIter(columnPathStartTimeUnixNano, nil, columnPathStartTimeUnixNano))
}
case traceql.IntrinsicTraceRootSpan:
var pred parquetquery.Predicate
if allConditions {
pred, err = createStringPredicate(cond.Op, cond.Operands)
if err != nil {
return nil, err
}
pred, err := createStringPredicate(cond.Op, cond.Operands)
if err != nil {
return nil, err
}
traceIters = append(traceIters, makeIter(columnPathRootSpanName, pred, columnPathRootSpanName))
iters = append(iters, makeIter(columnPathRootSpanName, pred, columnPathRootSpanName))
case traceql.IntrinsicTraceRootService:
var pred parquetquery.Predicate
if allConditions {
pred, err = createStringPredicate(cond.Op, cond.Operands)
if err != nil {
return nil, err
}
pred, err := createStringPredicate(cond.Op, cond.Operands)
if err != nil {
return nil, err
}
traceIters = append(traceIters, makeIter(columnPathRootServiceName, pred, columnPathRootServiceName))
iters = append(iters, makeIter(columnPathRootServiceName, pred, columnPathRootServiceName))
}
}

var required []parquetquery.Iterator

// This is an optimization for when all of the conditions must be met.
// We simply move all iterators into the required list.
if allConditions {
required = append(required, iters...)
iters = nil
}

// order is interesting here. would it be more efficient to grab the span/resource conditions first
// or the time range filtering first?
traceIters = append(traceIters, resourceIter)
required = append(required, resourceIter)

// evaluate time range
// Time range filtering?
Expand All @@ -1627,14 +1622,13 @@ func createTraceIterator(makeIter makeIterFn, resourceIter parquetquery.Iterator
startFilter = parquetquery.NewIntBetweenPredicate(0, int64(end))
endFilter = parquetquery.NewIntBetweenPredicate(int64(start), math.MaxInt64)

traceIters = append(traceIters, makeIter(columnPathStartTimeUnixNano, startFilter, columnPathStartTimeUnixNano))
traceIters = append(traceIters, makeIter(columnPathEndTimeUnixNano, endFilter, columnPathEndTimeUnixNano))
required = append(required, makeIter(columnPathStartTimeUnixNano, startFilter, columnPathStartTimeUnixNano))
required = append(required, makeIter(columnPathEndTimeUnixNano, endFilter, columnPathEndTimeUnixNano))
}

// Final trace iterator
// Join iterator means it requires matching resources to have been found
// TraceCollector adds trace-level data to the spansets
return parquetquery.NewJoinIterator(DefinitionLevelTrace, traceIters, newTraceCollector()), nil
return parquetquery.NewLeftJoinIterator(DefinitionLevelTrace, required, iters, newTraceCollector())
}

func createPredicate(op traceql.Operator, operands traceql.Operands) (parquetquery.Predicate, error) {
Expand Down
69 changes: 32 additions & 37 deletions tempodb/encoding/vparquet3/block_traceql.go
Original file line number Diff line number Diff line change
Expand Up @@ -1859,55 +1859,41 @@ func createResourceIterator(makeIter makeIterFn, spanIterator parquetquery.Itera
}

func createTraceIterator(makeIter makeIterFn, resourceIter parquetquery.Iterator, conds []traceql.Condition, start, end uint64, _, _ uint32, allConditions bool, selectAll bool) (parquetquery.Iterator, error) {
traceIters := make([]parquetquery.Iterator, 0, 3)

var err error
iters := make([]parquetquery.Iterator, 0, 3)

// add conditional iterators first. this way if someone searches for { traceDuration > 1s && span.foo = "bar"} the query will
// be sped up by searching for traceDuration first. note that we can only set the predicates if all conditions is true.
// otherwise we just pass the info up to the engine to make a choice
for _, cond := range conds {
switch cond.Attribute.Intrinsic {
case traceql.IntrinsicTraceID:
var pred parquetquery.Predicate
if allConditions {
pred, err = createBytesPredicate(cond.Op, cond.Operands, false)
if err != nil {
return nil, err
}
pred, err := createBytesPredicate(cond.Op, cond.Operands, false)
if err != nil {
return nil, err
}
traceIters = append(traceIters, makeIter(columnPathTraceID, pred, columnPathTraceID))
iters = append(iters, makeIter(columnPathTraceID, pred, columnPathTraceID))
case traceql.IntrinsicTraceDuration:
var pred parquetquery.Predicate
if allConditions {
pred, err = createIntPredicate(cond.Op, cond.Operands)
if err != nil {
return nil, err
}
pred, err := createIntPredicate(cond.Op, cond.Operands)
if err != nil {
return nil, err
}
traceIters = append(traceIters, makeIter(columnPathDurationNanos, pred, columnPathDurationNanos))
iters = append(iters, makeIter(columnPathDurationNanos, pred, columnPathDurationNanos))
case traceql.IntrinsicTraceStartTime:
if start == 0 && end == 0 {
traceIters = append(traceIters, makeIter(columnPathStartTimeUnixNano, nil, columnPathStartTimeUnixNano))
iters = append(iters, makeIter(columnPathStartTimeUnixNano, nil, columnPathStartTimeUnixNano))
}
case traceql.IntrinsicTraceRootSpan:
var pred parquetquery.Predicate
if allConditions {
pred, err = createStringPredicate(cond.Op, cond.Operands)
if err != nil {
return nil, err
}
pred, err := createStringPredicate(cond.Op, cond.Operands)
if err != nil {
return nil, err
}
traceIters = append(traceIters, makeIter(columnPathRootSpanName, pred, columnPathRootSpanName))
iters = append(iters, makeIter(columnPathRootSpanName, pred, columnPathRootSpanName))
case traceql.IntrinsicTraceRootService:
var pred parquetquery.Predicate
if allConditions {
pred, err = createStringPredicate(cond.Op, cond.Operands)
if err != nil {
return nil, err
}
pred, err := createStringPredicate(cond.Op, cond.Operands)
if err != nil {
return nil, err
}
traceIters = append(traceIters, makeIter(columnPathRootServiceName, pred, columnPathRootServiceName))
iters = append(iters, makeIter(columnPathRootServiceName, pred, columnPathRootServiceName))
}
}

Expand All @@ -1922,13 +1908,22 @@ func createTraceIterator(makeIter makeIterFn, resourceIter parquetquery.Iterator
traceql.IntrinsicServiceStats:
continue
}
traceIters = append(traceIters, makeIter(entry.columnPath, nil, entry.columnPath))
iters = append(iters, makeIter(entry.columnPath, nil, entry.columnPath))
}
}

var required []parquetquery.Iterator

// This is an optimization for when all of the conditions must be met.
// We simply move all iterators into the required list.
if allConditions {
required = append(required, iters...)
iters = nil
}

// order is interesting here. would it be more efficient to grab the span/resource conditions first
// or the time range filtering first?
traceIters = append(traceIters, resourceIter)
required = append(required, resourceIter)

// evaluate time range
// Time range filtering?
Expand All @@ -1940,14 +1935,14 @@ func createTraceIterator(makeIter makeIterFn, resourceIter parquetquery.Iterator
startFilter = parquetquery.NewIntBetweenPredicate(0, int64(end))
endFilter = parquetquery.NewIntBetweenPredicate(int64(start), math.MaxInt64)

traceIters = append(traceIters, makeIter(columnPathStartTimeUnixNano, startFilter, columnPathStartTimeUnixNano))
traceIters = append(traceIters, makeIter(columnPathEndTimeUnixNano, endFilter, columnPathEndTimeUnixNano))
required = append(required, makeIter(columnPathStartTimeUnixNano, startFilter, columnPathStartTimeUnixNano))
required = append(required, makeIter(columnPathEndTimeUnixNano, endFilter, columnPathEndTimeUnixNano))
}

// Final trace iterator
// Join iterator means it requires matching resources to have been found
// TraceCollector adds trace-level data to the spansets
return parquetquery.NewJoinIterator(DefinitionLevelTrace, traceIters, newTraceCollector(), parquetquery.WithPool(pqTracePool)), nil
return parquetquery.NewLeftJoinIterator(DefinitionLevelTrace, required, iters, newTraceCollector(), parquetquery.WithPool(pqTracePool))
}

func createPredicate(op traceql.Operator, operands traceql.Operands) (parquetquery.Predicate, error) {
Expand Down
4 changes: 4 additions & 0 deletions tempodb/encoding/vparquet3/block_traceql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -581,6 +581,10 @@ func BenchmarkBackendBlockTraceQL(b *testing.B) {
{"resourceAttIntrinsicMatch", "{ resource.service.name = `tempo-gateway` }"},
{"resourceAttIntrinsicMatch", "{ resource.service.name = `does-not-exit-6c2408325a45` }"},

// trace
{"traceOrMatch", "{ rootServiceName = `tempo-gateway` && (status = error || span.http.status_code = 500)}"},
{"traceOrNoMatch", "{ rootServiceName = `doesntexist` && (status = error || span.http.status_code = 500)}"},

// mixed
{"mixedValNoMatch", "{ .bloom = `does-not-exit-6c2408325a45` }"},
{"mixedValMixedMatchAnd", "{ resource.foo = `bar` && name = `gcs.ReadRange` }"},
Expand Down
Loading