Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
* [FEATURE] TraceQL support for event attributes [#3748](https://github.com/grafana/tempo/pull/3748) (@ie-pham)
* [FEATURE] Flush and query RF1 blocks for TraceQL metric queries [#3628](https://github.com/grafana/tempo/pull/3628) [#3691](https://github.com/grafana/tempo/pull/3691) [#3723](https://github.com/grafana/tempo/pull/3723) (@mapno)
* [FEATURE] Add new compare() metrics function [#3695](https://github.com/grafana/tempo/pull/3695) (@mdisibio)
* [FEATURE] Add new api `/api/metrics/query` for instant metrics queries [#3859](https://github.com/grafana/tempo/pull/3859) (@mdisibio)
* [FEATURE] Add a `q` parameter to `/api/v2/search/tags` for tag name filtering [#3822](https://github.com/grafana/tempo/pull/3822) (@joe-elliott)
* [ENHANCEMENT] Tag value lookup use protobuf internally for improved latency [#3731](https://github.com/grafana/tempo/pull/3731) (@mdisibio)
* [ENHANCEMENT] TraceQL metrics queries use protobuf internally for improved latency [#3745](https://github.com/grafana/tempo/pull/3745) (@mdisibio)
Expand Down
1 change: 1 addition & 0 deletions cmd/tempo/app/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,7 @@ func (t *App) initQueryFrontend() (services.Service, error) {

// http metrics endpoints
t.Server.HTTPRouter().Handle(addHTTPAPIPrefix(&t.cfg, api.PathSpanMetricsSummary), base.Wrap(queryFrontend.MetricsSummaryHandler))
t.Server.HTTPRouter().Handle(addHTTPAPIPrefix(&t.cfg, api.PathMetricsQueryInstant), base.Wrap(queryFrontend.MetricsQueryInstantHandler))
t.Server.HTTPRouter().Handle(addHTTPAPIPrefix(&t.cfg, api.PathMetricsQueryRange), base.Wrap(queryFrontend.MetricsQueryRangeHandler))

// the query frontend needs to have knowledge of the blocks so it can shard search jobs
Expand Down
38 changes: 20 additions & 18 deletions modules/frontend/frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,16 @@ type (
)

// QueryFrontend bundles the HTTP handlers and gRPC streaming handlers that the
// query-frontend module wires into the server.
type QueryFrontend struct {
	// http/discrete handlers
	TraceByIDHandler, SearchHandler, MetricsSummaryHandler, MetricsQueryInstantHandler, MetricsQueryRangeHandler http.Handler
	SearchTagsHandler, SearchTagsV2Handler, SearchTagsValuesHandler, SearchTagsValuesV2Handler                   http.Handler
	cacheProvider                                                                                               cache.Provider
	// grpc/streaming handlers
	streamingSearch      streamingSearchHandler
	streamingTags        streamingTagsHandler
	streamingTagsV2      streamingTagsV2Handler
	streamingTagValues   streamingTagValuesHandler
	streamingTagValuesV2 streamingTagValuesV2Handler
	streamingQueryRange  streamingQueryRangeHandler
	logger               log.Logger
}

// New returns a new QueryFrontend
Expand Down Expand Up @@ -140,18 +140,20 @@ func New(cfg Config, next http.RoundTripper, o overrides.Interface, reader tempo
searchTagValues := newTagHTTPHandler(cfg, searchTagValuesPipeline, o, combiner.NewSearchTagValues, logger)
searchTagValuesV2 := newTagHTTPHandler(cfg, searchTagValuesPipeline, o, combiner.NewSearchTagValuesV2, logger)
metrics := newMetricsSummaryHandler(metricsPipeline, logger)
queryInstant := newMetricsQueryInstantHTTPHandler(cfg, queryRangePipeline, logger) // Reuses the same pipeline
queryrange := newMetricsQueryRangeHTTPHandler(cfg, queryRangePipeline, logger)

return &QueryFrontend{
// http/discrete
TraceByIDHandler: newHandler(cfg.Config.LogQueryRequestHeaders, traces, logger),
SearchHandler: newHandler(cfg.Config.LogQueryRequestHeaders, search, logger),
SearchTagsHandler: newHandler(cfg.Config.LogQueryRequestHeaders, searchTags, logger),
SearchTagsV2Handler: newHandler(cfg.Config.LogQueryRequestHeaders, searchTagsV2, logger),
SearchTagsValuesHandler: newHandler(cfg.Config.LogQueryRequestHeaders, searchTagValues, logger),
SearchTagsValuesV2Handler: newHandler(cfg.Config.LogQueryRequestHeaders, searchTagValuesV2, logger),
MetricsSummaryHandler: newHandler(cfg.Config.LogQueryRequestHeaders, metrics, logger),
MetricsQueryRangeHandler: newHandler(cfg.Config.LogQueryRequestHeaders, queryrange, logger),
TraceByIDHandler: newHandler(cfg.Config.LogQueryRequestHeaders, traces, logger),
SearchHandler: newHandler(cfg.Config.LogQueryRequestHeaders, search, logger),
SearchTagsHandler: newHandler(cfg.Config.LogQueryRequestHeaders, searchTags, logger),
SearchTagsV2Handler: newHandler(cfg.Config.LogQueryRequestHeaders, searchTagsV2, logger),
SearchTagsValuesHandler: newHandler(cfg.Config.LogQueryRequestHeaders, searchTagValues, logger),
SearchTagsValuesV2Handler: newHandler(cfg.Config.LogQueryRequestHeaders, searchTagValuesV2, logger),
MetricsSummaryHandler: newHandler(cfg.Config.LogQueryRequestHeaders, metrics, logger),
MetricsQueryInstantHandler: newHandler(cfg.Config.LogQueryRequestHeaders, queryInstant, logger),
MetricsQueryRangeHandler: newHandler(cfg.Config.LogQueryRequestHeaders, queryrange, logger),

// grpc/streaming
streamingSearch: newSearchStreamingGRPCHandler(cfg, searchPipeline, apiPrefix, logger),
Expand Down
171 changes: 171 additions & 0 deletions modules/frontend/metrics_query_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
package frontend

import (
"fmt"
"io"
"net/http"
"strings"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/gogo/protobuf/jsonpb"
"github.com/grafana/dskit/user"
"github.com/grafana/tempo/modules/frontend/combiner"
"github.com/grafana/tempo/modules/frontend/pipeline"
"github.com/grafana/tempo/pkg/api"
"github.com/grafana/tempo/pkg/tempopb"
)

// newMetricsQueryInstantHTTPHandler handles instant queries. Internally these are rewritten as query_range with single step
// to make use of the existing pipeline.
func newMetricsQueryInstantHTTPHandler(cfg Config, next pipeline.AsyncRoundTripper[combiner.PipelineResponse], logger log.Logger) http.RoundTripper {
Comment thread
mdisibio marked this conversation as resolved.
postSLOHook := metricsSLOPostHook(cfg.Metrics.SLO)

return pipeline.RoundTripperFunc(func(req *http.Request) (*http.Response, error) {
tenant, _ := user.ExtractOrgID(req.Context())
start := time.Now()

// Parse request
i, err := api.ParseQueryInstantRequest(req)
if err != nil {
level.Error(logger).Log("msg", "query instant: parse search request failed", "err", err)
return &http.Response{
StatusCode: http.StatusBadRequest,
Status: http.StatusText(http.StatusBadRequest),
Body: io.NopCloser(strings.NewReader(err.Error())),
}, nil
}

logQueryInstantRequest(logger, tenant, i)

// --------------------------------------------------
// Rewrite into a query_range request.
// --------------------------------------------------
qr := &tempopb.QueryRangeRequest{
Query: i.Query,
Start: i.Start,
End: i.End,
Step: i.End - i.Start,
}

// Clone existing to keep it unaltered.
req = req.Clone(req.Context())
req.URL.Path = strings.ReplaceAll(req.URL.Path, api.PathMetricsQueryInstant, api.PathMetricsQueryRange)
req = api.BuildQueryRangeRequest(req, qr)

combiner, err := combiner.NewTypedQueryRange(qr, false)
if err != nil {
level.Error(logger).Log("msg", "query instant: query range combiner failed", "err", err)
return &http.Response{
StatusCode: http.StatusInternalServerError,
Status: http.StatusText(http.StatusInternalServerError),
Body: io.NopCloser(strings.NewReader(err.Error())),
}, nil
}
rt := pipeline.NewHTTPCollector(next, cfg.ResponseConsumers, combiner)

// Roundtrip the request and look for intermediate failures
innerResp, err := rt.RoundTrip(req)
if err != nil {
return nil, err
}
if innerResp != nil && innerResp.StatusCode != http.StatusOK {
return innerResp, nil
}

// --------------------------------------------------
// Get the final data and translate to instant.
// --------------------------------------------------
qrResp, err := combiner.GRPCFinal()
if err != nil {
return nil, err
}

iResp := &tempopb.QueryInstantRespone{
Metrics: qrResp.Metrics,
}
for _, series := range qrResp.Series {
if len(series.Samples) == 0 {
continue
}
// Use first value
iResp.Series = append(iResp.Series, &tempopb.InstantSeries{
Labels: series.Labels,
PromLabels: series.PromLabels,
Value: series.Samples[0].Value,
})
}

bodyString, err := new(jsonpb.Marshaler).MarshalToString(iResp)
if err != nil {
return nil, fmt.Errorf("error marshalling response body: %w", err)
}

resp := &http.Response{
StatusCode: combiner.StatusCode(),
Header: http.Header{
api.HeaderContentType: {api.HeaderAcceptJSON},
},
Body: io.NopCloser(strings.NewReader(bodyString)),
ContentLength: int64(len([]byte(bodyString))),
}

duration := time.Since(start)
var bytesProcessed uint64
if iResp.Metrics != nil {
bytesProcessed = iResp.Metrics.InspectedBytes
}
postSLOHook(resp, tenant, bytesProcessed, duration, err)
logQueryInstantResult(logger, tenant, duration.Seconds(), i, iResp, err)

return resp, nil
})
}

// logQueryInstantResult emits an info-level summary of a completed instant
// query. Progressively less detail is logged when the response or its metrics
// are missing.
func logQueryInstantResult(logger log.Logger, tenantID string, durationSeconds float64, req *tempopb.QueryInstantRequest, resp *tempopb.QueryInstantRespone, err error) {
	switch {
	case resp == nil:
		level.Info(logger).Log(
			"msg", "query instant results - no resp",
			"tenant", tenantID,
			"duration_seconds", durationSeconds,
			"error", err)

	case resp.Metrics == nil:
		level.Info(logger).Log(
			"msg", "query instant results - no metrics",
			"tenant", tenantID,
			"query", req.Query,
			"range_nanos", req.End-req.Start,
			"duration_seconds", durationSeconds,
			"error", err)

	default:
		level.Info(logger).Log(
			"msg", "query instant results",
			"tenant", tenantID,
			"query", req.Query,
			"range_nanos", req.End-req.Start,
			"duration_seconds", durationSeconds,
			"request_throughput", float64(resp.Metrics.InspectedBytes)/durationSeconds,
			"total_requests", resp.Metrics.TotalJobs,
			"total_blockBytes", resp.Metrics.TotalBlockBytes,
			"total_blocks", resp.Metrics.TotalBlocks,
			"completed_requests", resp.Metrics.CompletedJobs,
			"inspected_bytes", resp.Metrics.InspectedBytes,
			"inspected_traces", resp.Metrics.InspectedTraces,
			"inspected_spans", resp.Metrics.InspectedSpans,
			"error", err)
	}
}

// logQueryInstantRequest records an incoming instant query at info level.
func logQueryInstantRequest(logger log.Logger, tenantID string, req *tempopb.QueryInstantRequest) {
	rangeSeconds := req.End - req.Start
	level.Info(logger).Log(
		"msg", "query instant request",
		"tenant", tenantID,
		"query", req.Query,
		"range_seconds", rangeSeconds)
}
Loading