diff --git a/CHANGELOG.md b/CHANGELOG.md index 640fa6cc0cb..c183fd23298 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,6 +18,7 @@ * [ENHANCEMENT] Protect ingesters from panics by adding defer/recover to all read path methods. [#3790](https://github.com/grafana/tempo/pull/3790) (@joe-elliott) * [ENHANCEMENT] Added a boolean flag to enable or disable dualstack mode on Storage block config for S3 [#3721](https://github.com/grafana/tempo/pull/3721) (@sid-jar, @mapno) * [ENHANCEMENT] Add caching to query range queries [#3796](https://github.com/grafana/tempo/pull/3796) (@mapno) +* [ENHANCEMENT] Only stream diffs on metrics queries [#3808](https://github.com/grafana/tempo/pull/3808) (@joe-elliott) * [ENHANCEMENT] Add data quality metric to measure traces without a root [#3812](https://github.com/grafana/tempo/pull/3812) (@mapno) * [BUGFIX] Fix metrics queries when grouping by attributes that may not exist [#3734](https://github.com/grafana/tempo/pull/3734) (@mdisibio) * [BUGFIX] Fix frontend parsing error on cached responses [#3759](https://github.com/grafana/tempo/pull/3759) (@mdisibio) diff --git a/cmd/tempo-cli/cmd-query-metrics-query-range.go b/cmd/tempo-cli/cmd-query-metrics-query-range.go index 02de9a32054..ad2d6ddac7f 100644 --- a/cmd/tempo-cli/cmd-query-metrics-query-range.go +++ b/cmd/tempo-cli/cmd-query-metrics-query-range.go @@ -34,13 +34,13 @@ func (cmd *metricsQueryRangeCmd) Run(_ *globalOptions) error { if err != nil { return err } - start := startDate.Unix() + start := startDate.UnixNano() endDate, err := time.Parse(time.RFC3339, cmd.End) if err != nil { return err } - end := endDate.Unix() + end := endDate.UnixNano() req := &tempopb.QueryRangeRequest{ Query: cmd.TraceQL, diff --git a/modules/frontend/combiner/metrics_query_range.go b/modules/frontend/combiner/metrics_query_range.go index 81cf18649a8..caa89bcf63a 100644 --- a/modules/frontend/combiner/metrics_query_range.go +++ b/modules/frontend/combiner/metrics_query_range.go @@ -11,8 +11,8 @@ import ( var _ GRPCCombiner[*tempopb.QueryRangeResponse] = (*genericCombiner[*tempopb.QueryRangeResponse])(nil) // NewQueryRange returns a query range combiner. -func NewQueryRange(req *tempopb.QueryRangeRequest) (Combiner, error) { - combiner, err := traceql.QueryRangeCombinerFor(req, traceql.AggregateModeFinal) +func NewQueryRange(req *tempopb.QueryRangeRequest, trackDiffs bool) (Combiner, error) { + combiner, err := traceql.QueryRangeCombinerFor(req, traceql.AggregateModeFinal, trackDiffs) if err != nil { return nil, err } @@ -59,9 +59,8 @@ func NewQueryRange(req *tempopb.QueryRangeRequest) (Combiner, error) { sortResponse(resp) return resp, nil }, - // todo: the diff method still returns the full response every time. find a way to diff diff: func(_ *tempopb.QueryRangeResponse) (*tempopb.QueryRangeResponse, error) { - resp := combiner.Response() + resp := combiner.Diff() if resp == nil { resp = &tempopb.QueryRangeResponse{} } @@ -71,8 +70,8 @@ func NewQueryRange(req *tempopb.QueryRangeRequest) (Combiner, error) { }, nil } -func NewTypedQueryRange(req *tempopb.QueryRangeRequest) (GRPCCombiner[*tempopb.QueryRangeResponse], error) { - c, err := NewQueryRange(req) +func NewTypedQueryRange(req *tempopb.QueryRangeRequest, trackDiffs bool) (GRPCCombiner[*tempopb.QueryRangeResponse], error) { + c, err := NewQueryRange(req, trackDiffs) if err != nil { return nil, err } diff --git a/modules/frontend/metrics_query_range_handler.go b/modules/frontend/metrics_query_range_handler.go index 57ba0754b0b..ce42b51f0af 100644 --- a/modules/frontend/metrics_query_range_handler.go +++ b/modules/frontend/metrics_query_range_handler.go @@ -37,7 +37,7 @@ func newQueryRangeStreamingGRPCHandler(cfg Config, next pipeline.AsyncRoundTripp start := time.Now() var finalResponse *tempopb.QueryRangeResponse - c, err := combiner.NewTypedQueryRange(req) + c, err := combiner.NewTypedQueryRange(req, true) if err != nil { return err } @@ -83,7 +83,7 @@ func newMetricsQueryRangeHTTPHandler(cfg Config, next pipeline.AsyncRoundTripper logQueryRangeRequest(logger, tenant, queryRangeReq) // build and use roundtripper - combiner, err := combiner.NewTypedQueryRange(queryRangeReq) + combiner, err := combiner.NewTypedQueryRange(queryRangeReq, false) if err != nil { level.Error(logger).Log("msg", "query range: query range combiner failed", "err", err) return &http.Response{ @@ -154,7 +154,7 @@ func logQueryRangeRequest(logger log.Logger, tenantID string, req *tempopb.Query "msg", "query range request", "tenant", tenantID, "query", req.Query, - "range_seconds", req.End-req.Start, + "range_nanos", req.End-req.Start, "mode", req.QueryMode, "step", req.Step) } diff --git a/modules/querier/querier_query_range.go b/modules/querier/querier_query_range.go index cd1b21e3535..a8672335078 100644 --- a/modules/querier/querier_query_range.go +++ b/modules/querier/querier_query_range.go @@ -52,7 +52,7 @@ func (q *Querier) queryRangeRecent(ctx context.Context, req *tempopb.QueryRangeR return nil, fmt.Errorf("error querying generators in Querier.MetricsQueryRange: %w", err) } - c, err := traceql.QueryRangeCombinerFor(req, traceql.AggregateModeSum) + c, err := traceql.QueryRangeCombinerFor(req, traceql.AggregateModeSum, false) if err != nil { return nil, err } diff --git a/pkg/traceql/combine.go b/pkg/traceql/combine.go index 7fbe772a168..3638a852db0 100644 --- a/pkg/traceql/combine.go +++ b/pkg/traceql/combine.go @@ -3,6 +3,7 @@ package traceql import ( "sort" "strings" + "time" "github.com/grafana/tempo/pkg/tempopb" ) @@ -141,22 +142,37 @@ func spansetID(ss *tempopb.SpanSet) string { return id } +type tsRange struct { + minTS, maxTS int64 +} + type QueryRangeCombiner struct { req *tempopb.QueryRangeRequest eval *MetricsFrontendEvaluator metrics *tempopb.SearchMetrics + + // used to track which series were updated since the previous diff + // todo: it may not be worth it to track the diffs per series. it would be simpler (and possibly nearly as effective) to just calculate a global + // max/min for all series + seriesUpdated map[string]tsRange } -func QueryRangeCombinerFor(req *tempopb.QueryRangeRequest, mode AggregateMode) (*QueryRangeCombiner, error) { +func QueryRangeCombinerFor(req *tempopb.QueryRangeRequest, mode AggregateMode, trackDiffs bool) (*QueryRangeCombiner, error) { eval, err := NewEngine().CompileMetricsQueryRangeNonRaw(req, mode) if err != nil { return nil, err } + var seriesUpdated map[string]tsRange + if trackDiffs { + seriesUpdated = map[string]tsRange{} + } + return &QueryRangeCombiner{ - req: req, - eval: eval, - metrics: &tempopb.SearchMetrics{}, + req: req, + eval: eval, + metrics: &tempopb.SearchMetrics{}, + seriesUpdated: seriesUpdated, }, nil } @@ -165,6 +181,9 @@ func (q *QueryRangeCombiner) Combine(resp *tempopb.QueryRangeResponse) { return } + // mark min/max for all series + q.markUpdatedRanges(resp) + // Here is where the job results are reentered into the pipeline q.eval.ObserveSeries(resp.Series) @@ -185,3 +204,61 @@ func (q *QueryRangeCombiner) Response() *tempopb.QueryRangeResponse { Metrics: q.metrics, } } + +func (q *QueryRangeCombiner) Diff() *tempopb.QueryRangeResponse { + if q.seriesUpdated == nil { + return q.Response() + } + + seriesRangeFn := func(promLabels string) (uint64, uint64, bool) { + tsr, ok := q.seriesUpdated[promLabels] + return uint64(tsr.minTS), uint64(tsr.maxTS), ok + } + + // filter out series that haven't change + resp := &tempopb.QueryRangeResponse{ + Series: q.eval.Results().ToProtoDiff(q.req, seriesRangeFn), + Metrics: q.metrics, + } + + // wipe out the diff for the next call + clear(q.seriesUpdated) + + return resp +} + +func (q *QueryRangeCombiner) markUpdatedRanges(resp *tempopb.QueryRangeResponse) { + if q.seriesUpdated == nil { + return + } + + // mark all ranges that changed + for _, series := range resp.Series { + if len(series.Samples) == 0 { + continue + } + + nanoMin := series.Samples[0].TimestampMs * int64(time.Millisecond) + nanoMax := series.Samples[len(series.Samples)-1].TimestampMs * int64(time.Millisecond) + + tsr, ok := q.seriesUpdated[series.PromLabels] + if !ok { + q.seriesUpdated[series.PromLabels] = tsRange{minTS: nanoMin, maxTS: nanoMax} + continue + } + + var updated bool + if nanoMin < tsr.minTS { + updated = true + tsr.minTS = nanoMin + } + if nanoMax > tsr.maxTS { + updated = true + tsr.maxTS = nanoMax + } + + if updated { + q.seriesUpdated[series.PromLabels] = tsr + } + } +} diff --git a/pkg/traceql/combine_test.go b/pkg/traceql/combine_test.go index 3adbe5955fa..a5503786024 100644 --- a/pkg/traceql/combine_test.go +++ b/pkg/traceql/combine_test.go @@ -1,7 +1,11 @@ package traceql import ( + "fmt" + "slices" + "strings" "testing" + "time" "github.com/grafana/tempo/pkg/tempopb" v1 "github.com/grafana/tempo/pkg/tempopb/common/v1" @@ -263,3 +267,156 @@ func TestCombineResults(t *testing.T) { }) } } + +// nolint:govet +func TestQueryRangeCombinerDiffs(t *testing.T) { + start := uint64(100 * time.Millisecond) + end := uint64(150 * time.Millisecond) + step := uint64(10 * time.Millisecond) + + tcs := []struct { + resp, expectedResponse, expectedDiff *tempopb.QueryRangeResponse + }{ + // push nothing get nothing + { + resp: &tempopb.QueryRangeResponse{}, + expectedResponse: &tempopb.QueryRangeResponse{ + Series: []*tempopb.TimeSeries{}, + }, + }, + // push 3 data points, get them back + { + resp: &tempopb.QueryRangeResponse{ + Series: []*tempopb.TimeSeries{ + timeSeries("foo", "1", []tempopb.Sample{{100, 1}, {110, 2}, {120, 3}}), + }, + }, + expectedResponse: &tempopb.QueryRangeResponse{ + Series: []*tempopb.TimeSeries{ + timeSeries("foo", "1", []tempopb.Sample{{100, 1}, {110, 2}, {120, 3}, {130, 0}, {140, 0}, {150, 0}}), + }, + }, + expectedDiff: &tempopb.QueryRangeResponse{ + Series: []*tempopb.TimeSeries{ + timeSeries("foo", "1", []tempopb.Sample{{100, 1}, {110, 2}, {120, 3}}), + }, + }, + }, + // push 2 data points, check aggregation + { + resp: &tempopb.QueryRangeResponse{ + Series: []*tempopb.TimeSeries{ + timeSeries("foo", "1", []tempopb.Sample{{120, 1}, {130, 2}, {150, 3}}), + }, + }, + expectedResponse: &tempopb.QueryRangeResponse{ + Series: []*tempopb.TimeSeries{ + timeSeries("foo", "1", []tempopb.Sample{{100, 1}, {110, 2}, {120, 4}, {130, 2}, {140, 0}, {150, 3}}), + }, + }, + }, + // push different series + { + resp: &tempopb.QueryRangeResponse{ + Series: []*tempopb.TimeSeries{ + timeSeries("bar", "1", []tempopb.Sample{{100, 1}, {110, 2}, {120, 3}}), + }, + }, + expectedResponse: &tempopb.QueryRangeResponse{ + Series: []*tempopb.TimeSeries{ + timeSeries("foo", "1", []tempopb.Sample{{100, 1}, {110, 2}, {120, 4}, {130, 2}, {140, 0}, {150, 3}}), + timeSeries("bar", "1", []tempopb.Sample{{100, 1}, {110, 2}, {120, 3}, {130, 0}, {140, 0}, {150, 0}}), + }, + }, + // includes last 2 pushes + expectedDiff: &tempopb.QueryRangeResponse{ + Series: []*tempopb.TimeSeries{ + timeSeries("foo", "1", []tempopb.Sample{{120, 4}, {130, 2}, {140, 0}, {150, 3}}), + timeSeries("bar", "1", []tempopb.Sample{{100, 1}, {110, 2}, {120, 3}}), + }, + }, + }, + // push different series by label value + { + resp: &tempopb.QueryRangeResponse{ + Series: []*tempopb.TimeSeries{ + timeSeries("foo", "2", []tempopb.Sample{{100, 1}, {110, 2}, {120, 3}}), + }, + }, + expectedResponse: &tempopb.QueryRangeResponse{ + Series: []*tempopb.TimeSeries{ + timeSeries("foo", "1", []tempopb.Sample{{100, 1}, {110, 2}, {120, 4}, {130, 2}, {140, 0}, {150, 3}}), + timeSeries("bar", "1", []tempopb.Sample{{100, 1}, {110, 2}, {120, 3}, {130, 0}, {140, 0}, {150, 0}}), + timeSeries("foo", "2", []tempopb.Sample{{100, 1}, {110, 2}, {120, 3}, {130, 0}, {140, 0}, {150, 0}}), + }, + }, + // includes last 2 pushes + expectedDiff: &tempopb.QueryRangeResponse{ + Series: []*tempopb.TimeSeries{ + timeSeries("foo", "2", []tempopb.Sample{{100, 1}, {110, 2}, {120, 3}}), + }, + }, + }, + } + + req := &tempopb.QueryRangeRequest{ + Start: start, + End: end, + Step: step, + Query: "{} | rate()", // simple aggregate + } + combiner, err := QueryRangeCombinerFor(req, AggregateModeFinal, true) + require.NoError(t, err) + + for i, tc := range tcs { + t.Run(fmt.Sprintf("step %d", i), func(t *testing.T) { + combiner.Combine(tc.resp) + + resp := combiner.Response() + resp.Metrics = nil // we want to ignore metrics for this test, just nil them out + metricsEqual(t, tc.expectedResponse, resp) + + if tc.expectedDiff != nil { + // call diff and get expected + diff := combiner.Diff() + diff.Metrics = nil + metricsEqual(t, tc.expectedDiff, diff) + + // call diff again and get nothing! + diff = combiner.Diff() + diff.Metrics = nil + require.Equal(t, &tempopb.QueryRangeResponse{ + Series: []*tempopb.TimeSeries{}, + }, diff) + } + }) + } +} + +func metricsEqual(t *testing.T, a, b *tempopb.QueryRangeResponse) { + t.Helper() + + slices.SortFunc(a.Series, func(a, b *tempopb.TimeSeries) int { + return strings.Compare(a.PromLabels, b.PromLabels) + }) + slices.SortFunc(b.Series, func(a, b *tempopb.TimeSeries) int { + return strings.Compare(a.PromLabels, b.PromLabels) + }) + + require.Equal(t, a, b) +} + +func timeSeries(name, val string, samples []tempopb.Sample) *tempopb.TimeSeries { + lbls := Labels{ + { + Name: name, + Value: NewStaticString(val), + }, + } + + return &tempopb.TimeSeries{ + Labels: []v1.KeyValue{{Key: name, Value: &v1.AnyValue{Value: &v1.AnyValue_StringValue{StringValue: val}}}}, + Samples: samples, + PromLabels: lbls.String(), + } +} diff --git a/pkg/traceql/engine_metrics.go b/pkg/traceql/engine_metrics.go index f5d9306cd8e..62a21c55b90 100644 --- a/pkg/traceql/engine_metrics.go +++ b/pkg/traceql/engine_metrics.go @@ -125,6 +125,10 @@ type TimeSeries struct { type SeriesSet map[string]TimeSeries func (set SeriesSet) ToProto(req *tempopb.QueryRangeRequest) []*tempopb.TimeSeries { + return set.ToProtoDiff(req, nil) +} + +func (set SeriesSet) ToProtoDiff(req *tempopb.QueryRangeRequest, rangeForLabels func(string) (uint64, uint64, bool)) []*tempopb.TimeSeries { resp := make([]*tempopb.TimeSeries, 0, len(set)) for promLabels, s := range set { @@ -138,10 +142,27 @@ func (set SeriesSet) ToProto(req *tempopb.QueryRangeRequest) []*tempopb.TimeSeri ) } - intervals := IntervalCount(req.Start, req.End, req.Step) + start, end := req.Start, req.End + include := true + if rangeForLabels != nil { + start, end, include = rangeForLabels(promLabels) + } + + if !include { + continue + } + + intervals := IntervalCount(start, end, req.Step) samples := make([]tempopb.Sample, 0, intervals) for i, value := range s.Values { ts := TimestampOf(uint64(i), req.Start, req.Step) + + // todo: this loop should be able to be restructured to directly pass over + // the desired intervals + if ts < start || ts > end { + continue + } + samples = append(samples, tempopb.Sample{ TimestampMs: time.Unix(0, int64(ts)).UnixMilli(), Value: value,