Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion modules/frontend/combiner/metrics_query_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func NewQueryRange(req *tempopb.QueryRangeRequest, trackDiffs bool) (Combiner, e
httpStatusCode: 200,
new: func() *tempopb.QueryRangeResponse { return &tempopb.QueryRangeResponse{} },
current: &tempopb.QueryRangeResponse{Metrics: &tempopb.SearchMetrics{}},
combine: func(partial *tempopb.QueryRangeResponse, _ *tempopb.QueryRangeResponse, resp PipelineResponse) error {
combine: func(partial *tempopb.QueryRangeResponse, _ *tempopb.QueryRangeResponse, _ PipelineResponse) error {
if partial.Metrics != nil {
// this is a coordination between the sharder and combiner. the sharder returns one response with summary metrics
// only. the combiner correctly takes and accumulates that job. however, if the response has no jobs this is
Expand Down
32 changes: 20 additions & 12 deletions modules/frontend/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,28 +2,27 @@ package frontend

import (
"flag"
"net/http"
"time"

"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"

"github.com/grafana/tempo/modules/frontend/transport"
"github.com/grafana/tempo/modules/frontend/pipeline"
v1 "github.com/grafana/tempo/modules/frontend/v1"
"github.com/grafana/tempo/pkg/usagestats"
)

var statVersion = usagestats.NewString("frontend_version")

type Config struct {
Config v1.Config `yaml:",inline"`
MaxRetries int `yaml:"max_retries,omitempty"`
Search SearchConfig `yaml:"search"`
TraceByID TraceByIDConfig `yaml:"trace_by_id"`
Metrics MetricsConfig `yaml:"metrics"`
MultiTenantQueriesEnabled bool `yaml:"multi_tenant_queries_enabled"`
ResponseConsumers int `yaml:"response_consumers"`

Config v1.Config `yaml:",inline"`
MaxRetries int `yaml:"max_retries,omitempty"`
Search SearchConfig `yaml:"search"`
TraceByID TraceByIDConfig `yaml:"trace_by_id"`
Metrics MetricsConfig `yaml:"metrics"`
MultiTenantQueriesEnabled bool `yaml:"multi_tenant_queries_enabled"`
ResponseConsumers int `yaml:"response_consumers"`
Weights pipeline.WeightsConfig `yaml:"weights"`
// the maximum time limit that tempo will work on an api request. this includes both
// grpc and http requests and applies to all "api" frontend query endpoints such as
// traceql, tag search, tag value search, trace by id and all streaming gRPC endpoints.
Expand All @@ -32,6 +31,9 @@ type Config struct {

// A list of regexes for black listing requests, these will apply for every request regardless the endpoint
URLDenyList []string `yaml:"url_deny_list,omitempty"`

RequestWithWeights bool `yaml:"request_with_weights,omitempty"`
Comment thread
javiermolinar marked this conversation as resolved.
RetryWithWeights bool `yaml:"retry_with_weights,omitempty"`
}

type SearchConfig struct {
Expand Down Expand Up @@ -95,6 +97,12 @@ func (cfg *Config) RegisterFlagsAndApplyDefaults(string, *flag.FlagSet) {
},
SLO: slo,
}
cfg.Weights = pipeline.WeightsConfig{
RequestWithWeights: true,
RetryWithWeights: true,
MaxRegexConditions: 1,
MaxTraceQLConditions: 4,
}

// enable multi tenant queries by default
cfg.MultiTenantQueriesEnabled = true
Expand All @@ -107,12 +115,12 @@ type CortexNoQuerierLimits struct{}
// Returned RoundTripper can be wrapped in more round-tripper middlewares, and then eventually registered
// into HTTP server using the Handler from this package. Returned RoundTripper is always non-nil
// (if there are no errors), and it uses the returned frontend (if any).
func InitFrontend(cfg v1.Config, log log.Logger, reg prometheus.Registerer) (http.RoundTripper, *v1.Frontend, error) {
func InitFrontend(cfg v1.Config, log log.Logger, reg prometheus.Registerer) (pipeline.RoundTripper, *v1.Frontend, error) {
statVersion.Set("v1")
// No scheduler = use original frontend.
fr, err := v1.New(cfg, log, reg)
if err != nil {
return nil, nil, err
}
return transport.AdaptGrpcRoundTripperToHTTPRoundTripper(fr), fr, nil
return fr, fr, nil
}
11 changes: 8 additions & 3 deletions modules/frontend/frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ type QueryFrontend struct {
var tracer = otel.Tracer("modules/frontend")

// New returns a new QueryFrontend
func New(cfg Config, next http.RoundTripper, o overrides.Interface, reader tempodb.Reader, cacheProvider cache.Provider, apiPrefix string, logger log.Logger, registerer prometheus.Registerer) (*QueryFrontend, error) {
func New(cfg Config, next pipeline.RoundTripper, o overrides.Interface, reader tempodb.Reader, cacheProvider cache.Provider, apiPrefix string, logger log.Logger, registerer prometheus.Registerer) (*QueryFrontend, error) {
level.Info(logger).Log("msg", "creating middleware in query frontend")

if cfg.TraceByID.QueryShards < minQueryShards || cfg.TraceByID.QueryShards > maxQueryShards {
Expand Down Expand Up @@ -90,8 +90,7 @@ func New(cfg Config, next http.RoundTripper, o overrides.Interface, reader tempo
return nil, fmt.Errorf("frontend metrics interval should be greater than 0")
}

retryWare := pipeline.NewRetryWare(cfg.MaxRetries, registerer)

retryWare := pipeline.NewRetryWare(cfg.MaxRetries, cfg.Weights.RetryWithWeights, registerer)
cacheWare := pipeline.NewCachingWare(cacheProvider, cache.RoleFrontendSearch, logger)
statusCodeWare := pipeline.NewStatusCodeAdjustWare()
traceIDStatusCodeWare := pipeline.NewStatusCodeAdjustWareWithAllowedCode(http.StatusNotFound)
Expand All @@ -101,6 +100,7 @@ func New(cfg Config, next http.RoundTripper, o overrides.Interface, reader tempo
tracePipeline := pipeline.Build(
[]pipeline.AsyncMiddleware[combiner.PipelineResponse]{
urlDenyListWare,
pipeline.NewWeightRequestWare(pipeline.TraceByID, cfg.Weights),
multiTenantMiddleware(cfg, logger),
newAsyncTraceIDSharder(&cfg.TraceByID, logger),
},
Expand All @@ -111,6 +111,7 @@ func New(cfg Config, next http.RoundTripper, o overrides.Interface, reader tempo
[]pipeline.AsyncMiddleware[combiner.PipelineResponse]{
urlDenyListWare,
queryValidatorWare,
pipeline.NewWeightRequestWare(pipeline.TraceQLSearch, cfg.Weights),
multiTenantMiddleware(cfg, logger),
newAsyncSearchSharder(reader, o, cfg.Search.Sharder, logger),
},
Expand All @@ -120,6 +121,7 @@ func New(cfg Config, next http.RoundTripper, o overrides.Interface, reader tempo
searchTagsPipeline := pipeline.Build(
[]pipeline.AsyncMiddleware[combiner.PipelineResponse]{
urlDenyListWare,
pipeline.NewWeightRequestWare(pipeline.Default, cfg.Weights),
multiTenantMiddleware(cfg, logger),
newAsyncTagSharder(reader, o, cfg.Search.Sharder, parseTagsRequest, logger),
},
Expand All @@ -129,6 +131,7 @@ func New(cfg Config, next http.RoundTripper, o overrides.Interface, reader tempo
searchTagValuesPipeline := pipeline.Build(
[]pipeline.AsyncMiddleware[combiner.PipelineResponse]{
urlDenyListWare,
pipeline.NewWeightRequestWare(pipeline.Default, cfg.Weights),
multiTenantMiddleware(cfg, logger),
newAsyncTagSharder(reader, o, cfg.Search.Sharder, parseTagValuesRequest, logger),
},
Expand All @@ -140,6 +143,7 @@ func New(cfg Config, next http.RoundTripper, o overrides.Interface, reader tempo
[]pipeline.AsyncMiddleware[combiner.PipelineResponse]{
urlDenyListWare,
queryValidatorWare,
pipeline.NewWeightRequestWare(pipeline.Default, cfg.Weights),
multiTenantUnsupportedMiddleware(cfg, logger),
},
[]pipeline.Middleware{statusCodeWare, retryWare},
Expand All @@ -150,6 +154,7 @@ func New(cfg Config, next http.RoundTripper, o overrides.Interface, reader tempo
[]pipeline.AsyncMiddleware[combiner.PipelineResponse]{
urlDenyListWare,
queryValidatorWare,
pipeline.NewWeightRequestWare(pipeline.TraceQLMetrics, cfg.Weights),
multiTenantMiddleware(cfg, logger),
newAsyncQueryRangeSharder(reader, o, cfg.Metrics.Sharder, logger),
},
Expand Down
26 changes: 12 additions & 14 deletions modules/frontend/metrics_query_range_sharder.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"errors"
"fmt"
"math"
"net/http"
"time"

"github.com/go-kit/log" //nolint:all deprecated
Expand Down Expand Up @@ -69,7 +68,7 @@ func (s queryRangeSharder) RoundTrip(pipelineRequest pipeline.Request) (pipeline
return pipeline.NewBadRequest(err), nil
}

expr, _, _, _, err := traceql.NewEngine().Compile(req.Query)
expr, _, _, _, err := traceql.Compile(req.Query)
if err != nil {
return pipeline.NewBadRequest(err), nil
}
Expand All @@ -89,7 +88,7 @@ func (s queryRangeSharder) RoundTrip(pipelineRequest pipeline.Request) (pipeline
// Note: this is checked after alignment for consistency.
maxDuration := s.maxDuration(tenantID)
if maxDuration != 0 && time.Duration(req.End-req.Start)*time.Nanosecond > maxDuration {
err = fmt.Errorf(fmt.Sprintf("range specified by start and end (%s) exceeds %s. received start=%d end=%d", time.Duration(req.End-req.Start), maxDuration, req.Start, req.End))
err = fmt.Errorf("range specified by start and end (%s) exceeds %s. received start=%d end=%d", time.Duration(req.End-req.Start), maxDuration, req.Start, req.End)
return pipeline.NewBadRequest(err), nil
}

Expand All @@ -99,14 +98,14 @@ func (s queryRangeSharder) RoundTrip(pipelineRequest pipeline.Request) (pipeline
cutoff = time.Now().Add(-s.cfg.QueryBackendAfter)
)

generatorReq := s.generatorRequest(*req, r, tenantID, cutoff)
generatorReq := s.generatorRequest(ctx, tenantID, pipelineRequest, *req, cutoff)
reqCh := make(chan pipeline.Request, 2) // buffer of 2 allows us to insert generatorReq and metrics

if generatorReq != nil {
reqCh <- pipeline.NewHTTPRequest(generatorReq)
reqCh <- generatorReq
}

totalJobs, totalBlocks, totalBlockBytes := s.backendRequests(ctx, tenantID, r, *req, cutoff, targetBytesPerRequest, reqCh)
totalJobs, totalBlocks, totalBlockBytes := s.backendRequests(ctx, tenantID, pipelineRequest, *req, cutoff, targetBytesPerRequest, reqCh)

span.SetAttributes(attribute.Int64("totalJobs", int64(totalJobs)))
span.SetAttributes(attribute.Int64("totalBlocks", int64(totalBlocks)))
Expand Down Expand Up @@ -158,7 +157,7 @@ func (s *queryRangeSharder) exemplarsPerShard(total uint32) uint32 {
return uint32(math.Ceil(float64(s.cfg.MaxExemplars)*1.2)) / total
}

func (s *queryRangeSharder) backendRequests(ctx context.Context, tenantID string, parent *http.Request, searchReq tempopb.QueryRangeRequest, cutoff time.Time, targetBytesPerRequest int, reqCh chan pipeline.Request) (totalJobs, totalBlocks uint32, totalBlockBytes uint64) {
func (s *queryRangeSharder) backendRequests(ctx context.Context, tenantID string, parent pipeline.Request, searchReq tempopb.QueryRangeRequest, cutoff time.Time, targetBytesPerRequest int, reqCh chan pipeline.Request) (totalJobs, totalBlocks uint32, totalBlockBytes uint64) {
// request without start or end, search only in generator
if searchReq.Start == 0 || searchReq.End == 0 {
close(reqCh)
Expand Down Expand Up @@ -204,7 +203,7 @@ func (s *queryRangeSharder) backendRequests(ctx context.Context, tenantID string
return
}

func (s *queryRangeSharder) buildBackendRequests(ctx context.Context, tenantID string, parent *http.Request, searchReq tempopb.QueryRangeRequest, metas []*backend.BlockMeta, targetBytesPerRequest int, reqCh chan<- pipeline.Request) {
func (s *queryRangeSharder) buildBackendRequests(ctx context.Context, tenantID string, parent pipeline.Request, searchReq tempopb.QueryRangeRequest, metas []*backend.BlockMeta, targetBytesPerRequest int, reqCh chan<- pipeline.Request) {
defer close(reqCh)

queryHash := hashForQueryRangeRequest(&searchReq)
Expand All @@ -230,7 +229,7 @@ func (s *queryRangeSharder) buildBackendRequests(ctx context.Context, tenantID s
}

for startPage := 0; startPage < int(m.TotalRecords); startPage += pages {
subR := parent.Clone(ctx)
subR := parent.HTTPRequest().Clone(ctx)

dedColsJSON, err := colsToJSON.JSONForDedicatedColumns(m.DedicatedColumns)
if err != nil {
Expand Down Expand Up @@ -268,7 +267,7 @@ func (s *queryRangeSharder) buildBackendRequests(ctx context.Context, tenantID s
subR = api.BuildQueryRangeRequest(subR, queryRangeReq, dedColsJSON)

prepareRequestForQueriers(subR, tenantID)
pipelineR := pipeline.NewHTTPRequest(subR)
pipelineR := parent.CloneFromHTTPRequest(subR)

// TODO: Handle sampling rate
key := queryRangeCacheKey(tenantID, queryHash, int64(queryRangeReq.Start), int64(queryRangeReq.End), m, int(queryRangeReq.StartPage), int(queryRangeReq.PagesToSearch))
Expand All @@ -292,9 +291,8 @@ func max(a, b uint32) uint32 {
return b
}

func (s *queryRangeSharder) generatorRequest(searchReq tempopb.QueryRangeRequest, parent *http.Request, tenantID string, cutoff time.Time) *http.Request {
func (s *queryRangeSharder) generatorRequest(ctx context.Context, tenantID string, parent pipeline.Request, searchReq tempopb.QueryRangeRequest, cutoff time.Time) *pipeline.HTTPRequest {
traceql.TrimToAfter(&searchReq, cutoff)

// if start == end then we don't need to query it
if searchReq.Start == searchReq.End {
return nil
Expand All @@ -303,12 +301,12 @@ func (s *queryRangeSharder) generatorRequest(searchReq tempopb.QueryRangeRequest
searchReq.QueryMode = querier.QueryModeRecent
searchReq.Exemplars = uint32(s.cfg.MaxExemplars) // TODO: Review this

subR := parent.Clone(parent.Context())
subR := parent.HTTPRequest().Clone(ctx)
subR = api.BuildQueryRangeRequest(subR, &searchReq, "") // dedicated cols are never passed to the generators

prepareRequestForQueriers(subR, tenantID)

return subR
return parent.CloneFromHTTPRequest(subR)
}

// maxDuration returns the max search duration allowed for this tenant.
Expand Down
125 changes: 125 additions & 0 deletions modules/frontend/pipeline/async_weight_middleware.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
package pipeline

import (
"github.com/grafana/tempo/modules/frontend/combiner"
"github.com/grafana/tempo/pkg/traceql"
)

type RequestType int

type WeightRequest interface {
SetWeight(int)
Weight() int
}

type WeightsConfig struct {
RequestWithWeights bool `yaml:"request_with_weights,omitempty"`
RetryWithWeights bool `yaml:"retry_with_weights,omitempty"`
MaxTraceQLConditions int `yaml:"max_traceql_conditions,omitempty"`
MaxRegexConditions int `yaml:"max_regex_conditions,omitempty"`
}

type Weights struct {
DefaultWeight int
TraceQLSearchWeight int
TraceByIDWeight int
MaxTraceQLConditions int
MaxRegexConditions int
}

const (
Default RequestType = iota
TraceByID
TraceQLSearch
TraceQLMetrics
)

type weightRequestWare struct {
requestType RequestType
enabled bool
next AsyncRoundTripper[combiner.PipelineResponse]

weights Weights
}

// It increments the weight of a retriyed request
func IncrementRetriedRequestWeight(r WeightRequest) {
r.SetWeight(r.Weight() + 1)
}

// It returns a new weight request middleware
func NewWeightRequestWare(rt RequestType, cfg WeightsConfig) AsyncMiddleware[combiner.PipelineResponse] {
weights := Weights{
DefaultWeight: 1,
TraceQLSearchWeight: 1,
TraceByIDWeight: 2,
MaxTraceQLConditions: cfg.MaxTraceQLConditions,
MaxRegexConditions: cfg.MaxRegexConditions,
}
return AsyncMiddlewareFunc[combiner.PipelineResponse](func(next AsyncRoundTripper[combiner.PipelineResponse]) AsyncRoundTripper[combiner.PipelineResponse] {
return &weightRequestWare{
requestType: rt,
enabled: cfg.RequestWithWeights,
weights: weights,
next: next,
}
})
}

func (c weightRequestWare) RoundTrip(req Request) (Responses[combiner.PipelineResponse], error) {
c.setWeight(req)
return c.next.RoundTrip(req)
}

func (c weightRequestWare) setWeight(req Request) {
if !c.enabled {
req.SetWeight(c.weights.DefaultWeight)
return
}
switch c.requestType {
case TraceByID:
req.SetWeight(c.weights.TraceByIDWeight)
case TraceQLSearch, TraceQLMetrics:
c.setTraceQLWeight(req)
default:
req.SetWeight(c.weights.DefaultWeight)
}
}

func (c weightRequestWare) setTraceQLWeight(req Request) {
var traceQLQuery string
query := req.HTTPRequest().URL.Query()
if query.Has("q") {
traceQLQuery = query.Get("q")
}
if query.Has("query") {
traceQLQuery = query.Get("query")
}

req.SetWeight(c.weights.TraceQLSearchWeight)

if traceQLQuery == "" {
return
}

_, _, _, spanRequest, err := traceql.Compile(traceQLQuery)
if err != nil || spanRequest == nil {
return
}

conditions := 0
regexConditions := 0

for _, c := range spanRequest.Conditions {
if c.Op != traceql.OpNone {
conditions++
}
if c.Op == traceql.OpRegex || c.Op == traceql.OpNotRegex {
regexConditions++
}
}
complexQuery := regexConditions >= c.weights.MaxRegexConditions || conditions >= c.weights.MaxTraceQLConditions
if complexQuery {
req.SetWeight(c.weights.TraceQLSearchWeight + 1)
}
}
Loading