Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
* [BUGFIX] Correct avg_over_time calculation [#6252](https://github.com/grafana/tempo/pull/6252) (@ruslan-mikhailov)
* [BUGFIX] Correct instant query calculation for rate() [#6205](https://github.com/grafana/tempo/pull/6205) (@ruslan-mikhailov)
* [BUGFIX] Fix live-store deadlock occurring after a complete block failure [#6338](https://github.com/grafana/tempo/pull/6338) (@ruslan-mikhailov)
* [BUGFIX] Fix query_end_cutoff param for query range [#6360](https://github.com/grafana/tempo/pull/6360) (@ruslan-mikhailov)
* [BUGFIX] generator: fix dimension_mappings and target_info_excluded_dimensions being unconditionally overwritten even when overrides were nil [#6390](https://github.com/grafana/tempo/pull/6390) (@carles-grafana)
* [BUGFIX] generator: fix panic when `write_relabel_configs` is configured on remote write endpoints [#6396](https://github.com/grafana/tempo/pull/6396) (@carles-grafana)

Expand Down
3 changes: 3 additions & 0 deletions integration/api/config-query-range-end-cutoff.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
query_frontend:
query_end_cutoff: 5s

58 changes: 58 additions & 0 deletions integration/api/query_range_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ const (
configQueryRangeMaxSeries = "config-query-range-max-series.yaml"
configQueryRangeMaxSeriesDisabled = "config-query-range-max-series-disabled.yaml"
configQueryRangeExemplars = "config-query-range-exemplars.yaml"
configQueryRangeEndCutoff = "config-query-range-end-cutoff.yaml"
)

type queryRangeRequest struct {
Expand Down Expand Up @@ -695,6 +696,63 @@ func TestQueryRangeTypeHandling(t *testing.T) {
})
}

func TestQueryRangeEndCutoff(t *testing.T) {
util.RunIntegrationTests(t, util.TestHarnessConfig{
ConfigOverlay: configQueryRangeEndCutoff,
}, func(h *util.TempoHarness) {
h.WaitTracesWritable(t)

ticker := time.NewTicker(500 * time.Millisecond)
defer ticker.Stop()
timer := time.NewTimer(10 * time.Second)
defer timer.Stop()

tracesSent := 0
sendLoop:
for {
select {
case <-ticker.C:
require.NoError(t, h.WriteJaegerBatch(util.MakeThriftBatch(), ""))
tracesSent++
case <-timer.C:
break sendLoop
}
}

h.WaitTracesQueryable(t, tracesSent)

// Query with end=now, which should be adjusted by query_end_cutoff (5s)
cutoff := 5 * time.Second
now := time.Now()

req := queryRangeRequest{
Query: "{} | count_over_time()",
Start: now.Add(-15 * time.Second),
End: now,
Step: time.Second.String(),
Exemplars: 100,
}

callQueryRange(t, h, req, func(queryRangeRes *tempopb.QueryRangeResponse, err error) {
require.NoError(t, err)
require.NotNil(t, queryRangeRes)
require.Equal(t, 1, len(queryRangeRes.GetSeries()))

series := queryRangeRes.GetSeries()[0]
samples := series.GetSamples()
require.Greater(t, len(samples), 0, "Expected at least one sample")

// The cutoff should be applied, so the last sample should be at least 'cutoff' seconds before now
maxAllowedTimestamp := now.Add(-cutoff).UnixMilli()
for _, sample := range samples {
assert.LessOrEqual(t, sample.TimestampMs, maxAllowedTimestamp,
"Sample timestamp %d is after cutoff %d (diff: %d ms)",
sample.TimestampMs, maxAllowedTimestamp, sample.TimestampMs-maxAllowedTimestamp)
}
})
})
}

func callInstantQuery(apiClient *httpclient.Client, req queryRangeRequest) (*tempopb.QueryInstantResponse, error) {
req.SetDefaults()
return apiClient.MetricsQueryInstant(req.Query, req.Start.UnixNano(), req.End.UnixNano(), req.Exemplars)
Expand Down
4 changes: 3 additions & 1 deletion modules/frontend/frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,9 @@ func New(cfg Config, next pipeline.RoundTripper, o overrides.Interface, reader t
queryRangePipeline := pipeline.Build(
[]pipeline.AsyncMiddleware[combiner.PipelineResponse]{
headerStripWare,
adjustEndWareNanos,
// due to alignments and combiner, it needs to be done in handler
// TODO: initialise combiner after middlewares and uncomment
// adjustEndWareNanos,
urlDenyListWare,
queryValidatorWare,
pipeline.NewWeightRequestWare(pipeline.TraceQLMetrics, cfg.Weights),
Expand Down
28 changes: 27 additions & 1 deletion modules/frontend/metrics_query_range_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,21 @@
if err := validateQueryRangeReq(cfg, req); err != nil {
return err
}

traceql.AlignRequest(req)

// the end time cutoff is applied here because it has to be done before combiner creation
// TODO: this is a copy of ClampDateRangeReq and needs to be removed after a proper fix
if cfg.QueryEndCutoff > 0 {
now := time.Now()
maxEnd := now.Add(-cfg.QueryEndCutoff)
reqEnd := time.Unix(0, int64(req.End))
if maxEnd.Before(reqEnd) {
req.End = uint64(maxEnd.UnixNano())
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's wrong with the existing reqEnd? The code does something similar, substracting the EndCutOff to time.Now()

traceql.AlignEndToLeft(req) // realign, but always to the left
}

Check notice on line 65 in modules/frontend/metrics_query_range_handler.go

View workflow job for this annotation

GitHub Actions / Coverage Annotations

Uncovered lines

Lines 56-65 are not covered by tests
}

httpReq := api.BuildQueryRangeRequest(&http.Request{
URL: &url.URL{Path: downstreamPath},
Header: headers,
Expand Down Expand Up @@ -119,9 +132,22 @@
if err := validateQueryRangeReq(cfg, queryRangeReq); err != nil {
return httpInvalidRequest(err), nil
}
req = api.BuildQueryRangeRequest(req, queryRangeReq, "")

traceql.AlignRequest(queryRangeReq)

// the end time cutoff is applied here because it has to be done before combiner creation
// TODO: this is a copy of ClampDateRangeReq and needs to be removed after a proper fix
if cfg.QueryEndCutoff > 0 {
now := time.Now()
maxEnd := now.Add(-cfg.QueryEndCutoff)
reqEnd := time.Unix(0, int64(queryRangeReq.End))
if maxEnd.Before(reqEnd) {
queryRangeReq.End = uint64(maxEnd.UnixNano())
traceql.AlignEndToLeft(queryRangeReq) // realign, but always to the left
}
}
req = api.BuildQueryRangeRequest(req, queryRangeReq, "")

// build and use roundtripper
combiner, err := combiner.NewTypedQueryRange(queryRangeReq, cfg.Metrics.Sharder.MaxResponseSeries)
if err != nil {
Expand Down
164 changes: 153 additions & 11 deletions modules/frontend/metrics_query_range_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,26 @@ import (
"context"
"fmt"
"io"
"net/http"
"net/http/httptest"
"net/url"
"sync"
"testing"
"testing/synctest"
"time"

"github.com/gogo/protobuf/jsonpb"
"github.com/gogo/protobuf/proto"
"github.com/grafana/dskit/user"
"github.com/grafana/tempo/modules/frontend/pipeline"
"github.com/grafana/tempo/modules/overrides"
"github.com/grafana/tempo/pkg/api"
"github.com/grafana/tempo/pkg/cache"
"github.com/grafana/tempo/pkg/tempopb"
v1 "github.com/grafana/tempo/pkg/tempopb/common/v1"
"github.com/grafana/tempo/pkg/util/test"
"github.com/grafana/tempo/tempodb/backend"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -116,8 +121,8 @@ func TestQueryRangeHandlerSucceeds(t *testing.T) {
func TestQueryRangeAccessesCache(t *testing.T) {
tenant := "foo"
meta := &backend.BlockMeta{
StartTime: time.Unix(15, 0),
EndTime: time.Unix(16, 0),
StartTime: time.Unix(150, 0),
EndTime: time.Unix(160, 0),
Size_: defaultTargetBytesPerRequest,
TotalRecords: 1,
BlockID: backend.MustParse("00000000-0000-0000-0000-000000000123"),
Expand All @@ -135,11 +140,11 @@ func TestQueryRangeAccessesCache(t *testing.T) {
},
Samples: []tempopb.Sample{
{
TimestampMs: 1200_000,
TimestampMs: 12_000_000,
Value: 2,
},
{
TimestampMs: 1100_000,
TimestampMs: 11_000_000,
Value: 1,
},
},
Expand All @@ -166,8 +171,8 @@ func TestQueryRangeAccessesCache(t *testing.T) {
step := 1000000000
query := "{} | rate()"
hash := hashForQueryRangeRequest(&tempopb.QueryRangeRequest{Query: query, Step: uint64(step)})
startNS := 10 * time.Second
endNS := 20 * time.Second
startNS := 100 * time.Second
endNS := 200 * time.Second
cacheKey := queryRangeCacheKey(tenant, hash, time.Unix(0, int64(startNS)), time.Unix(0, int64(endNS)), meta, 0, 1)

// confirm cache key coesn't exist
Expand Down Expand Up @@ -283,8 +288,8 @@ func TestQueryRangeCachedMetrics(t *testing.T) {
// set up backend
tenant := "foo"
meta := &backend.BlockMeta{
StartTime: time.Unix(15, 0),
EndTime: time.Unix(16, 0),
StartTime: time.Unix(150, 0),
EndTime: time.Unix(160, 0),
Size_: defaultTargetBytesPerRequest,
TotalRecords: 1,
BlockID: backend.MustParse("00000000-0000-0000-0000-000000000123"),
Expand Down Expand Up @@ -313,7 +318,7 @@ func TestQueryRangeCachedMetrics(t *testing.T) {
},
Samples: []tempopb.Sample{
{
TimestampMs: 1100_000,
TimestampMs: 11_000_000,
Value: 1,
},
},
Expand All @@ -329,8 +334,8 @@ func TestQueryRangeCachedMetrics(t *testing.T) {
query := "{} | rate()"
var step uint64 = 1000000000
hash := hashForQueryRangeRequest(&tempopb.QueryRangeRequest{Query: query, Step: step})
startNS := uint64(10 * time.Second)
endNS := uint64(20 * time.Second)
startNS := uint64(100 * time.Second)
endNS := uint64(200 * time.Second)
cacheKey := queryRangeCacheKey(tenant, hash, time.Unix(0, int64(startNS)), time.Unix(0, int64(endNS)), meta, 0, 1)

// confirm cache key doesn't exist
Expand Down Expand Up @@ -385,3 +390,140 @@ func TestQueryRangeCachedMetrics(t *testing.T) {
require.Equal(t, uint32(1), actualResp.Metrics.TotalBlocks)
require.Equal(t, uint64(defaultTargetBytesPerRequest), actualResp.Metrics.TotalBlockBytes)
}

func TestQueryRangeHandlerWithEndCutoff(t *testing.T) {
synctest.Test(t, func(t *testing.T) {
step := 10 * time.Second

time.Sleep(123 * time.Millisecond) // to make default time for synctest imperfect
now := time.Now()
alignedNow := now.Truncate(step) // align back to step

start := now.Add(-100 * time.Second).UnixNano()
cutoff := 30 * time.Second

tenant := "foo"
meta := &backend.BlockMeta{
StartTime: now.Add(-100 * time.Second),
EndTime: now,
Size_: defaultTargetBytesPerRequest,
TotalRecords: 1,
BlockID: backend.MustParse("00000000-0000-0000-0000-000000000123"),
ReplicationFactor: 1,
}
retResp := &tempopb.QueryRangeResponse{
Metrics: &tempopb.SearchMetrics{
InspectedTraces: 1,
InspectedBytes: 1,
},
Series: []*tempopb.TimeSeries{
{
Labels: []v1.KeyValue{
{Key: "foo", Value: &v1.AnyValue{Value: &v1.AnyValue_StringValue{StringValue: "bar"}}},
},
Samples: []tempopb.Sample{
{
TimestampMs: 12_000_000,
Value: 2,
},
{
TimestampMs: 11_000_000,
Value: 1,
},
},
},
},
}

rdr := &mockReader{
metas: []*backend.BlockMeta{meta},
}
rt := &mockRoundTripperWithCapture{
rt: mockRoundTripper{
responseFn: func() proto.Message {
return retResp
},
},
}

for _, tc := range []struct {
name string
end time.Time
expectedEnd time.Time
}{
{
name: "now",
end: now,
expectedEnd: alignedNow.Add(-cutoff),
},
{
name: "unaligned",
end: now.Add(-100 * time.Millisecond),
expectedEnd: alignedNow.Add(-cutoff),
},
{
name: "before cutoff",
end: now.Add(-cutoff - 20*time.Second),
expectedEnd: alignedNow.Add(-cutoff - 10*time.Second), // aligned to right
},
{
name: "before unaligned",
end: now.Add(-cutoff - 3*time.Second),
expectedEnd: alignedNow.Add(-cutoff),
},
} {
f := frontendWithSettings(t, rt, rdr, nil, nil, func(c *Config, _ *overrides.Config) {
c.Metrics.Sharder.Interval = time.Hour
c.QueryEndCutoff = cutoff
})

httpReq := httptest.NewRequest("GET", api.PathMetricsQueryRange, nil)
httpReq = api.BuildQueryRangeRequest(httpReq, &tempopb.QueryRangeRequest{
Query: "{} | rate()",
Start: uint64(start),
End: uint64(tc.end.UnixNano()),
Step: uint64(step),
}, "")

ctx := user.InjectOrgID(httpReq.Context(), tenant)
httpReq = httpReq.WithContext(ctx)

httpResp := httptest.NewRecorder()

f.MetricsQueryRangeHandler.ServeHTTP(httpResp, httpReq)

resp := httpResp.Result()
require.Equal(t, 200, resp.StatusCode)

actualResp := &tempopb.QueryRangeResponse{}
bytesResp, err := io.ReadAll(resp.Body)
require.NoError(t, err)
err = jsonpb.Unmarshal(bytes.NewReader(bytesResp), actualResp)
require.NoError(t, err)

require.NotNil(t, rt.req)
actualEnd := time.Unix(0, int64(rt.req.End))
assert.Equal(t, tc.expectedEnd, actualEnd, "[%s] actual end %s is not equal to expected end %s", tc.name, actualEnd, tc.expectedEnd)
}
})
}

// mockRoundTripperWithCapture is a mitm helper that captures query range requests
type mockRoundTripperWithCapture struct {
rt mockRoundTripper
req *tempopb.QueryRangeRequest
mx sync.Mutex
}

func (m *mockRoundTripperWithCapture) RoundTrip(req pipeline.Request) (*http.Response, error) {
qrReq, err := api.ParseQueryRangeRequest(req.HTTPRequest())
if err != nil {
panic("wrong test setup")
}
m.mx.Lock()
defer m.mx.Unlock()
m.req = qrReq

res, err := m.rt.RoundTrip(req)
return res, err
}
13 changes: 13 additions & 0 deletions pkg/traceql/engine_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,19 @@
}
}

func AlignEndToLeft(req *tempopb.QueryRangeRequest) {
if IsInstant(req) {
return
}

Check notice on line 164 in pkg/traceql/engine_metrics.go

View workflow job for this annotation

GitHub Actions / Coverage Annotations

Uncovered lines

Lines 161-164 are not covered by tests

if req.Step == 0 {
return
}

Check notice on line 168 in pkg/traceql/engine_metrics.go

View workflow job for this annotation

GitHub Actions / Coverage Annotations

Uncovered lines

Lines 166-168 are not covered by tests

mod := req.End % req.Step
req.End -= mod

Check notice on line 171 in pkg/traceql/engine_metrics.go

View workflow job for this annotation

GitHub Actions / Coverage Annotations

Uncovered lines

Lines 170-171 are not covered by tests
}

// Start time is rounded down to next step
func alignStart(start, _, step uint64, instant bool) uint64 {
if step == 0 {
Expand Down
Loading