Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
* [ENHANCEMENT] Align traceql attribute struct for better performance [#5240](https://github.com/grafana/tempo/pull/5240) (@mdisibio)
* [BUGFIX] Add nil check to partitionAssignmentVar [#5198](https://github.com/grafana/tempo/pull/5198) (@mapno)
* [BUGFIX] Fix ingester issue where a hash collision could lead to spans stored incorrectly [#5276](https://github.com/grafana/tempo/pull/5276) (@carles-grafana)
* [BUGFIX] Correct instant query calculation [#5252](https://github.com/grafana/tempo/pull/5252) (@ruslan-mikhailov)

# v2.8.0

Expand Down
254 changes: 231 additions & 23 deletions integration/e2e/api/query_range_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@ import (
"context"
"fmt"
"io"
"math"
"net/http"
"net/url"
"strings"
"testing"
"time"

"github.com/gogo/protobuf/jsonpb"
"github.com/google/uuid"

"github.com/grafana/e2e"
"github.com/grafana/tempo/integration/util"
Expand All @@ -26,6 +28,29 @@ const (
configQueryRangeMaxSeriesDisabledQuerier = "config-query-range-max-series-disabled-querier.yaml"
)

type queryRangeRequest struct {
Query string `json:"query"`
Start time.Time `json:"start"` // default: now - 5m
End time.Time `json:"end"` // default: now + 1m
Step string `json:"step"` // default: 5s
Exemplars int `json:"exemplars"` // default: 100
}

func (r *queryRangeRequest) SetDefaults() {
if r.Start.IsZero() {
r.Start = time.Now().Add(-5 * time.Minute)
}
if r.End.IsZero() {
r.End = time.Now().Add(time.Minute)
}
if r.Step == "" {
r.Step = "5s"
}
if r.Exemplars == 0 {
r.Exemplars = 100
}
}

// Set debugMode to true to print the response body
var debugMode = false

Expand Down Expand Up @@ -67,6 +92,13 @@ sendLoop:
"res_attr", "span_attr",
),
))
require.NoError(t, jaegerClient.EmitBatch(context.Background(),
util.MakeThriftBatchWithSpanCountAttributeAndName(
1, "operation with high cardinality",
uuid.New().String(), uuid.New().String(),
"res_high_cardinality", "span_high_cardinality",
),
))
case <-timer.C:
break sendLoop
}
Expand Down Expand Up @@ -123,7 +155,11 @@ sendLoop:
"{status != error} | count_over_time() by (status)",
} {
t.Run(fmt.Sprintf("%s: %s", exeplarsCase.name, query), func(t *testing.T) {
queryRangeRes := callQueryRange(t, tempo.Endpoint(tempoPort), query, exeplarsCase.exemplars, debugMode)
req := queryRangeRequest{
Query: query,
Exemplars: exeplarsCase.exemplars,
}
queryRangeRes := callQueryRange(t, tempo.Endpoint(tempoPort), req)
require.NotNil(t, queryRangeRes)
require.GreaterOrEqual(t, len(queryRangeRes.GetSeries()), 1)
if query == "{} | quantile_over_time(duration, .5, 0.9, 0.99)" {
Expand Down Expand Up @@ -200,12 +236,17 @@ sendLoop:
},
} {
t.Run(testCase.query, func(t *testing.T) {
queryRangeRes := callQueryRange(t, tempo.Endpoint(tempoPort), testCase.query, 100, debugMode)
req := queryRangeRequest{
Query: testCase.query,
Exemplars: 100,
}
queryRangeRes := callQueryRange(t, tempo.Endpoint(tempoPort), req)
require.NotNil(t, queryRangeRes)
require.Equal(t, len(queryRangeRes.GetSeries()), 2)
require.Equal(t, len(queryRangeRes.GetSeries()), 3) // value 1, value 2 and nil (high cardinality's span has no such attribute)

// Verify that all exemplars in this series belongs to the right series
// by matching attribute values
var skippedForNilAttr bool
for _, series := range queryRangeRes.Series {
// search attribute value for the series
var expectedAttrValue string
Expand All @@ -215,6 +256,10 @@ sendLoop:
break
}
}
if (expectedAttrValue == "" || expectedAttrValue == "nil") && !skippedForNilAttr { // one attribute is empty, so we skip it
skippedForNilAttr = true
continue
}
require.NotEmpty(t, expectedAttrValue)

// check attribute value in exemplars
Expand All @@ -233,8 +278,12 @@ sendLoop:
}

// invalid query
res := doRequest(t, tempo.Endpoint(tempoPort), "{. a}", 100)
require.Equal(t, 400, res.StatusCode)
t.Run("invalid query", func(t *testing.T) {
req := queryRangeRequest{Query: "{. a}"}
req.SetDefaults()
res := doRequest(t, tempo.Endpoint(tempoPort), "api/metrics/query_range", queryRangeRequest{Query: "{. a}"})
require.Equal(t, 400, res.StatusCode)
})

// query with empty results
for _, query := range []string{
Expand All @@ -244,7 +293,7 @@ sendLoop:
`{span.randomattr = "doesnotexist"} | count_over_time()`,
} {
t.Run(query, func(t *testing.T) {
queryRangeRes := callQueryRange(t, tempo.Endpoint(tempoPort), query, 100, debugMode)
queryRangeRes := callQueryRange(t, tempo.Endpoint(tempoPort), queryRangeRequest{Query: query, Exemplars: 100})
require.NotNil(t, queryRangeRes)
// it has time series but they are empty and has no exemplars
require.GreaterOrEqual(t, len(queryRangeRes.GetSeries()), 1)
Expand All @@ -255,6 +304,144 @@ sendLoop:
require.Equal(t, 0, exemplarCount)
})
}

for _, testCase := range []struct {
name string
query string
step string
converter func([]tempopb.Sample) float64
}{
{
name: "count_over_time",
query: "{} | count_over_time()",
step: "1s",
converter: sumSamples,
},
{
name: "sum_over_time",
query: "{} | sum_over_time(duration)",
step: "1s",
converter: sumSamples,
},
{
name: "max_over_time",
query: "{} | max_over_time(duration)",
step: "1s",
converter: maxSamples,
},
{
name: "min_over_time",
query: "{} | min_over_time(duration)",
step: "1s",
converter: minSamples,
},
{
name: "1m step",
query: "{} | count_over_time()",
step: "1m",
converter: sumSamples,
},
} {
t.Run(testCase.name, func(t *testing.T) {
req := queryRangeRequest{
Query: testCase.query,
// Query range truncates the start and end to the step, while instant query does not.
// We need to truncate the start and end to the step to align the interval for query range and instant query.
Start: time.Now().Add(-5 * time.Minute).Truncate(time.Minute),
End: time.Now().Add(time.Minute).Truncate(time.Minute),
Step: testCase.step,
}

queryRangeRes := callQueryRange(t, tempo.Endpoint(tempoPort), req)
require.NotNil(t, queryRangeRes)
require.Equal(t, 1, len(queryRangeRes.GetSeries()))

expectedValue := testCase.converter(queryRangeRes.Series[0].Samples)

instantQueryRes := callInstantQuery(t, tempo.Endpoint(tempoPort), req)
require.NotNil(t, instantQueryRes)
require.Equal(t, 1, len(instantQueryRes.GetSeries()))
require.InDelta(t, expectedValue, instantQueryRes.GetSeries()[0].Value, 0.000001)
})
}

for _, testCase := range []struct {
name string
query string
expectedNum int
}{
{
name: "top 1 by span attribute",
query: "{ } | rate() by (span.span_high_cardinality) | topk(1)",
expectedNum: 1,
},
{
name: "top 10 by span attribute",
query: "{ } | rate() by (span.span_high_cardinality) | topk(10)",
expectedNum: 10,
},
{
name: "top 2 by resource attribute",
query: "{ } | rate() by (resource.res_high_cardinality) | topk(2)",
expectedNum: 2,
},
{
name: "bottom 1 by resource attribute",
query: "{ } | rate() by (resource.res_high_cardinality) | bottomk(1)",
expectedNum: 1,
},
{
name: "bootom 10 by resource attribute",
query: "{ } | rate() by (resource.res_high_cardinality) | bottomk(10)",
expectedNum: 10,
},
{
name: "bottom 2 by span attribute",
query: "{ } | rate() by (span.span_high_cardinality) | bottomk(2)",
expectedNum: 2,
},
} {
t.Run(testCase.name, func(t *testing.T) {
req := queryRangeRequest{
Query: testCase.query,
Start: time.Now().Add(-5 * time.Minute),
End: time.Now().Add(time.Minute),
Step: "1m",
}

instantQueryRes := callInstantQuery(t, tempo.Endpoint(tempoPort), req)
require.NotNil(t, instantQueryRes)
require.Equal(t, testCase.expectedNum, len(instantQueryRes.GetSeries()))
})
}
}

func sumSamples(samples []tempopb.Sample) float64 {
var sum float64
for _, sample := range samples {
sum += sample.Value
}
return sum
}

func maxSamples(samples []tempopb.Sample) float64 {
maxValue := math.Inf(-1)
for _, sample := range samples {
if sample.Value > maxValue {
maxValue = sample.Value
}
}
return maxValue
}

func minSamples(samples []tempopb.Sample) float64 {
minValue := math.Inf(1)
for _, sample := range samples {
if sample.Value < minValue {
minValue = sample.Value
}
}
return minValue
}

// TestQueryRangeSingleTrace checks count for a single trace
Expand Down Expand Up @@ -284,7 +471,7 @@ func TestQueryRangeSingleTrace(t *testing.T) {

// Query the trace by count. As we have only one trace, we should get one dot with value 1
query := "{} | count_over_time()"
queryRangeRes := callQueryRange(t, tempo.Endpoint(tempoPort), query, 100, debugMode)
queryRangeRes := callQueryRange(t, tempo.Endpoint(tempoPort), queryRangeRequest{Query: query, Exemplars: 100})
require.NotNil(t, queryRangeRes)
require.Equal(t, len(queryRangeRes.GetSeries()), 1)

Expand Down Expand Up @@ -501,15 +688,35 @@ sendLoop:
require.Equal(t, spanCount, len(queryRangeRes.GetSeries()))
}

func callQueryRange(t *testing.T, endpoint, query string, exemplars int, printBody bool) tempopb.QueryRangeResponse {
res := doRequest(t, endpoint, query, exemplars)
func callInstantQuery(t *testing.T, endpoint string, req queryRangeRequest) tempopb.QueryInstantResponse {
req.SetDefaults()
res := doRequest(t, endpoint, "api/metrics/query", req)
require.Equal(t, http.StatusOK, res.StatusCode)

// Read body and print it
body, err := io.ReadAll(res.Body)
require.NoError(t, err)
if printBody {
fmt.Println(string(body))
if debugMode {
t.Logf("Response body: %s", string(body))
}

instantQueryRes := tempopb.QueryInstantResponse{}
readBody := strings.NewReader(string(body))
err = new(jsonpb.Unmarshaler).Unmarshal(readBody, &instantQueryRes)
require.NoError(t, err)
return instantQueryRes
}

func callQueryRange(t *testing.T, endpoint string, req queryRangeRequest) tempopb.QueryRangeResponse {
req.SetDefaults()
res := doRequest(t, endpoint, "api/metrics/query_range", req)
require.Equal(t, http.StatusOK, res.StatusCode)

// Read body and print it
body, err := io.ReadAll(res.Body)
require.NoError(t, err)
if debugMode {
t.Logf("Response body: %s", string(body))
}

queryRangeRes := tempopb.QueryRangeResponse{}
Expand All @@ -519,24 +726,25 @@ func callQueryRange(t *testing.T, endpoint, query string, exemplars int, printBo
return queryRangeRes
}

func doRequest(t *testing.T, endpoint, query string, exemplars int) *http.Response {
url := buildURL(endpoint, fmt.Sprintf("%s with(exemplars=true)", query), exemplars)
req, err := http.NewRequest(http.MethodGet, url, nil)
func doRequest(t *testing.T, host, endpoint string, req queryRangeRequest) *http.Response {
req.Query = fmt.Sprintf("%s with(exemplars=true)", req.Query)
url := buildURL(host, endpoint, req)
rawReq, err := http.NewRequest(http.MethodGet, url, nil)
require.NoError(t, err)

res, err := http.DefaultClient.Do(req)
res, err := http.DefaultClient.Do(rawReq)
require.NoError(t, err)
return res
}

func buildURL(endpoint, query string, exemplars int) string {
func buildURL(host, endpoint string, req queryRangeRequest) string {
return fmt.Sprintf(
"http://%s/api/metrics/query_range?query=%s&start=%d&end=%d&step=%s&exemplars=%d",
endpoint,
url.QueryEscape(query),
time.Now().Add(-5*time.Minute).UnixNano(),
time.Now().Add(time.Minute).UnixNano(),
"5s",
exemplars,
"http://%s/%s?query=%s&start=%d&end=%d&step=%s&exemplars=%d",
host, endpoint,
url.QueryEscape(req.Query),
req.Start.UnixNano(),
req.End.UnixNano(),
req.Step,
req.Exemplars,
)
}
6 changes: 5 additions & 1 deletion modules/frontend/metrics_query_range_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func TestQueryRangeHandlerSucceeds(t *testing.T) {
httpReq = api.BuildQueryRangeRequest(httpReq, &tempopb.QueryRangeRequest{
Query: "{} | rate()",
Start: uint64(1100 * time.Second),
End: uint64(1200 * time.Second),
End: uint64(1300 * time.Second),
Step: uint64(100 * time.Second),
}, "")

Expand Down Expand Up @@ -100,6 +100,10 @@ func TestQueryRangeHandlerSucceeds(t *testing.T) {
TimestampMs: 1200_000,
Value: 8,
},
{
TimestampMs: 1300_000,
Value: 0,
},
},
},
},
Expand Down
Loading
Loading