Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
## main / unreleased

* [ENHANCEMENT] Improved live store readiness check and added `readiness_target_lag` and `readiness_max_wait` config parameters. Live store will now - if `readiness_target_lag` is set - not report `/ready` until Kafka lag is brought under the specified value [#6238](https://github.com/grafana/tempo/pull/6238) (@oleg-kozlyuk-grafana)
* [CHANGE] Allow duplicate dimensions for span metrics and service graphs. This is a valid use case if using different instrumentation libraries, with spans having "deployment.environment" and others "deployment_environment", for example. [#6288](https://github.com/grafana/tempo/pull/6288) (@carles-grafana)
* [CHANGE] Update the default max duration for TraceQL metrics queries to one day [#6285](https://github.com/grafana/tempo/pull/6285) (@javiermolinar)
* [CHANGE] Enable TraceQL query metrics checks by default in Vulture [#6275](https://github.com/grafana/tempo/pull/6275) (@javiermolinar)
* [FEATURE] Add span_multiplier_key to overrides. This allows tenants to specify the attribute key used for span multiplier values to compensate for head-based sampling. [#6260](https://github.com/grafana/tempo/pull/6260) (@carles-grafana)
* [ENHANCEMENT] Double the maximum number of dedicated string columns in vParquet5 and update tempo-cli to determine the optimum number for the data [#6282](https://github.com/grafana/tempo/pull/6282) (@mdisibio)
* [ENHANCEMENT] Improved live store readiness check and added `readiness_target_lag` and `readiness_max_wait` config parameters. Live store will now - if `readiness_target_lag` is set - not report `/ready` until Kafka lag is brought under the specified value [#6238](https://github.com/grafana/tempo/pull/6238) (@oleg-kozlyuk-grafana)
* [ENHANCEMENT] Expose a new histogram metric to track the jobs per query distribution [#6343](https://github.com/grafana/tempo/pull/6343) (@javiermolinar)
* [BUGFIX] Correct avg_over_time calculation [#6252](https://github.com/grafana/tempo/pull/6252) (@ruslan-mikhailov)
* [BUGFIX] Correct instant query calculation for rate() [#6205](https://github.com/grafana/tempo/pull/6205) (@ruslan-mikhailov)
* [BUGFIX] Fix live-store deadlock occurring after a complete block failure [#6338](https://github.com/grafana/tempo/pull/6338) (@ruslan-mikhailov)
Expand Down
21 changes: 14 additions & 7 deletions modules/frontend/frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/grafana/dskit/middleware"
"github.com/grafana/dskit/user"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

"github.com/grafana/tempo/modules/frontend/combiner"
"github.com/grafana/tempo/modules/frontend/pipeline"
Expand Down Expand Up @@ -125,6 +126,12 @@ func New(cfg Config, next pipeline.RoundTripper, o overrides.Interface, reader t
return nil, fmt.Errorf("QueryBackendAfter (%v) must be greater than query end cutoff (%v)", cfg.Search.Sharder.QueryBackendAfter, cfg.QueryEndCutoff)
}

jobsPerQuery := promauto.With(registerer).NewHistogramVec(prometheus.HistogramOpts{
Name: "tempo_query_frontend_jobs_per_query",
Help: "Number of planned jobs per query in the query frontend.",
Buckets: prometheus.ExponentialBuckets(1, 10, 7),
}, []string{"op"})

// Propagate RF1After to search and traceByID sharders
cfg.Search.Sharder.RF1After = cfg.RF1After
cfg.TraceByID.RF1After = cfg.RF1After
Expand All @@ -147,7 +154,7 @@ func New(cfg Config, next pipeline.RoundTripper, o overrides.Interface, reader t
pipeline.NewWeightRequestWare(pipeline.TraceByID, cfg.Weights),
multiTenantMiddleware(cfg, logger),
tenantValidatorWare,
newAsyncTraceIDSharder(&cfg.TraceByID, logger),
newAsyncTraceIDSharder(&cfg.TraceByID, jobsPerQuery, logger),
},
[]pipeline.Middleware{traceIDStatusCodeWare, retryWare},
next)
Expand All @@ -161,7 +168,7 @@ func New(cfg Config, next pipeline.RoundTripper, o overrides.Interface, reader t
pipeline.NewWeightRequestWare(pipeline.TraceQLSearch, cfg.Weights),
multiTenantMiddleware(cfg, logger),
tenantValidatorWare,
newAsyncSearchSharder(reader, o, cfg.Search.Sharder, logger),
newAsyncSearchSharder(reader, o, cfg.Search.Sharder, jobsPerQuery, logger),
},
[]pipeline.Middleware{cacheWare, statusCodeWare, retryWare},
next)
Expand All @@ -174,7 +181,7 @@ func New(cfg Config, next pipeline.RoundTripper, o overrides.Interface, reader t
pipeline.NewWeightRequestWare(pipeline.Default, cfg.Weights),
multiTenantMiddleware(cfg, logger),
tenantValidatorWare,
newAsyncTagSharder(reader, o, cfg.Search.Sharder, parseTagsRequest, logger),
newAsyncTagSharder(reader, o, cfg.Search.Sharder, parseTagsRequest, jobsPerQuery, logger),
},
[]pipeline.Middleware{cacheWare, statusCodeWare, retryWare},
next)
Expand All @@ -187,7 +194,7 @@ func New(cfg Config, next pipeline.RoundTripper, o overrides.Interface, reader t
pipeline.NewWeightRequestWare(pipeline.Default, cfg.Weights),
multiTenantMiddleware(cfg, logger),
tenantValidatorWare,
newAsyncTagSharder(reader, o, cfg.Search.Sharder, parseTagValuesRequest, logger),
newAsyncTagSharder(reader, o, cfg.Search.Sharder, parseTagValuesRequest, jobsPerQuery, logger),
},
[]pipeline.Middleware{cacheWare, statusCodeWare, retryWare},
next)
Expand All @@ -200,7 +207,7 @@ func New(cfg Config, next pipeline.RoundTripper, o overrides.Interface, reader t
pipeline.NewWeightRequestWare(pipeline.Default, cfg.Weights),
multiTenantMiddleware(cfg, logger),
tenantValidatorWare,
newAsyncTagSharder(reader, o, cfg.Search.Sharder, parseTagValuesRequestV2, logger),
newAsyncTagSharder(reader, o, cfg.Search.Sharder, parseTagValuesRequestV2, jobsPerQuery, logger),
},
[]pipeline.Middleware{cacheWare, statusCodeWare, retryWare},
next)
Expand Down Expand Up @@ -228,7 +235,7 @@ func New(cfg Config, next pipeline.RoundTripper, o overrides.Interface, reader t
pipeline.NewWeightRequestWare(pipeline.TraceQLMetrics, cfg.Weights),
multiTenantMiddleware(cfg, logger),
tenantValidatorWare,
newAsyncQueryRangeSharder(reader, o, cfg.Metrics.Sharder, false, logger),
newAsyncQueryRangeSharder(reader, o, cfg.Metrics.Sharder, false, jobsPerQuery, logger),
},
[]pipeline.Middleware{cacheWare, statusCodeWare, retryWare},
next)
Expand All @@ -242,7 +249,7 @@ func New(cfg Config, next pipeline.RoundTripper, o overrides.Interface, reader t
pipeline.NewWeightRequestWare(pipeline.TraceQLMetrics, cfg.Weights),
multiTenantMiddleware(cfg, logger),
tenantValidatorWare,
newAsyncQueryRangeSharder(reader, o, cfg.Metrics.Sharder, true, logger),
newAsyncQueryRangeSharder(reader, o, cfg.Metrics.Sharder, true, jobsPerQuery, logger),
},
[]pipeline.Middleware{cacheWare, statusCodeWare, retryWare},
next)
Expand Down
32 changes: 19 additions & 13 deletions modules/frontend/metrics_query_range_sharder.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,21 @@ import (
"github.com/grafana/tempo/pkg/validation"
"github.com/grafana/tempo/tempodb"
"github.com/grafana/tempo/tempodb/backend"
"github.com/prometheus/client_golang/prometheus"
)

const (
defaultStreamingShards = 200
)

type queryRangeSharder struct {
next pipeline.AsyncRoundTripper[combiner.PipelineResponse]
reader tempodb.Reader
overrides overrides.Interface
cfg QueryRangeSharderConfig
logger log.Logger
instantMode bool
next pipeline.AsyncRoundTripper[combiner.PipelineResponse]
reader tempodb.Reader
overrides overrides.Interface
cfg QueryRangeSharderConfig
logger log.Logger
instantMode bool
jobsPerQuery *prometheus.HistogramVec
}

type QueryRangeSharderConfig struct {
Expand All @@ -52,15 +54,16 @@ type QueryRangeSharderConfig struct {
}

// newAsyncQueryRangeSharder creates a sharding middleware for search
func newAsyncQueryRangeSharder(reader tempodb.Reader, o overrides.Interface, cfg QueryRangeSharderConfig, instantMode bool, logger log.Logger) pipeline.AsyncMiddleware[combiner.PipelineResponse] {
func newAsyncQueryRangeSharder(reader tempodb.Reader, o overrides.Interface, cfg QueryRangeSharderConfig, instantMode bool, jobsPerQuery *prometheus.HistogramVec, logger log.Logger) pipeline.AsyncMiddleware[combiner.PipelineResponse] {
return pipeline.AsyncMiddlewareFunc[combiner.PipelineResponse](func(next pipeline.AsyncRoundTripper[combiner.PipelineResponse]) pipeline.AsyncRoundTripper[combiner.PipelineResponse] {
return queryRangeSharder{
next: next,
reader: reader,
overrides: o,
instantMode: instantMode,
cfg: cfg,
logger: logger,
next: next,
reader: reader,
overrides: o,
instantMode: instantMode,
cfg: cfg,
logger: logger,
jobsPerQuery: jobsPerQuery,
}
})
}
Expand Down Expand Up @@ -150,6 +153,9 @@ func (s queryRangeSharder) RoundTrip(pipelineRequest pipeline.Request) (pipeline
span.SetAttributes(attribute.Int64("totalJobs", int64(jobMetadata.TotalJobs)))
span.SetAttributes(attribute.Int64("totalBlocks", int64(jobMetadata.TotalBlocks)))
span.SetAttributes(attribute.Int64("totalBlockBytes", int64(jobMetadata.TotalBytes)))
if s.jobsPerQuery != nil {
s.jobsPerQuery.WithLabelValues(metricsOp).Observe(float64(jobMetadata.TotalJobs))
}

return pipeline.NewAsyncSharderChan(ctx, s.cfg.ConcurrentRequests, reqCh, pipeline.NewAsyncResponse(jobMetadata), s.next), nil
}
Expand Down
15 changes: 10 additions & 5 deletions modules/frontend/search_sharder.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/grafana/tempo/pkg/validation"
"github.com/grafana/tempo/tempodb"
"github.com/grafana/tempo/tempodb/backend"
"github.com/prometheus/client_golang/prometheus"
)

const (
Expand Down Expand Up @@ -51,20 +52,22 @@ type asyncSearchSharder struct {
reader tempodb.Reader
overrides overrides.Interface

cfg SearchSharderConfig
logger log.Logger
cfg SearchSharderConfig
logger log.Logger
jobsPerQuery *prometheus.HistogramVec
}

// newAsyncSearchSharder creates a sharding middleware for search
func newAsyncSearchSharder(reader tempodb.Reader, o overrides.Interface, cfg SearchSharderConfig, logger log.Logger) pipeline.AsyncMiddleware[combiner.PipelineResponse] {
func newAsyncSearchSharder(reader tempodb.Reader, o overrides.Interface, cfg SearchSharderConfig, jobsPerQuery *prometheus.HistogramVec, logger log.Logger) pipeline.AsyncMiddleware[combiner.PipelineResponse] {
return pipeline.AsyncMiddlewareFunc[combiner.PipelineResponse](func(next pipeline.AsyncRoundTripper[combiner.PipelineResponse]) pipeline.AsyncRoundTripper[combiner.PipelineResponse] {
return asyncSearchSharder{
next: next,
reader: reader,
overrides: o,

cfg: cfg,
logger: logger,
cfg: cfg,
logger: logger,
jobsPerQuery: jobsPerQuery,
}
})
}
Expand Down Expand Up @@ -125,6 +128,8 @@ func (s asyncSearchSharder) RoundTrip(pipelineRequest pipeline.Request) (pipelin
s.logger.Log("msg", "search: failed to build backend requests", "err", err)
})

s.jobsPerQuery.WithLabelValues(searchOp).Observe(float64(jobMetrics.TotalJobs))

// execute requests
return pipeline.NewAsyncSharderChan(ctx, s.cfg.ConcurrentRequests, reqCh, pipeline.NewAsyncResponse(jobMetrics), s.next), nil
}
Expand Down
18 changes: 13 additions & 5 deletions modules/frontend/search_sharder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -818,7 +818,7 @@ func TestTotalJobsIncludesIngester(t *testing.T) {
TargetBytesPerRequest: defaultTargetBytesPerRequest,
MostRecentShards: defaultMostRecentShards,
IngesterShards: 1,
}, log.NewNopLogger())
}, newJobsPerQueryHistogram(), log.NewNopLogger())
testRT := sharder.Wrap(next)

path := fmt.Sprintf("/?start=%d&end=%d", now-1, now+1)
Expand Down Expand Up @@ -864,7 +864,7 @@ func TestSearchSharderRoundTripBadRequest(t *testing.T) {
MostRecentShards: defaultMostRecentShards,
MaxDuration: 5 * time.Minute,
MaxSpansPerSpanSet: 100,
}, log.NewNopLogger())
}, newJobsPerQueryHistogram(), log.NewNopLogger())
testRT := sharder.Wrap(next)

// no org id
Expand Down Expand Up @@ -904,7 +904,7 @@ func TestSearchSharderRoundTripBadRequest(t *testing.T) {
TargetBytesPerRequest: defaultTargetBytesPerRequest,
MostRecentShards: defaultMostRecentShards,
MaxDuration: 5 * time.Minute,
}, log.NewNopLogger())
}, newJobsPerQueryHistogram(), log.NewNopLogger())
testRT = sharder.Wrap(next)

req = httptest.NewRequest("GET", "/?start=1000&end=1500", nil)
Expand Down Expand Up @@ -980,6 +980,14 @@ func TestMaxDuration(t *testing.T) {
assert.Equal(t, 10*time.Minute, actual)
}

// newJobsPerQueryHistogram returns a fresh, unregistered histogram vector
// that tests pass to the sharders in place of the query frontend's real
// jobs-per-query metric. It is labeled by "op" like the production metric.
func newJobsPerQueryHistogram() *prometheus.HistogramVec {
	opts := prometheus.HistogramOpts{
		Name:    "test_query_frontend_jobs_per_query",
		Help:    "Test histogram for jobs per query.",
		Buckets: prometheus.DefBuckets,
	}
	return prometheus.NewHistogramVec(opts, []string{"op"})
}

func TestHashTraceQLQuery(t *testing.T) {
// exact same queries should have the same hash
h1 := hashForSearchRequest(&tempopb.SearchRequest{Query: "{ span.foo = `bar` }"})
Expand Down Expand Up @@ -1238,7 +1246,7 @@ func TestSearchSharderReturnsConsistentShards(t *testing.T) {
MostRecentShards: mostRecentShards,
TargetBytesPerRequest: defaultTargetBytesPerRequest,
ConcurrentRequests: 5,
}, log.NewNopLogger())
}, newJobsPerQueryHistogram(), log.NewNopLogger())

// Create request with the test scenario time range
path := fmt.Sprintf("/?tags=service%%3Dapi&limit=100&start=%d&end=%d",
Expand Down Expand Up @@ -1383,7 +1391,7 @@ func TestDefaultSpansPerSpanSet(t *testing.T) {
TargetBytesPerRequest: defaultTargetBytesPerRequest,
DefaultSpansPerSpanSet: tc.configDefault,
MaxSpansPerSpanSet: tc.maxSpansPerSpanSet,
}, log.NewNopLogger())
}, newJobsPerQueryHistogram(), log.NewNopLogger())
testRT := sharder.Wrap(next)

// Build request URL
Expand Down
Loading