diff --git a/CHANGELOG.md b/CHANGELOG.md index 8142277cfc2..def387f9289 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -35,6 +35,7 @@ * [ENHANCEMENT] Add vParquet4 support to the tempo-cli analyse blocks command [#3868](https://github.com/grafana/tempo/pull/3868) (@stoewer) * [ENHANCEMENT] Improve trace id lookup from Tempo Vulture by selecting a date range [#3874](https://github.com/grafana/tempo/pull/3874) (@javiermolinar) * [ENHANCEMENT] Add native histograms for internal metrics[#3870](https://github.com/grafana/tempo/pull/3870) (@zalegrala) +* [ENHANCEMENT] Reduce memory consumption of query-frontend [#3888](https://github.com/grafana/tempo/pull/3888) (@joe-elliott) * [BUGFIX] Fix panic in certain metrics queries using `rate()` with `by` [#3847](https://github.com/grafana/tempo/pull/3847) (@stoewer) * [BUGFIX] Fix metrics queries when grouping by attributes that may not exist [#3734](https://github.com/grafana/tempo/pull/3734) (@mdisibio) * [BUGFIX] Fix frontend parsing error on cached responses [#3759](https://github.com/grafana/tempo/pull/3759) (@mdisibio) diff --git a/modules/frontend/frontend.go b/modules/frontend/frontend.go index 46fea50d439..4687d74d74e 100644 --- a/modules/frontend/frontend.go +++ b/modules/frontend/frontend.go @@ -4,7 +4,6 @@ import ( "fmt" "io" "net/http" - "net/url" "path" "strings" @@ -23,6 +22,13 @@ import ( "github.com/grafana/tempo/tempodb" ) +type RoundTripperFunc func(*http.Request) (*http.Response, error) + +// RoundTrip implements http.RoundTripper +func (fn RoundTripperFunc) RoundTrip(req *http.Request) (*http.Response, error) { + return fn(req) +} + // these handler funcs could likely be removed and the code written directly into the respective // gRPC functions type ( @@ -202,7 +208,7 @@ func (q *QueryFrontend) MetricsQueryInstant(req *tempopb.QueryInstantRequest, sr // newSpanMetricsMiddleware creates 
a new frontend middleware to handle metrics-generator requests. func newMetricsSummaryHandler(next pipeline.AsyncRoundTripper[combiner.PipelineResponse], logger log.Logger) http.RoundTripper { - return pipeline.RoundTripperFunc(func(req *http.Request) (*http.Response, error) { + return RoundTripperFunc(func(req *http.Request) (*http.Response, error) { tenant, err := user.ExtractOrgID(req.Context()) if err != nil { level.Error(logger).Log("msg", "metrics summary: failed to extract tenant id", "err", err) @@ -212,14 +218,14 @@ func newMetricsSummaryHandler(next pipeline.AsyncRoundTripper[combiner.PipelineR Body: io.NopCloser(strings.NewReader(err.Error())), }, nil } - prepareRequestForQueriers(req, tenant, req.RequestURI, nil) + prepareRequestForQueriers(req, tenant) level.Info(logger).Log( "msg", "metrics summary request", "tenant", tenant, "path", req.URL.Path) - resps, err := next.RoundTrip(req) + resps, err := next.RoundTrip(pipeline.NewHTTPRequest(req)) if err != nil { return nil, err } @@ -239,20 +245,19 @@ func newMetricsSummaryHandler(next pipeline.AsyncRoundTripper[combiner.PipelineR // prepareRequestForQueriers modifies the request so they will be farmed correctly to the queriers // - adds the tenant header // - sets the requesturi (see below for details) -func prepareRequestForQueriers(req *http.Request, tenant string, originalURI string, params url.Values) { +func prepareRequestForQueriers(req *http.Request, tenant string) { // set the tenant header req.Header.Set(user.OrgIDHeaderName, tenant) - // build and set the request uri + // copy the url (which is correct) to the RequestURI // we do this because dskit/common uses the RequestURI field to translate from http.Request to httpgrpc.Request - // https://github.com/grafana/dskit/blob/740f56bd293423c5147773ce97264519f9fddc58/httpgrpc/server/server.go#L59 + // https://github.com/grafana/dskit/blob/f5bd38371e1cfae5479b2c23b3893c1a97868bdf/httpgrpc/httpgrpc.go#L53 const 
queryDelimiter = "?" - uri := path.Join(api.PathPrefixQuerier, originalURI) - if len(params) > 0 { - uri += queryDelimiter + params.Encode() + uri := path.Join(api.PathPrefixQuerier, req.URL.Path) + if len(req.URL.RawQuery) > 0 { + uri += queryDelimiter + req.URL.RawQuery } - req.RequestURI = uri } diff --git a/modules/frontend/metrics_query_handler.go b/modules/frontend/metrics_query_handler.go index 9b0c25aa158..a347dedb198 100644 --- a/modules/frontend/metrics_query_handler.go +++ b/modules/frontend/metrics_query_handler.go @@ -80,7 +80,7 @@ func newQueryInstantStreamingGRPCHandler(cfg Config, next pipeline.AsyncRoundTri func newMetricsQueryInstantHTTPHandler(cfg Config, next pipeline.AsyncRoundTripper[combiner.PipelineResponse], logger log.Logger) http.RoundTripper { postSLOHook := metricsSLOPostHook(cfg.Metrics.SLO) - return pipeline.RoundTripperFunc(func(req *http.Request) (*http.Response, error) { + return RoundTripperFunc(func(req *http.Request) (*http.Response, error) { tenant, _ := user.ExtractOrgID(req.Context()) start := time.Now() diff --git a/modules/frontend/metrics_query_range_handler.go b/modules/frontend/metrics_query_range_handler.go index ce42b51f0af..1b3e3c2f59a 100644 --- a/modules/frontend/metrics_query_range_handler.go +++ b/modules/frontend/metrics_query_range_handler.go @@ -65,7 +65,7 @@ func newQueryRangeStreamingGRPCHandler(cfg Config, next pipeline.AsyncRoundTripp func newMetricsQueryRangeHTTPHandler(cfg Config, next pipeline.AsyncRoundTripper[combiner.PipelineResponse], logger log.Logger) http.RoundTripper { postSLOHook := metricsSLOPostHook(cfg.Metrics.SLO) - return pipeline.RoundTripperFunc(func(req *http.Request) (*http.Response, error) { + return RoundTripperFunc(func(req *http.Request) (*http.Response, error) { tenant, _ := user.ExtractOrgID(req.Context()) start := time.Now() diff --git a/modules/frontend/metrics_query_range_sharder.go b/modules/frontend/metrics_query_range_sharder.go index 70d737c6b25..79a62e0e137 100644 --- 
a/modules/frontend/metrics_query_range_sharder.go +++ b/modules/frontend/metrics_query_range_sharder.go @@ -64,7 +64,9 @@ func newAsyncQueryRangeSharder(reader tempodb.Reader, o overrides.Interface, cfg }) } -func (s queryRangeSharder) RoundTrip(r *http.Request) (pipeline.Responses[combiner.PipelineResponse], error) { +func (s queryRangeSharder) RoundTrip(pipelineRequest pipeline.Request) (pipeline.Responses[combiner.PipelineResponse], error) { + r := pipelineRequest.HTTPRequest() + span, ctx := opentracing.StartSpanFromContext(r.Context(), "frontend.QueryRangeSharder") defer span.Finish() @@ -112,10 +114,10 @@ func (s queryRangeSharder) RoundTrip(r *http.Request) (pipeline.Responses[combin } generatorReq := s.generatorRequest(*req, r, tenantID, cutoff) - reqCh := make(chan *http.Request, 2) // buffer of 2 allows us to insert generatorReq and metrics + reqCh := make(chan pipeline.Request, 2) // buffer of 2 allows us to insert generatorReq and metrics if generatorReq != nil { - reqCh <- generatorReq + reqCh <- pipeline.NewHTTPRequest(generatorReq) } var ( @@ -171,7 +173,7 @@ func (s *queryRangeSharder) blockMetas(start, end int64, tenantID string) []*bac return metas } -func (s *queryRangeSharder) shardedBackendRequests(ctx context.Context, tenantID string, parent *http.Request, searchReq tempopb.QueryRangeRequest, cutoff time.Time, samplingRate float64, targetBytesPerRequest int, interval time.Duration, reqCh chan *http.Request, _ func(error)) (totalJobs, totalBlocks uint32, totalBlockBytes uint64) { +func (s *queryRangeSharder) shardedBackendRequests(ctx context.Context, tenantID string, parent *http.Request, searchReq tempopb.QueryRangeRequest, cutoff time.Time, samplingRate float64, targetBytesPerRequest int, interval time.Duration, reqCh chan pipeline.Request, _ func(error)) (totalJobs, totalBlocks uint32, totalBlockBytes uint64) { // request without start or end, search only in generator if searchReq.Start == 0 || searchReq.End == 0 { close(reqCh) @@ -244,7 
+246,7 @@ func (s *queryRangeSharder) shardedBackendRequests(ctx context.Context, tenantID return } -func (s *queryRangeSharder) buildShardedBackendRequests(ctx context.Context, tenantID string, parent *http.Request, searchReq tempopb.QueryRangeRequest, samplingRate float64, targetBytesPerRequest int, interval time.Duration, reqCh chan *http.Request) { +func (s *queryRangeSharder) buildShardedBackendRequests(ctx context.Context, tenantID string, parent *http.Request, searchReq tempopb.QueryRangeRequest, samplingRate float64, targetBytesPerRequest int, interval time.Duration, reqCh chan pipeline.Request) { defer close(reqCh) var ( @@ -281,6 +283,8 @@ func (s *queryRangeSharder) buildShardedBackendRequests(ctx context.Context, ten shardR.ShardID = i shardR.ShardCount = shards httpReq := s.toUpstreamRequest(ctx, shardR, parent, tenantID) + + pipelineR := pipeline.NewHTTPRequest(httpReq) if samplingRate != 1.0 { shardR.ShardID *= uint32(1.0 / samplingRate) shardR.ShardCount *= uint32(1.0 / samplingRate) @@ -288,11 +292,11 @@ func (s *queryRangeSharder) buildShardedBackendRequests(ctx context.Context, ten // Set final sampling rate after integer rounding samplingRate = float64(shards) / float64(shardR.ShardCount) - httpReq = pipeline.ContextAddResponseDataForResponse(samplingRate, httpReq) + pipelineR.SetResponseData(samplingRate) } select { - case reqCh <- httpReq: + case reqCh <- pipelineR: case <-ctx.Done(): return } @@ -302,7 +306,7 @@ func (s *queryRangeSharder) buildShardedBackendRequests(ctx context.Context, ten } } -func (s *queryRangeSharder) backendRequests(ctx context.Context, tenantID string, parent *http.Request, searchReq tempopb.QueryRangeRequest, cutoff time.Time, _ float64, targetBytesPerRequest int, _ time.Duration, reqCh chan *http.Request) (totalJobs, totalBlocks uint32, totalBlockBytes uint64) { +func (s *queryRangeSharder) backendRequests(ctx context.Context, tenantID string, parent *http.Request, searchReq tempopb.QueryRangeRequest, cutoff 
time.Time, _ float64, targetBytesPerRequest int, _ time.Duration, reqCh chan pipeline.Request) (totalJobs, totalBlocks uint32, totalBlockBytes uint64) { // request without start or end, search only in generator if searchReq.Start == 0 || searchReq.End == 0 { close(reqCh) @@ -348,7 +352,7 @@ func (s *queryRangeSharder) backendRequests(ctx context.Context, tenantID string return } -func (s *queryRangeSharder) buildBackendRequests(ctx context.Context, tenantID string, parent *http.Request, searchReq tempopb.QueryRangeRequest, metas []*backend.BlockMeta, targetBytesPerRequest int, reqCh chan<- *http.Request) { +func (s *queryRangeSharder) buildBackendRequests(ctx context.Context, tenantID string, parent *http.Request, searchReq tempopb.QueryRangeRequest, metas []*backend.BlockMeta, targetBytesPerRequest int, reqCh chan<- pipeline.Request) { defer close(reqCh) queryHash := hashForQueryRangeRequest(&searchReq) @@ -404,15 +408,17 @@ func (s *queryRangeSharder) buildBackendRequests(ctx context.Context, tenantID s subR = api.BuildQueryRangeRequest(subR, queryRangeReq) subR.Header.Set(api.HeaderAccept, api.HeaderAcceptProtobuf) - prepareRequestForQueriers(subR, tenantID, subR.URL.Path, subR.URL.Query()) + prepareRequestForQueriers(subR, tenantID) + pipelineR := pipeline.NewHTTPRequest(subR) + // TODO: Handle sampling rate key := queryRangeCacheKey(tenantID, queryHash, int64(queryRangeReq.Start), int64(queryRangeReq.End), m, int(queryRangeReq.StartPage), int(queryRangeReq.PagesToSearch)) if len(key) > 0 { - subR = pipeline.ContextAddCacheKey(key, subR) + pipelineR.SetCacheKey(key) } select { - case reqCh <- subR: + case reqCh <- pipelineR: case <-ctx.Done(): return } @@ -440,7 +446,7 @@ func (s *queryRangeSharder) toUpstreamRequest(ctx context.Context, req tempopb.Q subR := parent.Clone(ctx) subR = api.BuildQueryRangeRequest(subR, &req) - prepareRequestForQueriers(subR, tenantID, parent.URL.Path, subR.URL.Query()) + prepareRequestForQueriers(subR, tenantID) return subR } diff 
--git a/modules/frontend/pipeline/async_handler_multitenant.go b/modules/frontend/pipeline/async_handler_multitenant.go index b714a312879..984787d7113 100644 --- a/modules/frontend/pipeline/async_handler_multitenant.go +++ b/modules/frontend/pipeline/async_handler_multitenant.go @@ -1,9 +1,7 @@ package pipeline import ( - "context" "errors" - "net/http" "strings" "github.com/go-kit/log" @@ -34,7 +32,7 @@ func NewMultiTenantMiddleware(logger log.Logger) AsyncMiddleware[combiner.Pipeli }) } -func (t *tenantRoundTripper) RoundTrip(req *http.Request) (Responses[combiner.PipelineResponse], error) { +func (t *tenantRoundTripper) RoundTrip(req Request) (Responses[combiner.PipelineResponse], error) { // extract tenant ids, this will normalize and de-duplicate tenant ids tenants, err := t.resolver.TenantIDs(req.Context()) if err != nil { @@ -51,21 +49,24 @@ func (t *tenantRoundTripper) RoundTrip(req *http.Request) (Responses[combiner.Pi // join tenants for logger because list value type is unsupported. _ = level.Debug(t.logger).Log("msg", "handling multi-tenant query", "tenants", strings.Join(tenants, ",")) - return NewAsyncSharderFunc(req.Context(), 0, len(tenants), func(tenantIdx int) *http.Request { + return NewAsyncSharderFunc(req.Context(), 0, len(tenants), func(tenantIdx int) Request { if tenantIdx >= len(tenants) { return nil } - return requestForTenant(req.Context(), req, tenants[tenantIdx]) + return requestForTenant(req, tenants[tenantIdx]) }, t.next), nil } // requestForTenant makes a copy of request and injects the tenant id into context and Header. 
// this allows us to keep all multi-tenant logic in query frontend and keep other components single tenant -func requestForTenant(ctx context.Context, r *http.Request, tenant string) *http.Request { +func requestForTenant(req Request, tenant string) Request { + r := req.HTTPRequest() + ctx := r.Context() + ctx = user.InjectOrgID(ctx, tenant) rCopy := r.Clone(ctx) rCopy.Header.Set(user.OrgIDHeaderName, tenant) - return rCopy + return NewHTTPRequest(rCopy) } type unsupportedRoundTripper struct { @@ -85,7 +86,7 @@ func NewMultiTenantUnsupportedMiddleware(logger log.Logger) AsyncMiddleware[comb }) } -func (t *unsupportedRoundTripper) RoundTrip(req *http.Request) (Responses[combiner.PipelineResponse], error) { +func (t *unsupportedRoundTripper) RoundTrip(req Request) (Responses[combiner.PipelineResponse], error) { // extract tenant ids tenants, err := t.resolver.TenantIDs(req.Context()) if err != nil { diff --git a/modules/frontend/pipeline/async_handler_multitenant_test.go b/modules/frontend/pipeline/async_handler_multitenant_test.go index 10fedbe1c94..c1d7d0ff858 100644 --- a/modules/frontend/pipeline/async_handler_multitenant_test.go +++ b/modules/frontend/pipeline/async_handler_multitenant_test.go @@ -52,7 +52,7 @@ func TestMultiTenant(t *testing.T) { trace := test.MakeTrace(10, traceID) once := sync.Once{} - next := AsyncRoundTripperFunc[combiner.PipelineResponse](func(req *http.Request) (Responses[combiner.PipelineResponse], error) { + next := AsyncRoundTripperFunc[combiner.PipelineResponse](func(req Request) (Responses[combiner.PipelineResponse], error) { reqCount.Inc() // Count the number of requests. // Check if the tenant is in the list of tenants. 
@@ -91,7 +91,7 @@ func TestMultiTenant(t *testing.T) { ctx := user.InjectOrgID(context.Background(), tc.tenants) req = req.WithContext(ctx) - resps, err := rt.RoundTrip(req) + resps, err := rt.RoundTrip(NewHTTPRequest(req)) require.NoError(t, err) for { @@ -153,12 +153,12 @@ func TestMultiTenantNotSupported(t *testing.T) { } test := NewMultiTenantUnsupportedMiddleware(log.NewNopLogger()) - next := AsyncRoundTripperFunc[combiner.PipelineResponse](func(req *http.Request) (Responses[combiner.PipelineResponse], error) { + next := AsyncRoundTripperFunc[combiner.PipelineResponse](func(_ Request) (Responses[combiner.PipelineResponse], error) { return NewSuccessfulResponse("foo"), nil }) rt := test.Wrap(next) - resps, err := rt.RoundTrip(req) + resps, err := rt.RoundTrip(NewHTTPRequest(req)) require.NoError(t, err) // no error expected. tenant unsupported should be passed back as a bad request. errors bubble up as 5xx r, done, err := resps.Next(context.Background()) diff --git a/modules/frontend/pipeline/async_handler_noop.go b/modules/frontend/pipeline/async_handler_noop.go index f797984c835..c7bb8e40fe0 100644 --- a/modules/frontend/pipeline/async_handler_noop.go +++ b/modules/frontend/pipeline/async_handler_noop.go @@ -1,15 +1,13 @@ package pipeline import ( - "net/http" - "github.com/grafana/tempo/modules/frontend/combiner" ) // NewNoopMiddleware returns a middleware that is a passthrough only func NewNoopMiddleware() AsyncMiddleware[combiner.PipelineResponse] { return AsyncMiddlewareFunc[combiner.PipelineResponse](func(next AsyncRoundTripper[combiner.PipelineResponse]) AsyncRoundTripper[combiner.PipelineResponse] { - return AsyncRoundTripperFunc[combiner.PipelineResponse](func(req *http.Request) (Responses[combiner.PipelineResponse], error) { + return AsyncRoundTripperFunc[combiner.PipelineResponse](func(req Request) (Responses[combiner.PipelineResponse], error) { return next.RoundTrip(req) }) }) diff --git a/modules/frontend/pipeline/async_sharding.go 
b/modules/frontend/pipeline/async_sharding.go index 07bc30a11da..09e92230f9d 100644 --- a/modules/frontend/pipeline/async_sharding.go +++ b/modules/frontend/pipeline/async_sharding.go @@ -2,7 +2,6 @@ package pipeline import ( "context" - "net/http" "sync" "github.com/grafana/tempo/modules/frontend/combiner" @@ -17,7 +16,7 @@ type waitGroup interface { // NewAsyncSharderFunc creates a new AsyncResponse that shards requests to the next AsyncRoundTripper[combiner.PipelineResponse]. It creates one // goroutine per concurrent request. -func NewAsyncSharderFunc(ctx context.Context, concurrentReqs, totalReqs int, reqFn func(i int) *http.Request, next AsyncRoundTripper[combiner.PipelineResponse]) Responses[combiner.PipelineResponse] { +func NewAsyncSharderFunc(ctx context.Context, concurrentReqs, totalReqs int, reqFn func(i int) Request, next AsyncRoundTripper[combiner.PipelineResponse]) Responses[combiner.PipelineResponse] { var wg waitGroup if concurrentReqs <= 0 { wg = &sync.WaitGroup{} @@ -43,7 +42,7 @@ func NewAsyncSharderFunc(ctx context.Context, concurrentReqs, totalReqs int, req } wg.Add(1) - go func(r *http.Request) { + go func(r Request) { defer wg.Done() resp, err := next.RoundTrip(r) @@ -63,7 +62,7 @@ func NewAsyncSharderFunc(ctx context.Context, concurrentReqs, totalReqs int, req } // NewAsyncSharderChan creates a new AsyncResponse that shards requests to the next AsyncRoundTripper[combiner.PipelineResponse] using a limited number of goroutines. 
-func NewAsyncSharderChan(ctx context.Context, concurrentReqs int, reqs <-chan *http.Request, resps Responses[combiner.PipelineResponse], next AsyncRoundTripper[combiner.PipelineResponse]) Responses[combiner.PipelineResponse] { +func NewAsyncSharderChan(ctx context.Context, concurrentReqs int, reqs <-chan Request, resps Responses[combiner.PipelineResponse], next AsyncRoundTripper[combiner.PipelineResponse]) Responses[combiner.PipelineResponse] { if concurrentReqs == 0 { panic("NewAsyncSharderChan: concurrentReqs must be greater than 0") } diff --git a/modules/frontend/pipeline/async_sharding_test.go b/modules/frontend/pipeline/async_sharding_test.go index 4b389a9bf85..6078cf9420a 100644 --- a/modules/frontend/pipeline/async_sharding_test.go +++ b/modules/frontend/pipeline/async_sharding_test.go @@ -22,32 +22,32 @@ func TestAsyncSharders(t *testing.T) { { name: "AsyncSharder", responseFn: func(next AsyncRoundTripper[combiner.PipelineResponse]) *asyncResponse { - return NewAsyncSharderFunc(context.Background(), 10, expectedRequestCount, func(i int) *http.Request { + return NewAsyncSharderFunc(context.Background(), 10, expectedRequestCount, func(i int) Request { if i >= expectedRequestCount { return nil } - return &http.Request{} + return NewHTTPRequest(&http.Request{}) }, next).(*asyncResponse) }, }, { name: "AsyncSharder - no limit", responseFn: func(next AsyncRoundTripper[combiner.PipelineResponse]) *asyncResponse { - return NewAsyncSharderFunc(context.Background(), 0, expectedRequestCount, func(i int) *http.Request { + return NewAsyncSharderFunc(context.Background(), 0, expectedRequestCount, func(i int) Request { if i >= expectedRequestCount { return nil } - return &http.Request{} + return NewHTTPRequest(&http.Request{}) }, next).(*asyncResponse) }, }, { name: "AsyncSharderLimitedGoroutines", responseFn: func(next AsyncRoundTripper[combiner.PipelineResponse]) *asyncResponse { - reqChan := make(chan *http.Request) + reqChan := make(chan Request) go func() { for i 
:= 0; i < expectedRequestCount; i++ { - reqChan <- &http.Request{} + reqChan <- NewHTTPRequest(&http.Request{}) } close(reqChan) }() @@ -59,7 +59,7 @@ func TestAsyncSharders(t *testing.T) { for _, tc := range tcs { t.Run(tc.name, func(t *testing.T) { - next := AsyncRoundTripperFunc[combiner.PipelineResponse](func(r *http.Request) (Responses[combiner.PipelineResponse], error) { + next := AsyncRoundTripperFunc[combiner.PipelineResponse](func(_ Request) (Responses[combiner.PipelineResponse], error) { // return a generic 200 return NewHTTPToAsyncResponse(&http.Response{ Body: io.NopCloser(strings.NewReader("")), diff --git a/modules/frontend/pipeline/collector_grpc.go b/modules/frontend/pipeline/collector_grpc.go index 09280d9d9f8..697bf132323 100644 --- a/modules/frontend/pipeline/collector_grpc.go +++ b/modules/frontend/pipeline/collector_grpc.go @@ -34,7 +34,7 @@ func (c GRPCCollector[T]) RoundTrip(req *http.Request) error { defer cancel() req = req.WithContext(ctx) - resps, err := c.next.RoundTrip(req) + resps, err := c.next.RoundTrip(NewHTTPRequest(req)) if err != nil { return grpcError(err) } diff --git a/modules/frontend/pipeline/collector_http.go b/modules/frontend/pipeline/collector_http.go index 0009024d6ba..447630409c2 100644 --- a/modules/frontend/pipeline/collector_http.go +++ b/modules/frontend/pipeline/collector_http.go @@ -34,7 +34,7 @@ func (r httpCollector) RoundTrip(req *http.Request) (*http.Response, error) { defer cancel() req = req.WithContext(ctx) - resps, err := r.next.RoundTrip(req) + resps, err := r.next.RoundTrip(NewHTTPRequest(req)) if err != nil { return nil, err } diff --git a/modules/frontend/pipeline/context.go b/modules/frontend/pipeline/context.go deleted file mode 100644 index f11ef97b67d..00000000000 --- a/modules/frontend/pipeline/context.go +++ /dev/null @@ -1,24 +0,0 @@ -package pipeline - -import ( - "context" - "net/http" -) - -// this file exists to consolidate and clearly document all context keys that are valid and recognized 
by the pipeline package - -// contextCacheKey is used by cachingWare to store the cache key in the request context. It stores a string value. -var contextCacheKey = struct{}{} - -func ContextAddCacheKey(key string, req *http.Request) *http.Request { - return req.WithContext(context.WithValue(req.Context(), contextCacheKey, key)) -} - -// contextEchoData is used to echo request specific data through the pipeline. It stores any value. -// see usage for samplingRate in modules/frontend/metrics_query_range_sharder.go -var contextRequestDataForResponse = struct{}{} - -// ContextAddResponseDataForResponse adds a value to the request context that will be echoed back in the response. -func ContextAddResponseDataForResponse(val any, req *http.Request) *http.Request { - return req.WithContext(context.WithValue(req.Context(), contextRequestDataForResponse, val)) -} diff --git a/modules/frontend/pipeline/pipeline.go b/modules/frontend/pipeline/pipeline.go index 8683103fcb4..f2316e3405f 100644 --- a/modules/frontend/pipeline/pipeline.go +++ b/modules/frontend/pipeline/pipeline.go @@ -1,22 +1,78 @@ package pipeline import ( + "context" "net/http" "github.com/grafana/tempo/modules/frontend/combiner" ) +type Request interface { + HTTPRequest() *http.Request + Context() context.Context + WithContext(context.Context) + + SetCacheKey(string) + CacheKey() string + + SetResponseData(any) // add data that will be sent back with this requests response + ResponseData() any +} + +type HTTPRequest struct { + req *http.Request + + cacheKey string + responseData any +} + +func NewHTTPRequest(req *http.Request) *HTTPRequest { + return &HTTPRequest{req: req} +} + +func (r HTTPRequest) HTTPRequest() *http.Request { + return r.req +} + +func (r HTTPRequest) Context() context.Context { + if r.req == nil { + return nil + } + + return r.req.Context() +} + +func (r *HTTPRequest) WithContext(ctx context.Context) { + r.req = r.req.WithContext(ctx) +} + +func (r *HTTPRequest) 
SetCacheKey(s string) { + r.cacheKey = s +} + +func (r *HTTPRequest) CacheKey() string { + return r.cacheKey +} + +func (r *HTTPRequest) SetResponseData(data any) { + r.responseData = data +} + +func (r *HTTPRequest) ResponseData() any { + return r.responseData +} + // // Async Pipeline // type AsyncRoundTripper[T any] interface { - RoundTrip(*http.Request) (Responses[T], error) + RoundTrip(Request) (Responses[T], error) } -type AsyncRoundTripperFunc[T any] func(*http.Request) (Responses[T], error) +type AsyncRoundTripperFunc[T any] func(Request) (Responses[T], error) -func (fn AsyncRoundTripperFunc[T]) RoundTrip(req *http.Request) (Responses[T], error) { +func (fn AsyncRoundTripperFunc[T]) RoundTrip(req Request) (Responses[T], error) { return fn(req) } @@ -37,23 +93,27 @@ func (f AsyncMiddlewareFunc[T]) Wrap(w AsyncRoundTripper[T]) AsyncRoundTripper[T // Sync Pipeline // -type RoundTripperFunc func(*http.Request) (*http.Response, error) +type RoundTripperFunc func(Request) (*http.Response, error) -// RoundTrip implememnts http.RoundTripper -func (fn RoundTripperFunc) RoundTrip(req *http.Request) (*http.Response, error) { +// RoundTrip implements RoundTripper +func (fn RoundTripperFunc) RoundTrip(req Request) (*http.Response, error) { return fn(req) } +type RoundTripper interface { + RoundTrip(Request) (*http.Response, error) +} + // Middleware is used to build pipelines of pipeline.Roundtrippers type Middleware interface { - Wrap(http.RoundTripper) http.RoundTripper + Wrap(RoundTripper) RoundTripper } // MiddlewareFunc is like http.HandlerFunc, but for Middleware. -type MiddlewareFunc func(http.RoundTripper) http.RoundTripper +type MiddlewareFunc func(RoundTripper) RoundTripper // Wrap implements Middleware. 
-func (f MiddlewareFunc) Wrap(w http.RoundTripper) http.RoundTripper { +func (f MiddlewareFunc) Wrap(w RoundTripper) RoundTripper { return f(w) } @@ -70,7 +130,7 @@ func Build(asyncMW []AsyncMiddleware[combiner.PipelineResponse], mw []Middleware return next }) - syncPipeline := MiddlewareFunc(func(next http.RoundTripper) http.RoundTripper { + syncPipeline := MiddlewareFunc(func(next RoundTripper) RoundTripper { for i := len(mw) - 1; i >= 0; i-- { next = mw[i].Wrap(next) } @@ -79,7 +139,9 @@ func Build(asyncMW []AsyncMiddleware[combiner.PipelineResponse], mw []Middleware // bridge the two pipelines bridge := &pipelineBridge{ - next: syncPipeline.Wrap(next), + next: syncPipeline.Wrap(RoundTripperFunc(func(req Request) (*http.Response, error) { + return next.RoundTrip(req.HTTPRequest()) + })), convert: NewHTTPToAsyncResponse, } return asyncPipeline.Wrap(bridge) @@ -88,18 +150,18 @@ func Build(asyncMW []AsyncMiddleware[combiner.PipelineResponse], mw []Middleware var _ AsyncRoundTripper[combiner.PipelineResponse] = (*pipelineBridge)(nil) type pipelineBridge struct { - next http.RoundTripper + next RoundTripper convert func(*http.Response) Responses[combiner.PipelineResponse] } -func (b *pipelineBridge) RoundTrip(req *http.Request) (Responses[combiner.PipelineResponse], error) { +func (b *pipelineBridge) RoundTrip(req Request) (Responses[combiner.PipelineResponse], error) { r, err := b.next.RoundTrip(req) if err != nil { return nil, err } // check for request data in the context and echo it back if it exists - if val := req.Context().Value(contextRequestDataForResponse); val != nil { + if val := req.ResponseData(); val != nil { return NewHTTPToAsyncResponseWithRequestData(r, val), nil } diff --git a/modules/frontend/pipeline/responses_test.go b/modules/frontend/pipeline/responses_test.go index 1ebab93748b..966f5c1f56f 100644 --- a/modules/frontend/pipeline/responses_test.go +++ b/modules/frontend/pipeline/responses_test.go @@ -214,7 +214,7 @@ func 
TestAsyncResponsesDoesNotLeak(t *testing.T) { { name: "happy path", finalRT: func(_ context.CancelFunc) RoundTripperFunc { - return RoundTripperFunc(func(r *http.Request) (*http.Response, error) { + return RoundTripperFunc(func(_ Request) (*http.Response, error) { return &http.Response{ Body: io.NopCloser(strings.NewReader("foo")), }, nil @@ -224,7 +224,7 @@ func TestAsyncResponsesDoesNotLeak(t *testing.T) { { name: "error path", finalRT: func(_ context.CancelFunc) RoundTripperFunc { - return RoundTripperFunc(func(r *http.Request) (*http.Response, error) { + return RoundTripperFunc(func(_ Request) (*http.Response, error) { return nil, errors.New("foo") }) }, @@ -234,7 +234,7 @@ func TestAsyncResponsesDoesNotLeak(t *testing.T) { finalRT: func(_ context.CancelFunc) RoundTripperFunc { responseCounter := atomic.Int32{} - return RoundTripperFunc(func(r *http.Request) (*http.Response, error) { + return RoundTripperFunc(func(_ Request) (*http.Response, error) { counter := responseCounter.Add(1) if counter == 2 { return &http.Response{ @@ -257,7 +257,7 @@ func TestAsyncResponsesDoesNotLeak(t *testing.T) { cancel() }() - return RoundTripperFunc(func(r *http.Request) (*http.Response, error) { + return RoundTripperFunc(func(_ Request) (*http.Response, error) { time.Sleep(3 * time.Second) return &http.Response{ @@ -274,7 +274,7 @@ func TestAsyncResponsesDoesNotLeak(t *testing.T) { finalRT: func(cancel context.CancelFunc) RoundTripperFunc { responseCounter := atomic.Int32{} - return RoundTripperFunc(func(r *http.Request) (*http.Response, error) { + return RoundTripperFunc(func(_ Request) (*http.Response, error) { counter := responseCounter.Add(1) if counter == 2 { cancel() @@ -392,25 +392,25 @@ type sharder struct { funcSharder bool } -func (s sharder) RoundTrip(r *http.Request) (Responses[combiner.PipelineResponse], error) { +func (s sharder) RoundTrip(r Request) (Responses[combiner.PipelineResponse], error) { total := 4 concurrent := 2 // execute requests if s.funcSharder { - 
return NewAsyncSharderFunc(r.Context(), concurrent, total, func(i int) *http.Request { + return NewAsyncSharderFunc(r.HTTPRequest().Context(), concurrent, total, func(_ int) Request { return r }, s.next), nil } - reqCh := make(chan *http.Request) + reqCh := make(chan Request) go func() { for i := 0; i < total; i++ { reqCh <- r } close(reqCh) }() - return NewAsyncSharderChan(r.Context(), concurrent, reqCh, nil, s.next), nil + return NewAsyncSharderChan(r.HTTPRequest().Context(), concurrent, reqCh, nil, s.next), nil } func BenchmarkNewSyncToAsyncResponse(b *testing.B) { diff --git a/modules/frontend/pipeline/sync_handler_adjust_response_code.go b/modules/frontend/pipeline/sync_handler_adjust_response_code.go index 4b290378825..a7c9a362af1 100644 --- a/modules/frontend/pipeline/sync_handler_adjust_response_code.go +++ b/modules/frontend/pipeline/sync_handler_adjust_response_code.go @@ -5,7 +5,7 @@ import ( ) type statusCodeAdjustWare struct { - next http.RoundTripper + next RoundTripper allowedCode int } @@ -13,7 +13,7 @@ type statusCodeAdjustWare struct { // This is necessary because the queriers may return 4xx status codes for bad requests, but we want to represent // these as 500s to the rest of the pipeline. This also allows the rest of the pipeline to return 4xxs that can be trusted. 
func NewStatusCodeAdjustWare() Middleware { - return MiddlewareFunc(func(next http.RoundTripper) http.RoundTripper { + return MiddlewareFunc(func(next RoundTripper) RoundTripper { return statusCodeAdjustWare{ next: next, } @@ -21,7 +21,7 @@ func NewStatusCodeAdjustWare() Middleware { } func NewStatusCodeAdjustWareWithAllowedCode(code int) Middleware { - return MiddlewareFunc(func(next http.RoundTripper) http.RoundTripper { + return MiddlewareFunc(func(next RoundTripper) RoundTripper { return statusCodeAdjustWare{ next: next, allowedCode: code, @@ -30,7 +30,7 @@ func NewStatusCodeAdjustWareWithAllowedCode(code int) Middleware { } // RoundTrip implements http.RoundTripper -func (c statusCodeAdjustWare) RoundTrip(req *http.Request) (*http.Response, error) { +func (c statusCodeAdjustWare) RoundTrip(req Request) (*http.Response, error) { resp, err := c.next.RoundTrip(req) if err != nil { return resp, err diff --git a/modules/frontend/pipeline/sync_handler_adjust_response_code_test.go b/modules/frontend/pipeline/sync_handler_adjust_response_code_test.go index fe85a43880b..67f7a3f9a74 100644 --- a/modules/frontend/pipeline/sync_handler_adjust_response_code_test.go +++ b/modules/frontend/pipeline/sync_handler_adjust_response_code_test.go @@ -9,8 +9,8 @@ import ( ) func TestAdjustsResponseCode(t *testing.T) { - nextFn := func(status int) http.RoundTripper { - return RoundTripperFunc(func(req *http.Request) (*http.Response, error) { + nextFn := func(status int) RoundTripper { + return RoundTripperFunc(func(_ Request) (*http.Response, error) { return &http.Response{StatusCode: status}, nil }) } @@ -30,7 +30,7 @@ func TestAdjustsResponseCode(t *testing.T) { retryWare := NewStatusCodeAdjustWare() handler := retryWare.Wrap(nextFn(tc.actualCode)) req := httptest.NewRequest("GET", "http://example.com", nil) - res, err := handler.RoundTrip(req) + res, err := handler.RoundTrip(NewHTTPRequest(req)) require.NoError(t, err) require.Equal(t, res.StatusCode, tc.expectedCode) @@ -38,8 
+38,8 @@ func TestAdjustsResponseCode(t *testing.T) { } func TestAdjustsResponseCodeTeapotAllowed(t *testing.T) { - nextFn := func(status int) http.RoundTripper { - return RoundTripperFunc(func(req *http.Request) (*http.Response, error) { + nextFn := func(status int) RoundTripper { + return RoundTripperFunc(func(_ Request) (*http.Response, error) { return &http.Response{StatusCode: status}, nil }) } @@ -59,7 +59,7 @@ func TestAdjustsResponseCodeTeapotAllowed(t *testing.T) { retryWare := NewStatusCodeAdjustWareWithAllowedCode(418) handler := retryWare.Wrap(nextFn(tc.actualCode)) req := httptest.NewRequest("GET", "http://example.com", nil) - res, err := handler.RoundTrip(req) + res, err := handler.RoundTrip(NewHTTPRequest(req)) require.NoError(t, err) require.Equal(t, res.StatusCode, tc.expectedCode) diff --git a/modules/frontend/pipeline/sync_handler_cache.go b/modules/frontend/pipeline/sync_handler_cache.go index ec6bba792e2..9d69ea6f0ac 100644 --- a/modules/frontend/pipeline/sync_handler_cache.go +++ b/modules/frontend/pipeline/sync_handler_cache.go @@ -15,7 +15,7 @@ import ( ) func NewCachingWare(cacheProvider cache.Provider, role cache.Role, logger log.Logger) Middleware { - return MiddlewareFunc(func(next http.RoundTripper) http.RoundTripper { + return MiddlewareFunc(func(next RoundTripper) RoundTripper { return cachingWare{ next: next, cache: newFrontendCache(cacheProvider, role, logger), @@ -24,20 +24,20 @@ func NewCachingWare(cacheProvider cache.Provider, role cache.Role, logger log.Lo } type cachingWare struct { - next http.RoundTripper + next RoundTripper cache *frontendCache } // RoundTrip implements http.RoundTripper -func (c cachingWare) RoundTrip(req *http.Request) (*http.Response, error) { +func (c cachingWare) RoundTrip(req Request) (*http.Response, error) { // short circuit everything if there cache is no cache if c.cache == nil { return c.next.RoundTrip(req) } // extract cache key - key, ok := req.Context().Value(contextCacheKey).(string) - if ok 
&& len(key) > 0 { + key := req.CacheKey() + if len(key) > 0 { body := c.cache.fetchBytes(key) if len(body) > 0 { resp := &http.Response{ diff --git a/modules/frontend/pipeline/sync_handler_retry.go b/modules/frontend/pipeline/sync_handler_retry.go index d95ae39cf91..082fcdca72f 100644 --- a/modules/frontend/pipeline/sync_handler_retry.go +++ b/modules/frontend/pipeline/sync_handler_retry.go @@ -28,7 +28,7 @@ func NewRetryWare(maxRetries int, registerer prometheus.Registerer) Middleware { NativeHistogramMinResetDuration: 1 * time.Hour, }) - return MiddlewareFunc(func(next http.RoundTripper) http.RoundTripper { + return MiddlewareFunc(func(next RoundTripper) RoundTripper { return retryWare{ next: next, maxRetries: maxRetries, @@ -38,20 +38,20 @@ func NewRetryWare(maxRetries int, registerer prometheus.Registerer) Middleware { } type retryWare struct { - next http.RoundTripper + next RoundTripper maxRetries int retriesCount prometheus.Histogram } // RoundTrip implements http.RoundTripper -func (r retryWare) RoundTrip(req *http.Request) (*http.Response, error) { +func (r retryWare) RoundTrip(req Request) (*http.Response, error) { ctx := req.Context() span, ctx := opentracing.StartSpanFromContext(ctx, "frontend.Retry") defer span.Finish() ext.SpanKindRPCClient.Set(span) // context propagation - req = req.WithContext(ctx) + req.WithContext(ctx) tries := 0 defer func() { r.retriesCount.Observe(float64(tries)) }() diff --git a/modules/frontend/pipeline/sync_handler_retry_test.go b/modules/frontend/pipeline/sync_handler_retry_test.go index 452ba662747..5d57db7c6d1 100644 --- a/modules/frontend/pipeline/sync_handler_retry_test.go +++ b/modules/frontend/pipeline/sync_handler_retry_test.go @@ -18,7 +18,7 @@ func TestRetry(t *testing.T) { for _, tc := range []struct { name string - handler http.RoundTripper + handler RoundTripper maxRetries int expectedTries int32 expectedRes *http.Response @@ -26,7 +26,7 @@ func TestRetry(t *testing.T) { }{ { name: "retry errors until success", 
- handler: RoundTripperFunc(func(req *http.Request) (*http.Response, error) { + handler: RoundTripperFunc(func(_ Request) (*http.Response, error) { if try.Inc() == 5 { return &http.Response{StatusCode: 200}, nil } @@ -39,7 +39,7 @@ func TestRetry(t *testing.T) { }, { name: "don't retry 400's", - handler: RoundTripperFunc(func(req *http.Request) (*http.Response, error) { + handler: RoundTripperFunc(func(_ Request) (*http.Response, error) { try.Inc() return &http.Response{StatusCode: 400}, nil }), @@ -50,7 +50,7 @@ func TestRetry(t *testing.T) { }, { name: "don't retry GRPC request with HTTP 400's", - handler: RoundTripperFunc(func(req *http.Request) (*http.Response, error) { + handler: RoundTripperFunc(func(_ Request) (*http.Response, error) { try.Inc() return nil, httpgrpc.ErrorFromHTTPResponse(&httpgrpc.HTTPResponse{Code: 400}) }), @@ -61,7 +61,7 @@ func TestRetry(t *testing.T) { }, { name: "retry 500s", - handler: RoundTripperFunc(func(req *http.Request) (*http.Response, error) { + handler: RoundTripperFunc(func(_ Request) (*http.Response, error) { try.Inc() return &http.Response{StatusCode: 503}, nil }), @@ -72,7 +72,7 @@ func TestRetry(t *testing.T) { }, { name: "return last error", - handler: RoundTripperFunc(func(req *http.Request) (*http.Response, error) { + handler: RoundTripperFunc(func(_ Request) (*http.Response, error) { if try.Inc() == 5 { return nil, errors.New("request failed") } @@ -85,7 +85,7 @@ func TestRetry(t *testing.T) { }, { name: "maxRetries=1", - handler: RoundTripperFunc(func(req *http.Request) (*http.Response, error) { + handler: RoundTripperFunc(func(_ Request) (*http.Response, error) { try.Inc() return &http.Response{StatusCode: 500}, nil }), @@ -96,7 +96,7 @@ func TestRetry(t *testing.T) { }, { name: "maxRetries=0", - handler: RoundTripperFunc(func(req *http.Request) (*http.Response, error) { + handler: RoundTripperFunc(func(_ Request) (*http.Response, error) { try.Inc() return &http.Response{StatusCode: 500}, nil }), @@ -114,7 +114,7 
@@ func TestRetry(t *testing.T) { req := httptest.NewRequest("GET", "http://example.com", nil) - res, err := handler.RoundTrip(req) + res, err := handler.RoundTrip(NewHTTPRequest(req)) require.Equal(t, tc.expectedTries, try.Load()) require.Equal(t, tc.expectedErr, err) @@ -134,10 +134,10 @@ func TestRetry_CancelledRequest(t *testing.T) { require.NoError(t, err) _, err = NewRetryWare(5, prometheus.NewRegistry()). - Wrap(RoundTripperFunc(func(req *http.Request) (*http.Response, error) { + Wrap(RoundTripperFunc(func(_ Request) (*http.Response, error) { try.Inc() return nil, ctx.Err() - })).RoundTrip(req) + })).RoundTrip(NewHTTPRequest(req)) require.Equal(t, int32(0), try.Load()) require.Equal(t, ctx.Err(), err) @@ -149,11 +149,11 @@ func TestRetry_CancelledRequest(t *testing.T) { require.NoError(t, err) _, err = NewRetryWare(5, prometheus.NewRegistry()). - Wrap(RoundTripperFunc(func(req *http.Request) (*http.Response, error) { + Wrap(RoundTripperFunc(func(_ Request) (*http.Response, error) { try.Inc() cancel() return nil, errors.New("this request failed") - })).RoundTrip(req) + })).RoundTrip(NewHTTPRequest(req)) require.Equal(t, int32(1), try.Load()) require.Equal(t, ctx.Err(), err) diff --git a/modules/frontend/search_handlers.go b/modules/frontend/search_handlers.go index 4ff51fe9f07..daca325ce71 100644 --- a/modules/frontend/search_handlers.go +++ b/modules/frontend/search_handlers.go @@ -74,7 +74,7 @@ func newSearchStreamingGRPCHandler(cfg Config, next pipeline.AsyncRoundTripper[c func newSearchHTTPHandler(cfg Config, next pipeline.AsyncRoundTripper[combiner.PipelineResponse], logger log.Logger) http.RoundTripper { postSLOHook := searchSLOPostHook(cfg.Search.SLO) - return pipeline.RoundTripperFunc(func(req *http.Request) (*http.Response, error) { + return RoundTripperFunc(func(req *http.Request) (*http.Response, error) { tenant, _ := user.ExtractOrgID(req.Context()) start := time.Now() diff --git a/modules/frontend/search_handlers_test.go 
b/modules/frontend/search_handlers_test.go index 02f614b0fd2..dfb94e5e565 100644 --- a/modules/frontend/search_handlers_test.go +++ b/modules/frontend/search_handlers_test.go @@ -660,9 +660,52 @@ func cacheResponsesEqual(t *testing.T, cacheResponse *tempopb.SearchResponse, pi require.Equal(t, pipelineResp, cacheResponse) } +func BenchmarkSearchPipeline(b *testing.B) { + tenant := "foo" + + // create 500 blocks. each forces one job + totalBlocks := 500 + rdr := &mockReader{ + metas: make([]*backend.BlockMeta, 0, totalBlocks), + } + for i := 0; i < totalBlocks; i++ { + rdr.metas = append(rdr.metas, &backend.BlockMeta{ + StartTime: time.Unix(15, 0), + EndTime: time.Unix(16, 0), + Size: defaultTargetBytesPerRequest, + TotalRecords: 1, + BlockID: uuid.MustParse(fmt.Sprintf("00000000-0000-0000-0000-%012d", i)), + }) + } + + f := frontendWithSettings(b, nil, rdr, nil, nil) + + // setup query + query := "{}" + + start := uint32(10) + end := uint32(20) + + // execute query + path := fmt.Sprintf("/?start=%d&end=%d&q=%s&limit=3&spss=2", start, end, query) // encapsulates block above + req := httptest.NewRequest("GET", path, nil) + ctx := req.Context() + ctx = user.InjectOrgID(ctx, tenant) + req = req.WithContext(ctx) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + respWriter := httptest.NewRecorder() + f.SearchHandler.ServeHTTP(respWriter, req) + + resp := respWriter.Result() + require.Equal(b, 200, resp.StatusCode) + } +} + // frontendWithSettings returns a new frontend with the given settings.
any nil options // are given "happy path" defaults -func frontendWithSettings(t *testing.T, next http.RoundTripper, rdr tempodb.Reader, cfg *Config, cacheProvider cache.Provider, +func frontendWithSettings(t require.TestingT, next http.RoundTripper, rdr tempodb.Reader, cfg *Config, cacheProvider cache.Provider, opts ...func(*Config), ) *QueryFrontend { if next == nil { diff --git a/modules/frontend/search_sharder.go b/modules/frontend/search_sharder.go index f1991a7a067..e87d0b0f96d 100644 --- a/modules/frontend/search_sharder.go +++ b/modules/frontend/search_sharder.go @@ -64,7 +64,9 @@ func newAsyncSearchSharder(reader tempodb.Reader, o overrides.Interface, cfg Sea // RoundTrip implements http.RoundTripper // execute up to concurrentRequests simultaneously where each request scans ~targetMBsPerRequest // until limit results are found -func (s asyncSearchSharder) RoundTrip(r *http.Request) (pipeline.Responses[combiner.PipelineResponse], error) { +func (s asyncSearchSharder) RoundTrip(pipelineRequest pipeline.Request) (pipeline.Responses[combiner.PipelineResponse], error) { + r := pipelineRequest.HTTPRequest() + searchReq, err := api.ParseSearchRequest(r) if err != nil { return pipeline.NewBadRequest(err), nil @@ -91,7 +93,7 @@ func (s asyncSearchSharder) RoundTrip(r *http.Request) (pipeline.Responses[combi } // buffer of shards+1 allows us to insert ingestReq and metrics - reqCh := make(chan *http.Request, s.cfg.IngesterShards+1) + reqCh := make(chan pipeline.Request, s.cfg.IngesterShards+1) // build request to search ingesters based on query_ingesters_until config and time range // pass subCtx in requests so we can cancel and exit early @@ -153,7 +155,7 @@ func (s *asyncSearchSharder) blockMetas(start, end int64, tenantID string) []*ba // backendRequest builds backend requests to search backend blocks. backendRequest takes ownership of reqCh and closes it. 
// it returns 3 int values: totalBlocks, totalBlockBytes, and estimated jobs -func (s *asyncSearchSharder) backendRequests(ctx context.Context, tenantID string, parent *http.Request, searchReq *tempopb.SearchRequest, reqCh chan<- *http.Request, errFn func(error)) (totalJobs, totalBlocks int, totalBlockBytes uint64) { +func (s *asyncSearchSharder) backendRequests(ctx context.Context, tenantID string, parent *http.Request, searchReq *tempopb.SearchRequest, reqCh chan<- pipeline.Request, errFn func(error)) (totalJobs, totalBlocks int, totalBlockBytes uint64) { var blocks []*backend.BlockMeta // request without start or end, search only in ingester @@ -199,7 +201,7 @@ func (s *asyncSearchSharder) backendRequests(ctx context.Context, tenantID strin // that covers the ingesters. If nil is returned for the http.Request then there is no ingesters query. // since this function modifies searchReq.Start and End we are taking a value instead of a pointer to prevent it from // unexpectedly changing the passed searchReq. -func (s *asyncSearchSharder) ingesterRequests(ctx context.Context, tenantID string, parent *http.Request, searchReq tempopb.SearchRequest, reqCh chan *http.Request) error { +func (s *asyncSearchSharder) ingesterRequests(ctx context.Context, tenantID string, parent *http.Request, searchReq tempopb.SearchRequest, reqCh chan pipeline.Request) error { // request without start or end, search only in ingester if searchReq.Start == 0 || searchReq.End == 0 { return buildIngesterRequest(ctx, tenantID, parent, &searchReq, reqCh) @@ -297,7 +299,7 @@ func backendRange(start, end uint32, queryBackendAfter time.Duration) (uint32, u // buildBackendRequests returns a slice of requests that cover all blocks in the store // that are covered by start/end. 
-func buildBackendRequests(ctx context.Context, tenantID string, parent *http.Request, searchReq *tempopb.SearchRequest, metas []*backend.BlockMeta, bytesPerRequest int, reqCh chan<- *http.Request, errFn func(error)) { +func buildBackendRequests(ctx context.Context, tenantID string, parent *http.Request, searchReq *tempopb.SearchRequest, metas []*backend.BlockMeta, bytesPerRequest int, reqCh chan<- pipeline.Request, errFn func(error)) { defer close(reqCh) queryHash := hashForSearchRequest(searchReq) @@ -336,14 +338,13 @@ func buildBackendRequests(ctx context.Context, tenantID string, parent *http.Req continue } - prepareRequestForQueriers(subR, tenantID, subR.URL.Path, subR.URL.Query()) + prepareRequestForQueriers(subR, tenantID) key := searchJobCacheKey(tenantID, queryHash, int64(searchReq.Start), int64(searchReq.End), m, startPage, pages) - if len(key) > 0 { - subR = pipeline.ContextAddCacheKey(key, subR) - } + pipelineR := pipeline.NewHTTPRequest(subR) + pipelineR.SetCacheKey(key) select { - case reqCh <- subR: + case reqCh <- pipelineR: case <-ctx.Done(): // ignore the error if there is one. 
it will be handled elsewhere return @@ -396,14 +397,14 @@ func pagesPerRequest(m *backend.BlockMeta, bytesPerRequest int) int { return pagesPerQuery } -func buildIngesterRequest(ctx context.Context, tenantID string, parent *http.Request, searchReq *tempopb.SearchRequest, reqCh chan *http.Request) error { +func buildIngesterRequest(ctx context.Context, tenantID string, parent *http.Request, searchReq *tempopb.SearchRequest, reqCh chan pipeline.Request) error { subR := parent.Clone(ctx) subR, err := api.BuildSearchRequest(subR, searchReq) if err != nil { return err } - prepareRequestForQueriers(subR, tenantID, subR.URL.Path, subR.URL.Query()) - reqCh <- subR + prepareRequestForQueriers(subR, tenantID) + reqCh <- pipeline.NewHTTPRequest(subR) return nil } diff --git a/modules/frontend/search_sharder_test.go b/modules/frontend/search_sharder_test.go index 7c83884d9c9..3d6fdb95db7 100644 --- a/modules/frontend/search_sharder_test.go +++ b/modules/frontend/search_sharder_test.go @@ -224,7 +224,7 @@ func TestBuildBackendRequests(t *testing.T) { require.NoError(t, err) ctx, cancelCause := context.WithCancelCause(context.Background()) - reqCh := make(chan *http.Request) + reqCh := make(chan pipeline.Request) go func() { buildBackendRequests(ctx, "test", req, searchReq, tc.metas, tc.targetBytesPerRequest, reqCh, cancelCause) @@ -233,7 +233,7 @@ func TestBuildBackendRequests(t *testing.T) { actualURIs := []string{} for r := range reqCh { if r != nil { - actualURIs = append(actualURIs, r.RequestURI) + actualURIs = append(actualURIs, r.HTTPRequest().RequestURI) } } @@ -313,7 +313,7 @@ func TestBackendRequests(t *testing.T) { stopCh := make(chan struct{}) defer close(stopCh) - reqCh := make(chan *http.Request) + reqCh := make(chan pipeline.Request) ctx, cancelCause := context.WithCancelCause(context.Background()) @@ -325,7 +325,7 @@ func TestBackendRequests(t *testing.T) { actualReqURIs := []string{} for r := range reqCh { if r != nil { - actualReqURIs = append(actualReqURIs, 
r.RequestURI) + actualReqURIs = append(actualReqURIs, r.HTTPRequest().RequestURI) } } require.NoError(t, ctx.Err()) @@ -489,7 +489,7 @@ func TestIngesterRequests(t *testing.T) { searchReq, err := api.ParseSearchRequest(req) require.NoError(t, err) - reqChan := make(chan *http.Request, tc.ingesterShards) + reqChan := make(chan pipeline.Request, tc.ingesterShards) defer close(reqChan) copyReq := searchReq @@ -506,7 +506,7 @@ func TestIngesterRequests(t *testing.T) { req := <-reqChan require.NotNil(t, req) - values := req.URL.Query() + values := req.HTTPRequest().URL.Query() expectedQueryStringValues, err := url.ParseQuery(expectedURI) require.NoError(t, err) @@ -640,7 +640,7 @@ func TestBackendRange(t *testing.T) { } func TestTotalJobsIncludesIngester(t *testing.T) { - next := pipeline.AsyncRoundTripperFunc[combiner.PipelineResponse](func(r *http.Request) (pipeline.Responses[combiner.PipelineResponse], error) { + next := pipeline.AsyncRoundTripperFunc[combiner.PipelineResponse](func(_ pipeline.Request) (pipeline.Responses[combiner.PipelineResponse], error) { resString, err := (&jsonpb.Marshaler{}).MarshalToString(&tempopb.SearchResponse{ Metrics: &tempopb.SearchMetrics{}, }) @@ -681,7 +681,7 @@ func TestTotalJobsIncludesIngester(t *testing.T) { ctx = user.InjectOrgID(ctx, "blerg") req = req.WithContext(ctx) - resps, err := testRT.RoundTrip(req) + resps, err := testRT.RoundTrip(pipeline.NewHTTPRequest(req)) require.NoError(t, err) // find a response with total jobs > . 
this is the metadata response var resp *tempopb.SearchResponse @@ -710,7 +710,7 @@ func TestTotalJobsIncludesIngester(t *testing.T) { } func TestSearchSharderRoundTripBadRequest(t *testing.T) { - next := pipeline.AsyncRoundTripperFunc[combiner.PipelineResponse](func(r *http.Request) (pipeline.Responses[combiner.PipelineResponse], error) { + next := pipeline.AsyncRoundTripperFunc[combiner.PipelineResponse](func(_ pipeline.Request) (pipeline.Responses[combiner.PipelineResponse], error) { return nil, nil }) @@ -726,18 +726,18 @@ func TestSearchSharderRoundTripBadRequest(t *testing.T) { // no org id req := httptest.NewRequest("GET", "/?start=1000&end=1100", nil) - resp, err := testRT.RoundTrip(req) + resp, err := testRT.RoundTrip(pipeline.NewHTTPRequest(req)) testBadRequestFromResponses(t, resp, err, "no org id") // start/end outside of max duration req = httptest.NewRequest("GET", "/?start=1000&end=1500", nil) req = req.WithContext(user.InjectOrgID(req.Context(), "blerg")) - resp, err = testRT.RoundTrip(req) + resp, err = testRT.RoundTrip(pipeline.NewHTTPRequest(req)) testBadRequestFromResponses(t, resp, err, "range specified by start and end exceeds 5m0s. received start=1000 end=1500") // bad request req = httptest.NewRequest("GET", "/?start=asdf&end=1500", nil) - resp, err = testRT.RoundTrip(req) + resp, err = testRT.RoundTrip(pipeline.NewHTTPRequest(req)) testBadRequestFromResponses(t, resp, err, "invalid start: strconv.ParseInt: parsing \"asdf\": invalid syntax") // test max duration error with overrides @@ -759,7 +759,7 @@ func TestSearchSharderRoundTripBadRequest(t *testing.T) { req = httptest.NewRequest("GET", "/?start=1000&end=1500", nil) req = req.WithContext(user.InjectOrgID(req.Context(), "blerg")) - resp, err = testRT.RoundTrip(req) + resp, err = testRT.RoundTrip(pipeline.NewHTTPRequest(req)) testBadRequestFromResponses(t, resp, err, "range specified by start and end exceeds 1m0s. 
received start=1000 end=1500") } diff --git a/modules/frontend/tag_handlers.go b/modules/frontend/tag_handlers.go index bd5ce961768..63bbdde660c 100644 --- a/modules/frontend/tag_handlers.go +++ b/modules/frontend/tag_handlers.go @@ -108,7 +108,7 @@ func streamingTags[TReq proto.Message, TResp proto.Message](ctx context.Context, return status.Error(codes.InvalidArgument, err.Error()) } - prepareRequestForQueriers(httpReq, tenant, httpReq.URL.Path, httpReq.URL.Query()) + prepareRequestForQueriers(httpReq, tenant) c := fnCombiner(o.MaxBytesPerTagValuesQuery(tenant)) collector := pipeline.NewGRPCCollector[TResp](next, cfg.ResponseConsumers, c, fnSend) @@ -123,7 +123,7 @@ func streamingTags[TReq proto.Message, TResp proto.Message](ctx context.Context, // newTagHTTPHandler returns a handler that returns a single response from the HTTP handler func newTagHTTPHandler(cfg Config, next pipeline.AsyncRoundTripper[combiner.PipelineResponse], o overrides.Interface, fnCombiner func(int) combiner.Combiner, logger log.Logger) http.RoundTripper { - return pipeline.RoundTripperFunc(func(req *http.Request) (*http.Response, error) { + return RoundTripperFunc(func(req *http.Request) (*http.Response, error) { tenant, err := user.ExtractOrgID(req.Context()) if err != nil { level.Error(logger).Log("msg", "tags failed to extract orgid", "err", err) diff --git a/modules/frontend/tag_sharder.go b/modules/frontend/tag_sharder.go index 0fb3b5dc4e2..6110cc254ba 100644 --- a/modules/frontend/tag_sharder.go +++ b/modules/frontend/tag_sharder.go @@ -189,7 +189,8 @@ func newAsyncTagSharder(reader tempodb.Reader, o overrides.Interface, cfg Search // RoundTrip implements pipeline.AsyncRoundTripper // execute up to concurrentRequests simultaneously where each request scans ~targetMBsPerRequest // until limit results are found -func (s searchTagSharder) RoundTrip(r *http.Request) (pipeline.Responses[combiner.PipelineResponse], error) { +func (s searchTagSharder) RoundTrip(pipelineRequest 
pipeline.Request) (pipeline.Responses[combiner.PipelineResponse], error) { + r := pipelineRequest.HTTPRequest() requestCtx := r.Context() tenantID, err := user.ExtractOrgID(requestCtx) @@ -218,9 +219,9 @@ func (s searchTagSharder) RoundTrip(r *http.Request) (pipeline.Responses[combine return nil, err } - reqCh := make(chan *http.Request, 1) // buffer of 1 allows us to insert ingestReq if it exists + reqCh := make(chan pipeline.Request, 1) // buffer of 1 allows us to insert ingestReq if it exists if ingesterReq != nil { - reqCh <- ingesterReq + reqCh <- pipeline.NewHTTPRequest(ingesterReq) } s.backendRequests(ctx, tenantID, r, searchReq, reqCh, func(err error) { @@ -249,7 +250,7 @@ func (s searchTagSharder) blockMetas(start, end int64, tenantID string) []*backe // backendRequest builds backend requests to search backend blocks. backendRequest takes ownership of reqCh and closes it. // it returns 3 int values: totalBlocks, totalBlockBytes, and estimated jobs -func (s searchTagSharder) backendRequests(ctx context.Context, tenantID string, parent *http.Request, searchReq tagSearchReq, reqCh chan<- *http.Request, errFn func(error)) { +func (s searchTagSharder) backendRequests(ctx context.Context, tenantID string, parent *http.Request, searchReq tagSearchReq, reqCh chan<- pipeline.Request, errFn func(error)) { var blocks []*backend.BlockMeta // request without start or end, search only in ingester @@ -279,7 +280,7 @@ func (s searchTagSharder) backendRequests(ctx context.Context, tenantID string, // buildBackendRequests returns a slice of requests that cover all blocks in the store // that are covered by start/end. 
-func (s searchTagSharder) buildBackendRequests(ctx context.Context, tenantID string, parent *http.Request, metas []*backend.BlockMeta, bytesPerRequest int, reqCh chan<- *http.Request, errFn func(error), searchReq tagSearchReq) { +func (s searchTagSharder) buildBackendRequests(ctx context.Context, tenantID string, parent *http.Request, metas []*backend.BlockMeta, bytesPerRequest int, reqCh chan<- pipeline.Request, errFn func(error), searchReq tagSearchReq) { defer close(reqCh) hash := searchReq.hash() @@ -300,15 +301,14 @@ func (s searchTagSharder) buildBackendRequests(ctx context.Context, tenantID str return } subR.Header.Set(api.HeaderAccept, api.HeaderAcceptProtobuf) - prepareRequestForQueriers(subR, tenantID, parent.URL.Path, subR.URL.Query()) + prepareRequestForQueriers(subR, tenantID) + pipelineR := pipeline.NewHTTPRequest(subR) key := cacheKey(keyPrefix, tenantID, hash, int64(searchReq.start()), int64(searchReq.end()), m, startPage, pages) - if len(key) > 0 { - subR = pipeline.ContextAddCacheKey(key, subR) - } + pipelineR.SetCacheKey(key) select { - case reqCh <- subR: + case reqCh <- pipelineR: case <-ctx.Done(): return } @@ -358,7 +358,7 @@ func (s searchTagSharder) buildIngesterRequest(ctx context.Context, tenantID str return nil, err } subR.Header.Set(api.HeaderAccept, api.HeaderAcceptProtobuf) - prepareRequestForQueriers(subR, tenantID, subR.URL.Path, subR.URL.Query()) + prepareRequestForQueriers(subR, tenantID) return subR, nil } diff --git a/modules/frontend/tag_sharder_test.go b/modules/frontend/tag_sharder_test.go index 6913f44400a..0e961e4fcdb 100644 --- a/modules/frontend/tag_sharder_test.go +++ b/modules/frontend/tag_sharder_test.go @@ -154,7 +154,7 @@ func TestTagsBackendRequests(t *testing.T) { stopCh := make(chan struct{}) defer close(stopCh) - reqCh := make(chan *http.Request) + reqCh := make(chan pipeline.Request) req := fakeReq{} if tc.params != nil { req.startValue = uint32(tc.params.start) @@ -167,7 +167,7 @@ func 
TestTagsBackendRequests(t *testing.T) { actualReqURIs := []string{} for r := range reqCh { - actualReqURIs = append(actualReqURIs, r.RequestURI) + actualReqURIs = append(actualReqURIs, r.HTTPRequest().RequestURI) } require.Equal(t, tc.expectedReqsURIs, actualReqURIs) }) @@ -283,7 +283,7 @@ func TestTagsIngesterRequest(t *testing.T) { } func TestTagsSearchSharderRoundTripBadRequest(t *testing.T) { - next := pipeline.AsyncRoundTripperFunc[combiner.PipelineResponse](func(r *http.Request) (pipeline.Responses[combiner.PipelineResponse], error) { + next := pipeline.AsyncRoundTripperFunc[combiner.PipelineResponse](func(_ pipeline.Request) (pipeline.Responses[combiner.PipelineResponse], error) { return nil, nil }) @@ -299,19 +299,19 @@ func TestTagsSearchSharderRoundTripBadRequest(t *testing.T) { // no org id req := httptest.NewRequest("GET", "/?start=1000&end=1100", nil) - resp, err := testRT.RoundTrip(req) + resp, err := testRT.RoundTrip(pipeline.NewHTTPRequest(req)) testBadRequestFromResponses(t, resp, err, "no org id") // start/end outside of max duration req = httptest.NewRequest("GET", "/?start=1000&end=1500", nil) req = req.WithContext(user.InjectOrgID(req.Context(), "blerg")) - resp, err = testRT.RoundTrip(req) + resp, err = testRT.RoundTrip(pipeline.NewHTTPRequest(req)) testBadRequestFromResponses(t, resp, err, "range specified by start and end exceeds 5m0s. 
received start=1000 end=1500") // bad request req = httptest.NewRequest("GET", "/?start=asdf&end=1500", nil) req = req.WithContext(user.InjectOrgID(req.Context(), "blerg")) - resp, err = testRT.RoundTrip(req) + resp, err = testRT.RoundTrip(pipeline.NewHTTPRequest(req)) testBadRequestFromResponses(t, resp, err, "invalid start: strconv.ParseInt: parsing \"asdf\": invalid syntax") // test max duration error with overrides @@ -333,6 +333,6 @@ func TestTagsSearchSharderRoundTripBadRequest(t *testing.T) { req = httptest.NewRequest("GET", "/?start=1000&end=1500", nil) req = req.WithContext(user.InjectOrgID(req.Context(), "blerg")) - resp, err = testRT.RoundTrip(req) + resp, err = testRT.RoundTrip(pipeline.NewHTTPRequest(req)) testBadRequestFromResponses(t, resp, err, "range specified by start and end exceeds 1m0s. received start=1000 end=1500") } diff --git a/modules/frontend/traceid_handlers.go b/modules/frontend/traceid_handlers.go index c86585f4883..c78ae54853f 100644 --- a/modules/frontend/traceid_handlers.go +++ b/modules/frontend/traceid_handlers.go @@ -19,7 +19,7 @@ import ( func newTraceIDHandler(cfg Config, o overrides.Interface, next pipeline.AsyncRoundTripper[combiner.PipelineResponse], logger log.Logger) http.RoundTripper { postSLOHook := traceByIDSLOPostHook(cfg.TraceByID.SLO) - return pipeline.RoundTripperFunc(func(req *http.Request) (*http.Response, error) { + return RoundTripperFunc(func(req *http.Request) (*http.Response, error) { tenant, err := user.ExtractOrgID(req.Context()) if err != nil { level.Error(logger).Log("msg", "trace id: failed to extract tenant id", "err", err) @@ -58,7 +58,6 @@ func newTraceIDHandler(cfg Config, o overrides.Interface, next pipeline.AsyncRou // enforce all communication internal to Tempo to be in protobuf bytes req.Header.Set(api.HeaderAccept, api.HeaderAcceptProtobuf) - prepareRequestForQueriers(req, tenant, req.RequestURI, nil) level.Info(logger).Log( "msg", "trace id request", diff --git 
a/modules/frontend/traceid_handlers_test.go b/modules/frontend/traceid_handlers_test.go index e19c9a77aff..103137902a6 100644 --- a/modules/frontend/traceid_handlers_test.go +++ b/modules/frontend/traceid_handlers_test.go @@ -12,7 +12,6 @@ import ( "github.com/gogo/protobuf/proto" "github.com/gorilla/mux" "github.com/grafana/dskit/user" - "github.com/grafana/tempo/modules/frontend/pipeline" "github.com/grafana/tempo/pkg/model/trace" "github.com/grafana/tempo/pkg/tempopb" "github.com/grafana/tempo/pkg/util/test" @@ -134,7 +133,7 @@ func TestTraceIDHandler(t *testing.T) { for _, tc := range tests { tc := tc // copy the test case to prevent race on the loop variable t.Run(tc.name, func(t *testing.T) { - next := pipeline.RoundTripperFunc(func(r *http.Request) (*http.Response, error) { + next := RoundTripperFunc(func(r *http.Request) (*http.Response, error) { var testTrace *tempopb.Trace var statusCode int var err error diff --git a/modules/frontend/traceid_sharder.go b/modules/frontend/traceid_sharder.go index a71aae98508..7de2e15da2b 100644 --- a/modules/frontend/traceid_sharder.go +++ b/modules/frontend/traceid_sharder.go @@ -39,7 +39,9 @@ func newAsyncTraceIDSharder(cfg *TraceByIDConfig, logger log.Logger) pipeline.As } // RoundTrip implements http.RoundTripper -func (s asyncTraceSharder) RoundTrip(r *http.Request) (pipeline.Responses[combiner.PipelineResponse], error) { +func (s asyncTraceSharder) RoundTrip(pipelineRequest pipeline.Request) (pipeline.Responses[combiner.PipelineResponse], error) { + r := pipelineRequest.HTTPRequest() + span, ctx := opentracing.StartSpanFromContext(r.Context(), "frontend.ShardQuery") defer span.Finish() r = r.WithContext(ctx) @@ -55,8 +57,8 @@ func (s asyncTraceSharder) RoundTrip(r *http.Request) (pipeline.Responses[combin concurrentShards = uint(s.cfg.ConcurrentShards) } - return pipeline.NewAsyncSharderFunc(ctx, 
int(concurrentShards), len(reqs), func(i int) *http.Request { - return reqs[i] + return pipeline.NewAsyncSharderFunc(ctx, int(concurrentShards), len(reqs), func(i int) pipeline.Request { + return pipeline.NewHTTPRequest(reqs[i]) }, s.next), nil } @@ -83,8 +85,9 @@ func (s *asyncTraceSharder) buildShardedRequests(ctx context.Context, parent *ht q.Add(querier.BlockEndKey, hex.EncodeToString(s.blockBoundaries[i])) q.Add(querier.QueryModeKey, querier.QueryModeBlocks) } + reqs[i].URL.RawQuery = q.Encode() - prepareRequestForQueriers(reqs[i], userID, reqs[i].URL.Path, q) + prepareRequestForQueriers(reqs[i], userID) } return reqs, nil