2 changes: 1 addition & 1 deletion .gitignore
@@ -12,4 +12,4 @@
/tempodb/encoding/benchmark_block
/cmd/tempo-serverless/vendor/
/pkg/traceql/y.output
private-key.key
private-key.key
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -1,6 +1,8 @@
## main / unreleased

* [FEATURE] Add support for Azure Workload Identity authentication [#2195](https://github.com/grafana/tempo/pull/2195) (@LambArchie)
* [ENHANCEMENT] Add Throughput and SLO Metrics with SLOConfig in Query Frontend [#2008](https://github.com/grafana/tempo/pull/2008) (@electron0zero)
- **BREAKING CHANGE** `query_frontend_result_metrics_inspected_bytes` metric removed in favour of `query_frontend_bytes_processed_per_second`
* [FEATURE] Add flag to check configuration [#2131](https://github.com/grafana/tempo/issues/2131) (@robertscherbarth @agrib-01)
* [FEATURE] Add flag to optionally enable all available Go runtime metrics [#2005](https://github.com/grafana/tempo/pull/2005) (@andreasgerstmayr)
* [FEATURE] Add support for span `kind` to TraceQL [#2217](https://github.com/grafana/tempo/pull/2217) (@joe-elliott)
14 changes: 14 additions & 0 deletions docs/sources/tempo/configuration/_index.md
@@ -400,6 +400,16 @@ query_frontend:
# (default: 30m)
[query_ingesters_until: <duration>]

# If set to a non-zero value, its value is used to decide whether a query is within SLO.
# A query is within SLO if it returned 200 within duration_slo seconds OR processed throughput_bytes_slo bytes/s of data.
# NOTE: both `duration_slo` and `throughput_bytes_slo` must be configured for this to take effect.
[duration_slo: <duration> | default = 0s ]

# If set to a non-zero value, its value is used to decide whether a query is within SLO.
# A query is within SLO if it returned 200 within duration_slo seconds OR processed throughput_bytes_slo bytes/s of data.
[throughput_bytes_slo: <float> | default = 0 ]

# Trace by ID lookup configuration
trace_by_id:
# The number of shards to split a trace by id query into.
@@ -413,6 +423,10 @@ query_frontend:
# The maximum number of requests to execute when hedging.
# Requires hedge_requests_at to be set. Must be greater than 0.
[hedge_requests_up_to: <int> | default = 2 ]

# If set to a non-zero value, its value is used to decide whether a query is within SLO.
# A query is within SLO if it returned 200 within duration_slo seconds.
[duration_slo: <duration> | default = 0s ]
```
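To make the SLO semantics above concrete, here is a minimal Go sketch of the documented rule for search queries. The function name and signature are illustrative only, not Tempo's API; the real check lives in the query-frontend sharder further down this diff.

```go
package main

import (
	"fmt"
	"time"
)

// withinSearchSLO reports whether a successful (HTTP 200) search query met
// the SLO: it finished within durationSLO OR sustained more than
// throughputBytesSLO bytes/s. Both settings must be non-zero, otherwise
// SLO recording is disabled.
func withinSearchSLO(elapsed time.Duration, inspectedBytes uint64, durationSLO time.Duration, throughputBytesSLO float64) bool {
	if durationSLO == 0 || throughputBytesSLO == 0 {
		return false
	}
	throughput := float64(inspectedBytes) / elapsed.Seconds()
	return elapsed < durationSLO || throughput > throughputBytesSLO
}

func main() {
	// A slow query (8s against a 5s duration SLO) still counts as within SLO
	// because it processed 4 GiB, i.e. ~512 MiB/s against a 100 MiB/s target.
	fmt.Println(withinSearchSLO(8*time.Second, 4<<30, 5*time.Second, 100<<20))
}
```

Note the OR between the two legs: a slow query can still land inside the SLO on throughput alone, and for search both settings must be non-zero or the check is skipped entirely.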

## Querier
17 changes: 15 additions & 2 deletions modules/frontend/config.go
@@ -2,9 +2,8 @@ package frontend

import (
"flag"
"time"

"net/http"
"time"

"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
@@ -28,19 +27,31 @@ type Config struct {

type SearchConfig struct {
Sharder SearchSharderConfig `yaml:",inline"`
SLO SLOConfig `yaml:",inline"`
}

type TraceByIDConfig struct {
QueryShards int `yaml:"query_shards,omitempty"`
Hedging HedgingConfig `yaml:",inline"`
SLO SLOConfig `yaml:",inline"`
}

type HedgingConfig struct {
HedgeRequestsAt time.Duration `yaml:"hedge_requests_at"`
HedgeRequestsUpTo int `yaml:"hedge_requests_up_to"`
}

type SLOConfig struct {
DurationSLO time.Duration `yaml:"duration_slo,omitempty"`
ThroughputBytesSLO float64 `yaml:"throughput_bytes_slo,omitempty"`
}

func (cfg *Config) RegisterFlagsAndApplyDefaults(string, *flag.FlagSet) {
slo := SLOConfig{
DurationSLO: 0,
ThroughputBytesSLO: 0,
}

cfg.Config.MaxOutstandingPerTenant = 2000
cfg.MaxRetries = 2
cfg.TolerateFailedBlocks = 0
@@ -54,9 +65,11 @@ func (cfg *Config) RegisterFlagsAndApplyDefaults(string, *flag.FlagSet) {
ConcurrentRequests: defaultConcurrentRequests,
TargetBytesPerRequest: defaultTargetBytesPerRequest,
},
SLO: slo,
}
cfg.TraceByID = TraceByIDConfig{
QueryShards: 50,
SLO: slo,
Hedging: HedgingConfig{
HedgeRequestsAt: 2 * time.Second,
HedgeRequestsUpTo: 2,
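A note on the `yaml:",inline"` tags above: inlining the embedded SLOConfig is what lets `duration_slo` and `throughput_bytes_slo` appear directly under `search` and `trace_by_id` in the YAML, with no nested key. A minimal sketch of that behaviour, assuming gopkg.in/yaml.v2, which parses strings like `5s` into a time.Duration:

```go
package main

import (
	"fmt"
	"time"

	"gopkg.in/yaml.v2"
)

// Trimmed copies of the structs above, enough to show what inlining does.
type SLOConfig struct {
	DurationSLO        time.Duration `yaml:"duration_slo,omitempty"`
	ThroughputBytesSLO float64       `yaml:"throughput_bytes_slo,omitempty"`
}

type TraceByIDConfig struct {
	QueryShards int       `yaml:"query_shards,omitempty"`
	SLO         SLOConfig `yaml:",inline"`
}

func main() {
	raw := "query_shards: 50\nduration_slo: 5s\n"
	var cfg TraceByIDConfig
	if err := yaml.Unmarshal([]byte(raw), &cfg); err != nil {
		panic(err)
	}
	// The inlined field is populated even though the YAML has no nested block.
	fmt.Println(cfg.QueryShards, cfg.SLO.DurationSLO) // 50 5s
}
```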
22 changes: 8 additions & 14 deletions modules/frontend/frontend.go
@@ -34,7 +34,6 @@ const (
type QueryFrontend struct {
TraceByID, Search http.Handler
logger log.Logger
queriesPerTenant *prometheus.CounterVec
store storage.Store
}

@@ -70,21 +69,16 @@ func New(cfg Config, next http.RoundTripper, o *overrides.Overrides, store stora
traceByIDMiddleware := MergeMiddlewares(newTraceByIDMiddleware(cfg, logger), retryWare)
searchMiddleware := MergeMiddlewares(newSearchMiddleware(cfg, o, store, logger), retryWare)

traceByIDCounter := queriesPerTenant.MustCurryWith(prometheus.Labels{
"op": traceByIDOp,
})
searchCounter := queriesPerTenant.MustCurryWith(prometheus.Labels{
"op": searchOp,
})
traceByIDCounter := queriesPerTenant.MustCurryWith(prometheus.Labels{"op": traceByIDOp})
searchCounter := queriesPerTenant.MustCurryWith(prometheus.Labels{"op": searchOp})

traces := traceByIDMiddleware.Wrap(next)
search := searchMiddleware.Wrap(next)
return &QueryFrontend{
TraceByID: newHandler(traces, traceByIDCounter, logger),
Search: newHandler(search, searchCounter, logger),
logger: logger,
queriesPerTenant: queriesPerTenant,
store: store,
TraceByID: newHandler(traces, traceByIDCounter, logger),
Search: newHandler(search, searchCounter, logger),
logger: logger,
store: store,
}, nil
}

@@ -98,7 +92,7 @@ func newTraceByIDMiddleware(cfg Config, logger log.Logger) Middleware {
rt := NewRoundTripper(
next,
newDeduper(logger),
newTraceByIDSharder(cfg.TraceByID.QueryShards, cfg.TolerateFailedBlocks, logger),
newTraceByIDSharder(cfg.TraceByID.QueryShards, cfg.TolerateFailedBlocks, cfg.TraceByID.SLO, logger),
newHedgedRequestWare(cfg.TraceByID.Hedging),
)

Expand Down Expand Up @@ -181,7 +175,7 @@ func newTraceByIDMiddleware(cfg Config, logger log.Logger) Middleware {
func newSearchMiddleware(cfg Config, o *overrides.Overrides, reader tempodb.Reader, logger log.Logger) Middleware {
return MiddlewareFunc(func(next http.RoundTripper) http.RoundTripper {
ingesterSearchRT := next
backendSearchRT := NewRoundTripper(next, newSearchSharder(reader, o, cfg.Search.Sharder, logger))
backendSearchRT := NewRoundTripper(next, newSearchSharder(reader, o, cfg.Search.Sharder, cfg.Search.SLO, logger))

return RoundTripperFunc(func(r *http.Request) (*http.Response, error) {
// backend search queries require sharding so we pass through a special roundtripper
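The counter wiring above leans on `MustCurryWith`, which pre-binds the `op` label of a metric vector once so each call site only supplies the tenant at observation time. A self-contained sketch with an illustrative metric name:

```go
package main

import (
	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	queries := prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "tempo",
		Name:      "example_queries_total",
		Help:      "Total queries per tenant and op.",
	}, []string{"tenant", "op"})
	prometheus.MustRegister(queries)

	// Currying fixes "op" once; the returned vector only needs "tenant".
	searchQueries := queries.MustCurryWith(prometheus.Labels{"op": "search"})
	tracesQueries := queries.MustCurryWith(prometheus.Labels{"op": "traces"})

	searchQueries.WithLabelValues("tenant-a").Inc()
	tracesQueries.WithLabelValues("tenant-a").Inc()
}
```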
11 changes: 11 additions & 0 deletions modules/frontend/frontend_test.go
@@ -27,12 +27,14 @@ func TestFrontendRoundTripsSearch(t *testing.T) {
f, err := New(Config{
TraceByID: TraceByIDConfig{
QueryShards: minQueryShards,
SLO: testSLOcfg,
},
Search: SearchConfig{
Sharder: SearchSharderConfig{
ConcurrentRequests: defaultConcurrentRequests,
TargetBytesPerRequest: defaultTargetBytesPerRequest,
},
SLO: testSLOcfg,
},
}, next, nil, nil, log.NewNopLogger(), nil)
require.NoError(t, err)
@@ -55,6 +57,7 @@ func TestFrontendBadConfigFails(t *testing.T) {
ConcurrentRequests: defaultConcurrentRequests,
TargetBytesPerRequest: defaultTargetBytesPerRequest,
},
SLO: testSLOcfg,
},
}, nil, nil, nil, log.NewNopLogger(), nil)
assert.EqualError(t, err, "frontend query shards should be between 2 and 256 (both inclusive)")
@@ -63,12 +66,14 @@
f, err = New(Config{
TraceByID: TraceByIDConfig{
QueryShards: maxQueryShards + 1,
SLO: testSLOcfg,
},
Search: SearchConfig{
Sharder: SearchSharderConfig{
ConcurrentRequests: defaultConcurrentRequests,
TargetBytesPerRequest: defaultTargetBytesPerRequest,
},
SLO: testSLOcfg,
},
}, nil, nil, nil, log.NewNopLogger(), nil)
assert.EqualError(t, err, "frontend query shards should be between 2 and 256 (both inclusive)")
@@ -77,12 +82,14 @@
f, err = New(Config{
TraceByID: TraceByIDConfig{
QueryShards: maxQueryShards,
SLO: testSLOcfg,
},
Search: SearchConfig{
Sharder: SearchSharderConfig{
ConcurrentRequests: 0,
TargetBytesPerRequest: defaultTargetBytesPerRequest,
},
SLO: testSLOcfg,
},
}, nil, nil, nil, log.NewNopLogger(), nil)
assert.EqualError(t, err, "frontend search concurrent requests should be greater than 0")
@@ -91,12 +98,14 @@
f, err = New(Config{
TraceByID: TraceByIDConfig{
QueryShards: maxQueryShards,
SLO: testSLOcfg,
},
Search: SearchConfig{
Sharder: SearchSharderConfig{
ConcurrentRequests: defaultConcurrentRequests,
TargetBytesPerRequest: 0,
},
SLO: testSLOcfg,
},
}, nil, nil, nil, log.NewNopLogger(), nil)
assert.EqualError(t, err, "frontend search target bytes per request should be greater than 0")
@@ -105,6 +114,7 @@ func TestFrontendBadConfigFails(t *testing.T) {
f, err = New(Config{
TraceByID: TraceByIDConfig{
QueryShards: maxQueryShards,
SLO: testSLOcfg,
},
Search: SearchConfig{
Sharder: SearchSharderConfig{
@@ -113,6 +123,7 @@ func TestFrontendBadConfigFails(t *testing.T) {
QueryIngestersUntil: time.Minute,
QueryBackendAfter: time.Hour,
},
SLO: testSLOcfg,
},
}, nil, nil, nil, log.NewNopLogger(), nil)
assert.EqualError(t, err, "query backend after should be less than or equal to query ingester until")
77 changes: 50 additions & 27 deletions modules/frontend/searchsharding.go
@@ -6,6 +6,7 @@ import (
"fmt"
"io"
"net/http"
"net/url"
"strings"
"time"

@@ -30,12 +31,23 @@
)

var (
metricInspectedBytes = promauto.NewHistogram(prometheus.HistogramOpts{
queryThroughput = promauto.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "tempo",
Name: "query_frontend_result_metrics_inspected_bytes",
Help: "Inspected Bytes in a search query",
Name: "query_frontend_bytes_processed_per_second",
Help: "Bytes processed per second in the query per tenant",
Buckets: prometheus.ExponentialBuckets(1024*1024, 2, 10), // from 1MB up to 1GB
})
}, []string{"tenant", "op"})

searchThroughput = queryThroughput.MustCurryWith(prometheus.Labels{"op": searchOp})

sloQueriesPerTenant = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "tempo",
Name: "query_frontend_queries_within_slo_total",
Help: "Total Queries within SLO per tenant",
}, []string{"tenant", "op"})

sloTraceByIDCounter = sloQueriesPerTenant.MustCurryWith(prometheus.Labels{"op": traceByIDOp})
sloSearchCounter = sloQueriesPerTenant.MustCurryWith(prometheus.Labels{"op": searchOp})
)

type searchSharder struct {
@@ -44,6 +56,7 @@ type searchSharder struct {
overrides *overrides.Overrides

cfg SearchSharderConfig
sloCfg SLOConfig
logger log.Logger
}

@@ -58,14 +71,15 @@ type SearchSharderConfig struct {
}

// newSearchSharder creates a sharding middleware for search
func newSearchSharder(reader tempodb.Reader, o *overrides.Overrides, cfg SearchSharderConfig, logger log.Logger) Middleware {
func newSearchSharder(reader tempodb.Reader, o *overrides.Overrides, cfg SearchSharderConfig, sloCfg SLOConfig, logger log.Logger) Middleware {
return MiddlewareFunc(func(next http.RoundTripper) http.RoundTripper {
return searchSharder{
next: next,
reader: reader,
overrides: o,
logger: logger,
cfg: cfg,
sloCfg: sloCfg,
logger: logger,
}
})
}
@@ -78,8 +92,6 @@ func newSearchSharder(reader tempodb.Reader, o *overrides.Overrides, cfg SearchS
// start=<unix epoch seconds>
// end=<unix epoch seconds>
func (s searchSharder) RoundTrip(r *http.Request) (*http.Response, error) {
// create search related query metrics here??
// this is the place where we agg search response.
searchReq, err := api.ParseSearchRequest(r)
if err != nil {
return &http.Response{
@@ -102,6 +114,7 @@ func (s searchSharder) RoundTrip(r *http.Request) (*http.Response, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "frontend.ShardSearch")
defer span.Finish()

reqStart := time.Now()
// sub context to cancel in-progress sub requests
subCtx, subCancel := context.WithCancel(ctx)
defer subCancel()
@@ -216,34 +229,35 @@

// print out request metrics
cancelledReqs := startedReqs - overallResponse.finishedRequests
level.Info(s.logger).Log(
"msg", "sharded search query request stats",
"raw_query", r.URL.RawQuery,
"total", len(reqs),
"started", startedReqs,
"finished", overallResponse.finishedRequests,
"cancelled", cancelledReqs)

// all goroutines have finished, we can safely access searchResults fields directly now
span.SetTag("inspectedBlocks", overallResponse.resultsMetrics.InspectedBlocks)
span.SetTag("skippedBlocks", overallResponse.resultsMetrics.SkippedBlocks)
span.SetTag("inspectedBytes", overallResponse.resultsMetrics.InspectedBytes)
span.SetTag("inspectedTraces", overallResponse.resultsMetrics.InspectedTraces)
span.SetTag("skippedTraces", overallResponse.resultsMetrics.SkippedTraces)
span.SetTag("totalBlockBytes", overallResponse.resultsMetrics.TotalBlockBytes)
reqTime := time.Since(reqStart)
throughput := float64(overallResponse.resultsMetrics.InspectedBytes) / reqTime.Seconds()
searchThroughput.WithLabelValues(tenantID).Observe(throughput)

query, _ := url.PathUnescape(r.URL.RawQuery)
span.SetTag("query", query)
level.Info(s.logger).Log(
"msg", "sharded search query SearchMetrics",
"raw_query", r.URL.RawQuery,
"msg", "sharded search query request stats and SearchMetrics",
"query", query,
"duration_seconds", reqTime,
"request_throughput", throughput,
"total_requests", len(reqs),
"started_requests", startedReqs,
"cancelled_requests", cancelledReqs,
"finished_requests", overallResponse.finishedRequests,
"inspectedBlocks", overallResponse.resultsMetrics.InspectedBlocks,
"skippedBlocks", overallResponse.resultsMetrics.SkippedBlocks,
"inspectedBytes", overallResponse.resultsMetrics.InspectedBytes,
"inspectedTraces", overallResponse.resultsMetrics.InspectedTraces,
"skippedTraces", overallResponse.resultsMetrics.SkippedTraces,
"totalBlockBytes", overallResponse.resultsMetrics.TotalBlockBytes)

// Collect inspectedBytes metrics
metricInspectedBytes.Observe(float64(overallResponse.resultsMetrics.InspectedBytes))
// all goroutines have finished, we can safely access searchResults fields directly now
span.SetTag("inspectedBlocks", overallResponse.resultsMetrics.InspectedBlocks)
span.SetTag("skippedBlocks", overallResponse.resultsMetrics.SkippedBlocks)
span.SetTag("inspectedBytes", overallResponse.resultsMetrics.InspectedBytes)
span.SetTag("inspectedTraces", overallResponse.resultsMetrics.InspectedTraces)
span.SetTag("skippedTraces", overallResponse.resultsMetrics.SkippedTraces)
span.SetTag("totalBlockBytes", overallResponse.resultsMetrics.TotalBlockBytes)

if overallResponse.err != nil {
return nil, overallResponse.err
@@ -266,6 +280,15 @@ func (s searchSharder) RoundTrip(r *http.Request) (*http.Response, error) {
return nil, err
}

// only record metric when it's enabled and within slo
if s.sloCfg.DurationSLO != 0 && s.sloCfg.ThroughputBytesSLO != 0 {
if reqTime < s.sloCfg.DurationSLO || throughput > s.sloCfg.ThroughputBytesSLO {
// query is within SLO if query returned 200 within DurationSLO seconds OR
// processed ThroughputBytesSLO bytes/s data
sloSearchCounter.WithLabelValues(tenantID).Inc()
}
}

return &http.Response{
StatusCode: http.StatusOK,
Header: http.Header{
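One last note on the bucket layout chosen for the new throughput histogram: `prometheus.ExponentialBuckets(1024*1024, 2, 10)` produces ten upper bounds doubling from 1 MiB/s to 512 MiB/s; anything faster falls into the implicit +Inf bucket. A quick sketch to print them:

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	// Same call as in searchsharding.go: start at 1 MiB/s, double, 10 buckets.
	for _, upper := range prometheus.ExponentialBuckets(1024*1024, 2, 10) {
		fmt.Printf("%.0f MiB/s\n", upper/(1024*1024))
	}
	// Prints 1, 2, 4, ..., 512; faster queries land in the +Inf bucket.
}
```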