diff --git a/CHANGELOG.md b/CHANGELOG.md index bd186254d36..24a1190d947 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,7 @@ * [BUGFIX] Correct avg_over_time calculation [#6252](https://github.com/grafana/tempo/pull/6252) (@ruslan-mikhailov) * [BUGFIX] Correct instant query calculation for rate() [#6205](https://github.com/grafana/tempo/pull/6205) (@ruslan-mikhailov) * [BUGFIX] Fix live-store deadlock occurring after a complete block failure [#6338](https://github.com/grafana/tempo/pull/6338) (@ruslan-mikhailov) +* [BUGFIX] Fix query_end_cutoff param for query range [#6360](https://github.com/grafana/tempo/pull/6360) (@ruslan-mikhailov) * [BUGFIX] generator: fix dimension_mappings and target_info_excluded_dimensions being unconditionally overwritten even when overrides were nil [#6390](https://github.com/grafana/tempo/pull/6390) (@carles-grafana) * [BUGFIX] generator: fix panic when `write_relabel_configs` is configured on remote write endpoints [#6396](https://github.com/grafana/tempo/pull/6396) (@carles-grafana) diff --git a/integration/api/config-query-range-end-cutoff.yaml b/integration/api/config-query-range-end-cutoff.yaml new file mode 100644 index 00000000000..10d2e704cb6 --- /dev/null +++ b/integration/api/config-query-range-end-cutoff.yaml @@ -0,0 +1,3 @@ +query_frontend: + query_end_cutoff: 5s + diff --git a/integration/api/query_range_test.go b/integration/api/query_range_test.go index b1f06c42dd3..8a9ddac5d3b 100644 --- a/integration/api/query_range_test.go +++ b/integration/api/query_range_test.go @@ -23,6 +23,7 @@ const ( configQueryRangeMaxSeries = "config-query-range-max-series.yaml" configQueryRangeMaxSeriesDisabled = "config-query-range-max-series-disabled.yaml" configQueryRangeExemplars = "config-query-range-exemplars.yaml" + configQueryRangeEndCutoff = "config-query-range-end-cutoff.yaml" ) type queryRangeRequest struct { @@ -695,6 +696,63 @@ func TestQueryRangeTypeHandling(t *testing.T) { }) } +func TestQueryRangeEndCutoff(t *testing.T) { + util.RunIntegrationTests(t, util.TestHarnessConfig{ + ConfigOverlay: configQueryRangeEndCutoff, + }, func(h *util.TempoHarness) { + h.WaitTracesWritable(t) + + ticker := time.NewTicker(500 * time.Millisecond) + defer ticker.Stop() + timer := time.NewTimer(10 * time.Second) + defer timer.Stop() + + tracesSent := 0 + sendLoop: + for { + select { + case <-ticker.C: + require.NoError(t, h.WriteJaegerBatch(util.MakeThriftBatch(), "")) + tracesSent++ + case <-timer.C: + break sendLoop + } + } + + h.WaitTracesQueryable(t, tracesSent) + + // Query with end=now, which should be adjusted by query_end_cutoff (5s) + cutoff := 5 * time.Second + now := time.Now() + + req := queryRangeRequest{ + Query: "{} | count_over_time()", + Start: now.Add(-15 * time.Second), + End: now, + Step: time.Second.String(), + Exemplars: 100, + } + + callQueryRange(t, h, req, func(queryRangeRes *tempopb.QueryRangeResponse, err error) { + require.NoError(t, err) + require.NotNil(t, queryRangeRes) + require.Equal(t, 1, len(queryRangeRes.GetSeries())) + + series := queryRangeRes.GetSeries()[0] + samples := series.GetSamples() + require.Greater(t, len(samples), 0, "Expected at least one sample") + + // The cutoff should be applied, so the last sample should be at least 'cutoff' seconds before now + maxAllowedTimestamp := now.Add(-cutoff).UnixMilli() + for _, sample := range samples { + assert.LessOrEqual(t, sample.TimestampMs, maxAllowedTimestamp, + "Sample timestamp %d is after cutoff %d (diff: %d ms)", + sample.TimestampMs, maxAllowedTimestamp, sample.TimestampMs-maxAllowedTimestamp) + } + }) + }) +} + func callInstantQuery(apiClient *httpclient.Client, req queryRangeRequest) (*tempopb.QueryInstantResponse, error) { req.SetDefaults() return apiClient.MetricsQueryInstant(req.Query, req.Start.UnixNano(), req.End.UnixNano(), req.Exemplars) diff --git a/modules/frontend/frontend.go b/modules/frontend/frontend.go index 8e480aa800e..6d7658cc151 100644 --- a/modules/frontend/frontend.go +++ b/modules/frontend/frontend.go @@ -229,7 +229,9 @@ func New(cfg Config, next pipeline.RoundTripper, o overrides.Interface, reader t queryRangePipeline := pipeline.Build( []pipeline.AsyncMiddleware[combiner.PipelineResponse]{ headerStripWare, - adjustEndWareNanos, + // due to alignments and combiner, it needs to be done in handler + // TODO: initialise combiner after middlewares and uncomment + // adjustEndWareNanos, urlDenyListWare, queryValidatorWare, pipeline.NewWeightRequestWare(pipeline.TraceQLMetrics, cfg.Weights), diff --git a/modules/frontend/metrics_query_range_handler.go b/modules/frontend/metrics_query_range_handler.go index 139dadef812..f86f9e1ad82 100644 --- a/modules/frontend/metrics_query_range_handler.go +++ b/modules/frontend/metrics_query_range_handler.go @@ -50,8 +50,21 @@ func newQueryRangeStreamingGRPCHandler(cfg Config, next pipeline.AsyncRoundTripp if err := validateQueryRangeReq(cfg, req); err != nil { return err } + traceql.AlignRequest(req) + // the end time cutoff is applied here because it has to be done before combiner creation + // TODO: this is a copy of ClampDateRangeReq and needs to be removed after a proper fix + if cfg.QueryEndCutoff > 0 { + now := time.Now() + maxEnd := now.Add(-cfg.QueryEndCutoff) + reqEnd := time.Unix(0, int64(req.End)) + if maxEnd.Before(reqEnd) { + req.End = uint64(maxEnd.UnixNano()) + traceql.AlignEndToLeft(req) // realign, but always to the left + } + } + httpReq := api.BuildQueryRangeRequest(&http.Request{ URL: &url.URL{Path: downstreamPath}, Header: headers, @@ -119,9 +132,22 @@ func newMetricsQueryRangeHTTPHandler(cfg Config, next pipeline.AsyncRoundTripper if err := validateQueryRangeReq(cfg, queryRangeReq); err != nil { return httpInvalidRequest(err), nil } - req = api.BuildQueryRangeRequest(req, queryRangeReq, "") + traceql.AlignRequest(queryRangeReq) + // the end time cutoff is applied here because it has to be done before combiner creation + // TODO: this is a copy of ClampDateRangeReq and needs to be removed after a proper fix + if cfg.QueryEndCutoff > 0 { + now := time.Now() + maxEnd := now.Add(-cfg.QueryEndCutoff) + reqEnd := time.Unix(0, int64(queryRangeReq.End)) + if maxEnd.Before(reqEnd) { + queryRangeReq.End = uint64(maxEnd.UnixNano()) + traceql.AlignEndToLeft(queryRangeReq) // realign, but always to the left + } + } + req = api.BuildQueryRangeRequest(req, queryRangeReq, "") + // build and use roundtripper combiner, err := combiner.NewTypedQueryRange(queryRangeReq, cfg.Metrics.Sharder.MaxResponseSeries) if err != nil { diff --git a/modules/frontend/metrics_query_range_handler_test.go b/modules/frontend/metrics_query_range_handler_test.go index 446a99f041f..eb1e17e504c 100644 --- a/modules/frontend/metrics_query_range_handler_test.go +++ b/modules/frontend/metrics_query_range_handler_test.go @@ -5,14 +5,18 @@ import ( "context" "fmt" "io" + "net/http" "net/http/httptest" "net/url" + "sync" "testing" + "testing/synctest" "time" "github.com/gogo/protobuf/jsonpb" "github.com/gogo/protobuf/proto" "github.com/grafana/dskit/user" + "github.com/grafana/tempo/modules/frontend/pipeline" "github.com/grafana/tempo/modules/overrides" "github.com/grafana/tempo/pkg/api" "github.com/grafana/tempo/pkg/cache" @@ -20,6 +24,7 @@ import ( v1 "github.com/grafana/tempo/pkg/tempopb/common/v1" "github.com/grafana/tempo/pkg/util/test" "github.com/grafana/tempo/tempodb/backend" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -116,8 +121,8 @@ func TestQueryRangeHandlerSucceeds(t *testing.T) { func TestQueryRangeAccessesCache(t *testing.T) { tenant := "foo" meta := &backend.BlockMeta{ - StartTime: time.Unix(15, 0), - EndTime: time.Unix(16, 0), + StartTime: time.Unix(150, 0), + EndTime: time.Unix(160, 0), Size_: defaultTargetBytesPerRequest, TotalRecords: 1, BlockID: backend.MustParse("00000000-0000-0000-0000-000000000123"), @@ -135,11 +140,11 @@ func TestQueryRangeAccessesCache(t *testing.T) { }, Samples: []tempopb.Sample{ { - TimestampMs: 1200_000, + TimestampMs: 12_000_000, Value: 2, }, { - TimestampMs: 1100_000, + TimestampMs: 11_000_000, Value: 1, }, }, @@ -166,8 +171,8 @@ func TestQueryRangeAccessesCache(t *testing.T) { step := 1000000000 query := "{} | rate()" hash := hashForQueryRangeRequest(&tempopb.QueryRangeRequest{Query: query, Step: uint64(step)}) - startNS := 10 * time.Second - endNS := 20 * time.Second + startNS := 100 * time.Second + endNS := 200 * time.Second cacheKey := queryRangeCacheKey(tenant, hash, time.Unix(0, int64(startNS)), time.Unix(0, int64(endNS)), meta, 0, 1) // confirm cache key coesn't exist @@ -283,8 +288,8 @@ func TestQueryRangeCachedMetrics(t *testing.T) { // set up backend tenant := "foo" meta := &backend.BlockMeta{ - StartTime: time.Unix(15, 0), - EndTime: time.Unix(16, 0), + StartTime: time.Unix(150, 0), + EndTime: time.Unix(160, 0), Size_: defaultTargetBytesPerRequest, TotalRecords: 1, BlockID: backend.MustParse("00000000-0000-0000-0000-000000000123"), @@ -313,7 +318,7 @@ func TestQueryRangeCachedMetrics(t *testing.T) { }, Samples: []tempopb.Sample{ { - TimestampMs: 1100_000, + TimestampMs: 11_000_000, Value: 1, }, }, @@ -329,8 +334,8 @@ func TestQueryRangeCachedMetrics(t *testing.T) { query := "{} | rate()" var step uint64 = 1000000000 hash := hashForQueryRangeRequest(&tempopb.QueryRangeRequest{Query: query, Step: step}) - startNS := uint64(10 * time.Second) - endNS := uint64(20 * time.Second) + startNS := uint64(100 * time.Second) + endNS := uint64(200 * time.Second) cacheKey := queryRangeCacheKey(tenant, hash, time.Unix(0, int64(startNS)), time.Unix(0, int64(endNS)), meta, 0, 1) // confirm cache key doesn't exist @@ -385,3 +390,140 @@ func TestQueryRangeCachedMetrics(t *testing.T) { require.Equal(t, uint32(1), actualResp.Metrics.TotalBlocks) require.Equal(t, uint64(defaultTargetBytesPerRequest), actualResp.Metrics.TotalBlockBytes) } + +func TestQueryRangeHandlerWithEndCutoff(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + step := 10 * time.Second + + time.Sleep(123 * time.Millisecond) // to make default time for synctest imperfect + now := time.Now() + alignedNow := now.Truncate(step) // align back to step + + start := now.Add(-100 * time.Second).UnixNano() + cutoff := 30 * time.Second + + tenant := "foo" + meta := &backend.BlockMeta{ + StartTime: now.Add(-100 * time.Second), + EndTime: now, + Size_: defaultTargetBytesPerRequest, + TotalRecords: 1, + BlockID: backend.MustParse("00000000-0000-0000-0000-000000000123"), + ReplicationFactor: 1, + } + retResp := &tempopb.QueryRangeResponse{ + Metrics: &tempopb.SearchMetrics{ + InspectedTraces: 1, + InspectedBytes: 1, + }, + Series: []*tempopb.TimeSeries{ + { + Labels: []v1.KeyValue{ + {Key: "foo", Value: &v1.AnyValue{Value: &v1.AnyValue_StringValue{StringValue: "bar"}}}, + }, + Samples: []tempopb.Sample{ + { + TimestampMs: 12_000_000, + Value: 2, + }, + { + TimestampMs: 11_000_000, + Value: 1, + }, + }, + }, + }, + } + + rdr := &mockReader{ + metas: []*backend.BlockMeta{meta}, + } + rt := &mockRoundTripperWithCapture{ + rt: mockRoundTripper{ + responseFn: func() proto.Message { + return retResp + }, + }, + } + + for _, tc := range []struct { + name string + end time.Time + expectedEnd time.Time + }{ + { + name: "now", + end: now, + expectedEnd: alignedNow.Add(-cutoff), + }, + { + name: "unaligned", + end: now.Add(-100 * time.Millisecond), + expectedEnd: alignedNow.Add(-cutoff), + }, + { + name: "before cutoff", + end: now.Add(-cutoff - 20*time.Second), + expectedEnd: alignedNow.Add(-cutoff - 10*time.Second), // aligned to right + }, + { + name: "before unaligned", + end: now.Add(-cutoff - 3*time.Second), + expectedEnd: alignedNow.Add(-cutoff), + }, + } { + f := frontendWithSettings(t, rt, rdr, nil, nil, func(c *Config, _ *overrides.Config) { + c.Metrics.Sharder.Interval = time.Hour + c.QueryEndCutoff = cutoff + }) + + httpReq := httptest.NewRequest("GET", api.PathMetricsQueryRange, nil) + httpReq = api.BuildQueryRangeRequest(httpReq, &tempopb.QueryRangeRequest{ + Query: "{} | rate()", + Start: uint64(start), + End: uint64(tc.end.UnixNano()), + Step: uint64(step), + }, "") + + ctx := user.InjectOrgID(httpReq.Context(), tenant) + httpReq = httpReq.WithContext(ctx) + + httpResp := httptest.NewRecorder() + + f.MetricsQueryRangeHandler.ServeHTTP(httpResp, httpReq) + + resp := httpResp.Result() + require.Equal(t, 200, resp.StatusCode) + + actualResp := &tempopb.QueryRangeResponse{} + bytesResp, err := io.ReadAll(resp.Body) + require.NoError(t, err) + err = jsonpb.Unmarshal(bytes.NewReader(bytesResp), actualResp) + require.NoError(t, err) + + require.NotNil(t, rt.req) + actualEnd := time.Unix(0, int64(rt.req.End)) + assert.Equal(t, tc.expectedEnd, actualEnd, "[%s] actual end %s is not equal to expected end %s", tc.name, actualEnd, tc.expectedEnd) + } + }) +} + +// mockRoundTripperWithCapture is a mitm helper that captures query range requests +type mockRoundTripperWithCapture struct { + rt mockRoundTripper + req *tempopb.QueryRangeRequest + mx sync.Mutex +} + +func (m *mockRoundTripperWithCapture) RoundTrip(req pipeline.Request) (*http.Response, error) { + qrReq, err := api.ParseQueryRangeRequest(req.HTTPRequest()) + if err != nil { + panic("wrong test setup") + } + m.mx.Lock() + defer m.mx.Unlock() + m.req = qrReq + + res, err := m.rt.RoundTrip(req) + return res, err +} diff --git a/pkg/traceql/engine_metrics.go b/pkg/traceql/engine_metrics.go index 354601540e5..4a03c3ca82d 100644 --- a/pkg/traceql/engine_metrics.go +++ b/pkg/traceql/engine_metrics.go @@ -158,6 +158,19 @@ func AlignRequest(req *tempopb.QueryRangeRequest) { } } +func AlignEndToLeft(req *tempopb.QueryRangeRequest) { + if IsInstant(req) { + return + } + + if req.Step == 0 { + return + } + + mod := req.End % req.Step + req.End -= mod +} + // Start time is rounded down to next step func alignStart(start, _, step uint64, instant bool) uint64 { if step == 0 {