diff --git a/CHANGELOG.md b/CHANGELOG.md index 706d63a8164..3d8d73841aa 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -54,6 +54,7 @@ configurable via the throughput_bytes_slo field, and it will populate op="traces * [BUGFIX] Rhythm - fix adjustment of the start and end range for livetraces blocks [#4746](https://github.com/grafana/tempo/pull/4746) (@javiermolinar) * [BUGFIX] Return the operand as the only value if the tag is already filtered in the query [#4673](https://github.com/grafana/tempo/pull/4673) (@mapno) * [BUGFIX] Fix flaky test [#4787](https://github.com/grafana/tempo/pull/4787) (@javiermolinar) +* [BUGFIX] Fix rare panic that occurred when a querier modified results from ingesters/generators while they were being marshalled to proto. [#4790](https://github.com/grafana/tempo/pull/4790) (@joe-elliott) * [BUGFIX] Fix memcached settings for docker compose example [#4346](https://github.com/grafana/tempo/pull/4695) (@ruslan-mikhailov) * [BUGFIX] Fix setting processors in user configurations overrides via API [#4741](https://github.com/grafana/tempo/pull/4741) (@ruslan-mikhailov) * [BUGFIX] Fix panic on startup [#4744](https://github.com/grafana/tempo/pull/4744) (@ruslan-mikhailov) diff --git a/modules/querier/querier.go b/modules/querier/querier.go index 2ec7f0344a5..d18ed5d071a 100644 --- a/modules/querier/querier.go +++ b/modules/querier/querier.go @@ -20,7 +20,6 @@ import ( "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" oteltrace "go.opentelemetry.io/otel/trace" - "go.uber.org/atomic" "go.uber.org/multierr" generator_client "github.com/grafana/tempo/modules/generator/client" @@ -58,8 +57,8 @@ var ( ) type ( - forEachFn func(ctx context.Context, client tempopb.QuerierClient) error - forEachGeneratorFn func(ctx context.Context, client tempopb.MetricsGeneratorClient) error + forEachFn func(ctx context.Context, client tempopb.QuerierClient) (any, error) + forEachGeneratorFn func(ctx context.Context, client tempopb.MetricsGeneratorClient) (any, error) replicationSetFn func(r ring.ReadRing) (ring.ReplicationSet, error) ) @@ -214,7 +213,7 @@ func (q *Querier) FindTraceByID(ctx context.Context, req *tempopb.TraceByIDReque maxBytes := q.limits.MaxBytesPerTrace(userID) combiner := trace.NewCombiner(maxBytes, req.AllowPartialTrace) - mc := collector.NewMetricsCollector() + var inspectedBytes uint64 if req.QueryMode == QueryModeIngesters || req.QueryMode == QueryModeAll { var getRSFn replicationSetFn @@ -224,40 +223,45 @@ func (q *Querier) FindTraceByID(ctx context.Context, req *tempopb.TraceByIDReque return r.Get(traceKey, ring.Read, nil, nil, nil) } } - var spanCountTotal, traceCountTotal atomic.Int64 - var found atomic.Bool // get responses from all ingesters in parallel span.AddEvent("searching ingesters") - forEach := func(funcCtx context.Context, client tempopb.QuerierClient) error { - resp, err := client.FindTraceByID(funcCtx, req) - if err != nil { - return err - } - t := resp.Trace - if t != nil { - // we found a trace, consume and count it - spanCount, err := combiner.Consume(t) - if err != nil { - return err - } - spanCountTotal.Add(int64(spanCount)) - traceCountTotal.Inc() - found.Store(true) - if resp.Metrics != nil { - mc.Add(resp.Metrics.InspectedBytes) - } - } - return nil + forEach := func(funcCtx context.Context, client tempopb.QuerierClient) (any, error) { + return client.FindTraceByID(funcCtx, req) } - err := q.forIngesterRings(ctx, userID, getRSFn, forEach) + partialTraces, err := q.forIngesterRings(ctx, userID, getRSFn, forEach) if err != nil { return nil, fmt.Errorf("error querying ingesters in Querier.FindTraceByID: %w", err) } + + var spanCountTotal, traceCountTotal int64 + found := false + + for _, partialTrace := range partialTraces { + resp := partialTrace.(*tempopb.TraceByIDResponse) + if resp.Trace == nil { + continue + } + + found = true + + spanCount, err := combiner.Consume(resp.Trace) + if err != nil { + return nil, fmt.Errorf("error combining ingester results in Querier.FindTraceByID: %w", err) + } + + spanCountTotal += int64(spanCount) + traceCountTotal++ + if resp.Metrics != nil { + inspectedBytes += resp.Metrics.InspectedBytes + } + } + span.AddEvent("done searching ingesters", oteltrace.WithAttributes( - attribute.Bool("found", found.Load()), - attribute.Int64("combinedSpans", spanCountTotal.Load()), - attribute.Int64("combinedTraces", traceCountTotal.Load()))) + attribute.Bool("found", found), + attribute.Int64("combinedSpans", spanCountTotal), + attribute.Int64("combinedTraces", traceCountTotal), + )) } if req.QueryMode == QueryModeBlocks || req.QueryMode == QueryModeAll { @@ -291,7 +295,7 @@ func (q *Querier) FindTraceByID(ctx context.Context, req *tempopb.TraceByIDReque return nil, err } if partialTrace.Metrics != nil { - mc.Add(partialTrace.Metrics.InspectedBytes) + inspectedBytes += partialTrace.Metrics.InspectedBytes } } } @@ -299,7 +303,7 @@ func (q *Querier) FindTraceByID(ctx context.Context, req *tempopb.TraceByIDReque completeTrace, _ := combiner.Result() resp := &tempopb.TraceByIDResponse{ Trace: completeTrace, - Metrics: &tempopb.TraceByIDMetrics{InspectedBytes: mc.TotalValue()}, + Metrics: &tempopb.TraceByIDMetrics{InspectedBytes: inspectedBytes}, } if combiner.IsPartialTrace() { @@ -311,15 +315,15 @@ func (q *Querier) FindTraceByID(ctx context.Context, req *tempopb.TraceByIDReque } // forIngesterRings runs f, in parallel, for given ingesters -func (q *Querier) forIngesterRings(ctx context.Context, userID string, getReplicationSet replicationSetFn, f forEachFn) error { +func (q *Querier) forIngesterRings(ctx context.Context, userID string, getReplicationSet replicationSetFn, f forEachFn) ([]any, error) { if ctx.Err() != nil { _ = level.Debug(log.Logger).Log("forIngesterRings context error", "ctx.Err()", ctx.Err().Error()) - return ctx.Err() + return nil, ctx.Err() } // if we have no configured ingester rings this will fail silently. let's return an actual error instead if len(q.ingesterRings) == 0 { - return errors.New("forIngesterRings: no ingester rings configured") + return nil, errors.New("forIngesterRings: no ingester rings configured") } // if a nil replicationSetFn is passed, that means to just use a standard Read ring @@ -332,7 +336,8 @@ func (q *Querier) forIngesterRings(ctx context.Context, userID string, getReplic var mtx sync.Mutex var wg sync.WaitGroup - var responseErr error + var overallErr error + var overallResults []any for i, ingesterRing := range q.ingesterRings { if q.cfg.ShuffleShardingIngestersEnabled { @@ -346,34 +351,36 @@ func (q *Querier) forIngesterRings(ctx context.Context, userID string, getReplic replicationSet, err := getReplicationSet(ingesterRing) if err != nil { - return fmt.Errorf("forIngesterRings: error getting replication set for ring (%d): %w", i, err) + return nil, fmt.Errorf("forIngesterRings: error getting replication set for ring (%d): %w", i, err) } pool := q.ingesterPools[i] wg.Add(1) go func() { defer wg.Done() - err := forOneIngesterRing(ctx, replicationSet, f, pool, q.cfg.ExtraQueryDelay) + results, err := forOneIngesterRing(ctx, replicationSet, f, pool, q.cfg.ExtraQueryDelay) mtx.Lock() defer mtx.Unlock() if err != nil { - responseErr = multierr.Combine(responseErr, err) + overallErr = multierr.Combine(overallErr, err) return } + + overallResults = append(overallResults, results...) }() } wg.Wait() - if responseErr != nil { - return responseErr + if overallErr != nil { + return nil, overallErr } - return nil + return overallResults, nil } -func forOneIngesterRing(ctx context.Context, replicationSet ring.ReplicationSet, f forEachFn, pool *ring_client.Pool, extraQueryDelay time.Duration) error { +func forOneIngesterRing(ctx context.Context, replicationSet ring.ReplicationSet, f forEachFn, pool *ring_client.Pool, extraQueryDelay time.Duration) ([]any, error) { ctx, span := tracer.Start(ctx, "Querier.forOneIngesterRing") defer span.End() @@ -388,29 +395,22 @@ func forOneIngesterRing(ctx context.Context, replicationSet ring.ReplicationSet, return nil, fmt.Errorf("failed to get client for %s: %w", ingester.Addr, err) } - err = f(funcCtx, client.(tempopb.QuerierClient)) + result, err := f(funcCtx, client.(tempopb.QuerierClient)) if err != nil { return nil, fmt.Errorf("failed to execute f() for %s: %w", ingester.Addr, err) } - // we are returning the empty response here because response is collected by - // the collector inside forEachFn - return nil, nil + return result, nil } - // ignore response because it's nil, and we are using a collector inside forEachFn to - // collect the actual response. we need to return nil here and ignore it - // because doFunc expects us to return a response - _, err := replicationSet.Do(ctx, extraQueryDelay, doFunc) - - return err + return replicationSet.Do(ctx, extraQueryDelay, doFunc) } // forGivenGenerators runs f, in parallel, for given generators -func (q *Querier) forGivenGenerators(ctx context.Context, replicationSet ring.ReplicationSet, f forEachGeneratorFn) error { +func (q *Querier) forGivenGenerators(ctx context.Context, replicationSet ring.ReplicationSet, f forEachGeneratorFn) ([]any, error) { if ctx.Err() != nil { _ = level.Debug(log.Logger).Log("foreGivenGenerators context error", "ctx.Err()", ctx.Err().Error()) - return ctx.Err() + return nil, ctx.Err() } ctx, span := tracer.Start(ctx, "Querier.forGivenGenerators") @@ -427,49 +427,33 @@ func (q *Querier) forGivenGenerators(ctx context.Context, replicationSet ring.Re return nil, fmt.Errorf("failed to get client for %s: %w", generator.Addr, err) } - err = f(funcCtx, client.(tempopb.MetricsGeneratorClient)) + result, err := f(funcCtx, client.(tempopb.MetricsGeneratorClient)) if err != nil { return nil, fmt.Errorf("failed to execute f() for %s: %w", generator.Addr, err) } - // we are returning the empty response here because response is collected by - // the collector inside forEachGeneratorFn - return nil, nil + return result, nil } - // ignore response because it's nil, and we are using a collector inside forEachGeneratorFn to - // collect the actual response. we need to return nil here and ignore it - // because doFunc expects us to return a response - _, err := replicationSet.Do(ctx, q.cfg.ExtraQueryDelay, doFunc) + results, err := replicationSet.Do(ctx, q.cfg.ExtraQueryDelay, doFunc) if err != nil { - return fmt.Errorf("failed to get response from generators: %w", err) + return nil, fmt.Errorf("failed to get response from generators: %w", err) } - return nil + return results, nil } func (q *Querier) SearchRecent(ctx context.Context, req *tempopb.SearchRequest) (*tempopb.SearchResponse, error) { userID, err := user.ExtractOrgID(ctx) if err != nil { - return nil, fmt.Errorf("error extracting org id in Querier.Search: %w", err) + return nil, fmt.Errorf("error extracting org id in Querier.SearchRecent: %w", err) } - var results []*tempopb.SearchResponse - mtx := sync.Mutex{} - - forEach := func(ctx context.Context, client tempopb.QuerierClient) error { - resp, err := client.SearchRecent(ctx, req) - if err != nil { - return err - } - mtx.Lock() - defer mtx.Unlock() - results = append(results, resp) - return nil - } - err = q.forIngesterRings(ctx, userID, nil, forEach) + results, err := q.forIngesterRings(ctx, userID, nil, func(ctx context.Context, client tempopb.QuerierClient) (any, error) { + return client.SearchRecent(ctx, req) + }) if err != nil { - return nil, fmt.Errorf("error querying ingesters in Querier.Search: %w", err) + return nil, fmt.Errorf("error querying ingesters in Querier.SearchRecent: %w", err) } return q.postProcessIngesterSearchResults(req, results), nil @@ -524,29 +508,28 @@ func (q *Querier) SearchTags(ctx context.Context, req *tempopb.SearchTagsRequest maxDataSize := q.limits.MaxBytesPerTagValuesQuery(userID) distinctValues := collector.NewDistinctString(maxDataSize, req.MaxTagsPerScope, req.StaleValuesThreshold) - mc := collector.NewMetricsCollector() + var inspectedBytes uint64 - forEach := func(ctx context.Context, client tempopb.QuerierClient) error { - resp, err := client.SearchTags(ctx, req) - if err != nil { - return err - } - // collect metrics first because we stop early with return + results, err := q.forIngesterRings(ctx, userID, nil, func(ctx context.Context, client tempopb.QuerierClient) (any, error) { + return client.SearchTags(ctx, req) + }) + if err != nil { + return nil, fmt.Errorf("error querying ingesters in Querier.SearchTags: %w", err) + } + +outer: + for _, result := range results { + resp := result.(*tempopb.SearchTagsResponse) if resp.Metrics != nil { - mc.Add(resp.Metrics.InspectedBytes) + inspectedBytes += resp.Metrics.InspectedBytes } for _, tag := range resp.TagNames { distinctValues.Collect(tag) if distinctValues.Exceeded() { - return nil // stop early + break outer } } - return nil - } - err = q.forIngesterRings(ctx, userID, nil, forEach) - if err != nil { - return nil, fmt.Errorf("error querying ingesters in Querier.SearchTags: %w", err) } if distinctValues.Exceeded() { @@ -555,7 +538,7 @@ func (q *Querier) SearchTags(ctx context.Context, req *tempopb.SearchTagsRequest return &tempopb.SearchTagsResponse{ TagNames: distinctValues.Strings(), - Metrics: &tempopb.MetadataMetrics{InspectedBytes: mc.TotalValue()}, + Metrics: &tempopb.MetadataMetrics{InspectedBytes: inspectedBytes}, }, nil } @@ -567,32 +550,30 @@ func (q *Querier) SearchTagsV2(ctx context.Context, req *tempopb.SearchTagsReque maxBytesPerTag := q.limits.MaxBytesPerTagValuesQuery(orgID) distinctValues := collector.NewScopedDistinctString(maxBytesPerTag, req.MaxTagsPerScope, req.StaleValuesThreshold) - mc := collector.NewMetricsCollector() + var inspectedBytes uint64 // Get results from all ingesters - forEach := func(ctx context.Context, client tempopb.QuerierClient) error { - resp, err := client.SearchTagsV2(ctx, req) - if err != nil { - return err - } - // collect metrics first because we stop early with return + results, err := q.forIngesterRings(ctx, orgID, nil, func(ctx context.Context, client tempopb.QuerierClient) (any, error) { + return client.SearchTagsV2(ctx, req) + }) + if err != nil { + return nil, fmt.Errorf("error querying ingesters in Querier.SearchTags: %w", err) + } + +outer: + for _, result := range results { + resp := result.(*tempopb.SearchTagsV2Response) if resp.Metrics != nil { - mc.Add(resp.Metrics.InspectedBytes) + inspectedBytes += resp.Metrics.InspectedBytes } for _, res := range resp.Scopes { for _, tag := range res.Tags { if distinctValues.Collect(res.Name, tag) { - return nil + break outer } } } - return nil - } - - err = q.forIngesterRings(ctx, orgID, nil, forEach) - if err != nil { - return nil, fmt.Errorf("error querying ingesters in Querier.SearchTags: %w", err) } if distinctValues.Exceeded() { @@ -602,7 +583,7 @@ func (q *Querier) SearchTagsV2(ctx context.Context, req *tempopb.SearchTagsReque collected := distinctValues.Strings() resp := &tempopb.SearchTagsV2Response{ Scopes: make([]*tempopb.SearchTagsV2Scope, 0, len(collected)), - Metrics: &tempopb.MetadataMetrics{InspectedBytes: mc.TotalValue()}, // send metrics with response + Metrics: &tempopb.MetadataMetrics{InspectedBytes: inspectedBytes}, } for scope, vals := range collected { resp.Scopes = append(resp.Scopes, &tempopb.SearchTagsV2Scope{ @@ -622,7 +603,7 @@ func (q *Querier) SearchTagValues(ctx context.Context, req *tempopb.SearchTagVal maxDataSize := q.limits.MaxBytesPerTagValuesQuery(userID) distinctValues := collector.NewDistinctString(maxDataSize, req.MaxTagValues, req.StaleValueThreshold) - mc := collector.NewMetricsCollector() + var inspectedBytes uint64 // Virtual tags values. Get these first. for _, v := range search.GetVirtualTagValues(req.TagName) { @@ -630,28 +611,26 @@ func (q *Querier) SearchTagValues(ctx context.Context, req *tempopb.SearchTagVal distinctValues.Collect(v) } - forEach := func(ctx context.Context, client tempopb.QuerierClient) error { - resp, err := client.SearchTagValues(ctx, req) - if err != nil { - return err - } - // add metrics first because we stop early with return + results, err := q.forIngesterRings(ctx, userID, nil, func(ctx context.Context, client tempopb.QuerierClient) (any, error) { + return client.SearchTagValues(ctx, req) + }) + if err != nil { + return nil, fmt.Errorf("error querying ingesters in Querier.SearchTagValues: %w", err) + } + +outer: + for _, result := range results { + resp := result.(*tempopb.SearchTagValuesResponse) if resp.Metrics != nil { - mc.Add(resp.Metrics.InspectedBytes) + inspectedBytes += resp.Metrics.InspectedBytes } for _, res := range resp.TagValues { distinctValues.Collect(res) if distinctValues.Exceeded() { - return nil + break outer } } - return nil - } - - err = q.forIngesterRings(ctx, userID, nil, forEach) - if err != nil { - return nil, fmt.Errorf("error querying ingesters in Querier.SearchTagValues: %w", err) } if distinctValues.Exceeded() { @@ -660,7 +639,7 @@ func (q *Querier) SearchTagValues(ctx context.Context, req *tempopb.SearchTagVal return &tempopb.SearchTagValuesResponse{ TagValues: distinctValues.Strings(), - Metrics: &tempopb.MetadataMetrics{InspectedBytes: mc.TotalValue()}, + Metrics: &tempopb.MetadataMetrics{InspectedBytes: inspectedBytes}, }, nil } @@ -672,7 +651,7 @@ func (q *Querier) SearchTagValuesV2(ctx context.Context, req *tempopb.SearchTagV maxDataSize := q.limits.MaxBytesPerTagValuesQuery(userID) distinctValues := collector.NewDistinctValue(maxDataSize, req.MaxTagValues, req.StaleValueThreshold, func(v tempopb.TagValue) int { return len(v.Type) + len(v.Value) }) - mc := collector.NewMetricsCollector() + var inspectedBytes uint64 // Virtual tags values. Get these first. virtualVals := search.GetVirtualTagValuesV2(req.TagName) @@ -688,48 +667,36 @@ func (q *Querier) SearchTagValuesV2(ctx context.Context, req *tempopb.SearchTagV return valuesToV2Response(distinctValues, 0), nil } - forEach := func(ctx context.Context, client tempopb.QuerierClient) error { - // combine metrics as we get results from ingesters - resp, err := client.SearchTagValuesV2(ctx, req) - if err != nil { - return err - } - // collect metrics first, we stop early with return + results, err := q.forIngesterRings(ctx, userID, nil, func(ctx context.Context, client tempopb.QuerierClient) (any, error) { + return client.SearchTagValuesV2(ctx, req) + }) + if err != nil { + return nil, fmt.Errorf("error querying ingesters in Querier.SearchTagValues: %w", err) + } + +outer: + for _, result := range results { + resp := result.(*tempopb.SearchTagValuesV2Response) if resp.Metrics != nil { - mc.Add(resp.Metrics.InspectedBytes) + inspectedBytes += resp.Metrics.InspectedBytes } for _, res := range resp.TagValues { distinctValues.Collect(*res) if distinctValues.Exceeded() { - return nil // stop early + break outer } } - return nil - } - err = q.forIngesterRings(ctx, userID, nil, forEach) - if err != nil { - return nil, fmt.Errorf("error querying ingesters in Querier.SearchTagValues: %w", err) } if distinctValues.Exceeded() { _ = level.Warn(log.Logger).Log("msg", "Search of tag values exceeded limit, reduce cardinality or size of tags", "tag", req.TagName, "orgID", userID, "stopReason", distinctValues.StopReason()) } - return valuesToV2Response(distinctValues, mc.TotalValue()), nil + return valuesToV2Response(distinctValues, inspectedBytes), nil } -func (q *Querier) SpanMetricsSummary( - ctx context.Context, - req *tempopb.SpanMetricsSummaryRequest, -) (*tempopb.SpanMetricsSummaryResponse, error) { - // userID, err := user.ExtractOrgID(ctx) - // if err != nil { - // return nil, errors.Wrap(err, "error extracting org id in Querier.SpanMetricsSummary") - // } - - // limit := q.limits.MaxBytesPerTagValuesQuery(userID) - +func (q *Querier) SpanMetricsSummary(ctx context.Context, req *tempopb.SpanMetricsSummaryRequest) (*tempopb.SpanMetricsSummaryResponse, error) { genReq := &tempopb.SpanMetricsRequest{ Query: req.Query, GroupBy: req.GroupBy, @@ -744,21 +711,9 @@ func (q *Querier) SpanMetricsSummary( return nil, fmt.Errorf("error finding generators in Querier.SpanMetricsSummary: %w", err) } - var results []*tempopb.SpanMetricsResponse - mtx := sync.Mutex{} - - forEach := func(ctx context.Context, client tempopb.MetricsGeneratorClient) error { - resp, err := client.GetMetrics(ctx, genReq) - if err != nil { - return err - } - // collect the results from the generators in the pool - mtx.Lock() - defer mtx.Unlock() - results = append(results, resp) - return nil - } - err = q.forGivenGenerators(ctx, replicationSet, forEach) + results, err := q.forGivenGenerators(ctx, replicationSet, func(ctx context.Context, client tempopb.MetricsGeneratorClient) (any, error) { + return client.GetMetrics(ctx, genReq) + }) if err != nil { return nil, fmt.Errorf("error querying generators in Querier.SpanMetricsSummary: %w", err) } @@ -769,7 +724,9 @@ func (q *Querier) SpanMetricsSummary( var h *traceqlmetrics.LatencyHistogram var s traceqlmetrics.MetricSeries - for _, r := range results { + for _, result := range results { + r := result.(*tempopb.SpanMetricsResponse) + for _, m := range r.Metrics { s = protoToMetricSeries(m.Series) k := s.MetricKeys() @@ -947,10 +904,10 @@ func (q *Querier) internalTagsSearchBlockV2(ctx context.Context, req *tempopb.Se } valueCollector := collector.NewScopedDistinctString(q.limits.MaxBytesPerTagValuesQuery(tenantID), req.MaxTagsPerScope, req.StaleValueThreshold) - mc := collector.NewMetricsCollector() + var inspectedBytes uint64 fetcher := traceql.NewTagNamesFetcherWrapper(func(ctx context.Context, req traceql.FetchTagsRequest, cb traceql.FetchTagsCallback) error { - return q.store.FetchTagNames(ctx, meta, req, cb, mc.Add, common.DefaultSearchOptions()) + return q.store.FetchTagNames(ctx, meta, req, cb, func(bytesRead uint64) { inspectedBytes += bytesRead }, common.DefaultSearchOptions()) }) scope := traceql.AttributeScopeFromString(req.SearchReq.Scope) @@ -972,7 +929,7 @@ func (q *Querier) internalTagsSearchBlockV2(ctx context.Context, req *tempopb.Se scopedVals := valueCollector.Strings() resp := &tempopb.SearchTagsV2Response{ Scopes: make([]*tempopb.SearchTagsV2Scope, 0, len(scopedVals)), - Metrics: &tempopb.MetadataMetrics{InspectedBytes: mc.TotalValue()}, // send metrics with response + Metrics: &tempopb.MetadataMetrics{InspectedBytes: inspectedBytes}, } for scope, vals := range scopedVals { resp.Scopes = append(resp.Scopes, &tempopb.SearchTagsV2Scope{ @@ -1082,10 +1039,10 @@ func (q *Querier) internalTagValuesSearchBlockV2(ctx context.Context, req *tempo req.SearchReq.MaxTagValues, req.SearchReq.StaleValueThreshold, func(v tempopb.TagValue) int { return len(v.Type) + len(v.Value) }) - mc := collector.NewMetricsCollector() + var inspectedBytes uint64 fetcher := traceql.NewTagValuesFetcherWrapper(func(ctx context.Context, req traceql.FetchTagValuesRequest, cb traceql.FetchTagValuesCallback) error { - return q.store.FetchTagValues(ctx, meta, req, cb, mc.Add, opts) + return q.store.FetchTagValues(ctx, meta, req, cb, func(bytesRead uint64) { inspectedBytes += bytesRead }, opts) }) err = q.engine.ExecuteTagValues(ctx, tag, query, traceql.MakeCollectTagValueFunc(valueCollector.Collect), fetcher) @@ -1097,17 +1054,19 @@ func (q *Querier) internalTagValuesSearchBlockV2(ctx context.Context, req *tempo level.Warn(log.Logger).Log("msg", "Search tags exceeded limit, reduce cardinality or size of tags", "orgID", tenantID, "stopReason", valueCollector.StopReason()) } - return valuesToV2Response(valueCollector, mc.TotalValue()), nil + return valuesToV2Response(valueCollector, inspectedBytes), nil } -func (q *Querier) postProcessIngesterSearchResults(req *tempopb.SearchRequest, results []*tempopb.SearchResponse) *tempopb.SearchResponse { +func (q *Querier) postProcessIngesterSearchResults(req *tempopb.SearchRequest, results []any) *tempopb.SearchResponse { response := &tempopb.SearchResponse{ Metrics: &tempopb.SearchMetrics{}, } traces := map[string]*tempopb.TraceSearchMetadata{} - for _, sr := range results { + for _, result := range results { + sr := result.(*tempopb.SearchResponse) + for _, t := range sr.Traces { // Just simply take first result for each trace if _, ok := traces[t.TraceID]; !ok { diff --git a/modules/querier/querier_query_range.go b/modules/querier/querier_query_range.go index 517a54799a7..2eb95045029 100644 --- a/modules/querier/querier_query_range.go +++ b/modules/querier/querier_query_range.go @@ -3,7 +3,6 @@ package querier import ( "context" "fmt" - "sync" "time" "github.com/go-kit/log/level" @@ -26,7 +25,7 @@ func (q *Querier) QueryRange(ctx context.Context, req *tempopb.QueryRangeRequest } func (q *Querier) queryRangeRecent(ctx context.Context, req *tempopb.QueryRangeRequest) (*tempopb.QueryRangeResponse, error) { - // // Get results from all generators + // Get results from all generators replicationSet, err := q.generatorRing.GetReplicationSetForOperation(ring.Read) if err != nil { return nil, fmt.Errorf("error finding generators in Querier.queryRangeRecent: %w", err) @@ -37,23 +36,19 @@ func (q *Querier) queryRangeRecent(ctx context.Context, req *tempopb.QueryRangeR return nil, err } - mtx := sync.Mutex{} // combiner doesn't lock, so take lock before calling Combine to make is safe - forEach := func(ctx context.Context, client tempopb.MetricsGeneratorClient) error { - resp, err := client.QueryRange(ctx, req) - if err != nil { - return err - } - mtx.Lock() - defer mtx.Unlock() - c.Combine(resp) - return nil - } - err = q.forGivenGenerators(ctx, replicationSet, forEach) + results, err := q.forGivenGenerators(ctx, replicationSet, func(ctx context.Context, client tempopb.MetricsGeneratorClient) (any, error) { + return client.QueryRange(ctx, req) + }) if err != nil { _ = level.Error(log.Logger).Log("msg", "error querying generators in Querier.queryRangeRecent", "err", err) return nil, fmt.Errorf("error querying generators in Querier.queryRangeRecent: %w", err) } + for _, result := range results { + resp := result.(*tempopb.QueryRangeResponse) + c.Combine(resp) + } + return c.Response(), nil }