Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
* [CHANGE] Deprecate metrics-generator no-local-blocks [#6707](https://github.com/grafana/tempo/pull/6707) (@javiermolinar)
* [CHANGE] Own local block and partition ring helpers [#6808](https://github.com/grafana/tempo/pull/6808) (@javiermolinar)
* [CHANGE] Track invalid trace and span id discards [#6799](https://github.com/grafana/tempo/pull/6799) (@javiermolinar)
* [CHANGE] Deprecate `query_frontend.rf1_after` and query all blocks regardless of replication factor for non-metrics paths. Simplifies 2.x to 3.0 migration. [#6969](https://github.com/grafana/tempo/pull/6969) (@mapno)
* [FEATURE] Add automemlimit support for automatic GOMEMLIMIT configuration. Enable with `memory.automemlimit_enabled: true`. [#6313](https://github.com/grafana/tempo/pull/6313) (@oleg-kozlyuk)
* [FEATURE] Support comparison operators in TraceQL Metrics queries [#6474](https://github.com/grafana/tempo/pull/6474) (@ruslan-mikhailov)
* [FEATURE] Add new include_any filter policy for spanmetrics filter [#6392](https://github.com/grafana/tempo/pull/6392) (@javiermolinar)
Expand Down
7 changes: 0 additions & 7 deletions cmd/tempo-vulture/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"time"

"github.com/go-test/deep"
"github.com/grafana/tempo/pkg/api"
zaplogfmt "github.com/jsternberg/zap-logfmt"
"github.com/prometheus/client_golang/prometheus/promhttp"
"go.uber.org/zap"
Expand Down Expand Up @@ -45,7 +44,6 @@ var (
tempoRecentTracesCutoffDuration time.Duration
tempoPushTLS bool

rf1After time.Time
tempoQueryLiveStores bool
logger *zap.Logger

Expand Down Expand Up @@ -134,7 +132,6 @@ func init() {
flag.IntVar(&validationCycles, "validation-cycles", 3, "Number of write/read cycles to perform in validation mode")
flag.DurationVar(&validationTimeout, "validation-timeout", 5*time.Minute, "Maximum time to run validation mode before timing out")

flag.Var(newTimeVar(&rf1After), "rhythm-rf1-after", "Timestamp (RFC3339) after which only blocks with RF==1 are included in search and ID lookups")
flag.BoolVar(&tempoQueryLiveStores, "tempo-query-livestore", false, "When to query live stores")
}

Expand Down Expand Up @@ -174,10 +171,6 @@ func main() {
panic(err)
}

if !rf1After.IsZero() {
httpClient.SetQueryParam(api.URLParamRF1After, rf1After.Format(time.RFC3339))
}

tickerWrite, tickerRead, tickerSearch, tickerMetrics, err := initTickers(
vultureConfig.tempoWriteBackoffDuration,
vultureConfig.tempoReadBackoffDuration,
Expand Down
7 changes: 7 additions & 0 deletions cmd/tempo/app/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,13 @@
warnings = append(warnings, warnPartitionAssigmentCollision)
}

if !c.Frontend.RF1After.IsZero() {
warnings = append(warnings, ConfigWarning{
Message: "query_frontend.rf1_after is deprecated and will be removed in a future release.",
Comment thread
mattdurham marked this conversation as resolved.
Explain: "Non-metric query paths now query all blocks regardless of replication factor. This setting is ignored.",
})
}

Check notice on line 247 in cmd/tempo/app/config.go

View workflow job for this annotation

GitHub Actions / Coverage Annotations

Uncovered lines

Lines 243-247 are not covered by tests

return warnings
}

Expand Down
1 change: 0 additions & 1 deletion docs/sources/tempo/configuration/manifest.md
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,6 @@ query_frontend:
mcp_server:
enabled: false
max_query_expression_size_bytes: 131072
Comment thread
mapno marked this conversation as resolved.
rf1_after: 0001-01-01T00:00:00Z
metrics_generator:
ring:
kvstore:
Expand Down
1 change: 0 additions & 1 deletion example/docker-compose/debug/tempo.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ metrics_generator:
send_exemplars: true

query_frontend:
rf1_after: "1999-01-01T00:00:00Z"
mcp_server:
enabled: true

Expand Down
1 change: 0 additions & 1 deletion example/docker-compose/distributed/tempo.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ metrics_generator:
send_exemplars: true

query_frontend:
rf1_after: "1999-01-01T00:00:00Z"
mcp_server:
enabled: true

Expand Down
1 change: 0 additions & 1 deletion example/docker-compose/multitenant/tempo.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ querier:
metrics_generator:

query_frontend:
rf1_after: "1999-01-01T00:00:00Z"
mcp_server:
enabled: true

Expand Down
1 change: 0 additions & 1 deletion example/docker-compose/single-binary/tempo.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ metrics_generator:
send_exemplars: true

query_frontend:
rf1_after: "1999-01-01T00:00:00Z"
mcp_server:
enabled: true

Expand Down
3 changes: 0 additions & 3 deletions integration/util/config-base.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,6 @@ overrides:
metrics_generator:
processors: [service-graphs, span-metrics]

query_frontend:
rf1_after: "2025-01-01T00:00:00Z"

distributor:
receivers:
otlp:
Expand Down
6 changes: 1 addition & 5 deletions modules/frontend/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type Config struct {
AllowedHeaders []string `yaml:"allowed_headers,omitempty"`

// RF1After specifies the time after which RF1 logic is applied.
Copy link

Copilot AI Apr 15, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The field-level comment for RF1After still says it "specifies the time after which RF1 logic is applied", but the same line now marks it as deprecated/ignored. Can we update the comment to avoid implying it still affects behavior (and maybe point users to the new behavior: non-metrics query paths ignore RF entirely)?

Suggested change
// RF1After specifies the time after which RF1 logic is applied.
// RF1After is deprecated and ignored. Non-metrics query paths ignore RF entirely.

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's marked as deprecated just below.

RF1After time.Time `yaml:"rf1_after" category:"advanced"`
RF1After time.Time `yaml:"rf1_after,omitempty" category:"advanced"` // Deprecated: it's ignored

// QueryEndCutoff prevents querying incomplete recent data.
QueryEndCutoff time.Duration `yaml:"query_end_cutoff,omitempty"`
Expand All @@ -63,10 +63,6 @@ type TraceByIDConfig struct {
ConcurrentShards int `yaml:"concurrent_shards,omitempty"`
SLO SLOConfig `yaml:",inline"`
ExternalEnabled bool `yaml:"external_enabled,omitempty"`

// RF1After specifies the time after which RF1 logic is applied, injected by the configuration
// or determined at runtime based on search request parameters.
RF1After time.Time `yaml:"-"`
}

type MetricsConfig struct {
Expand Down
6 changes: 1 addition & 5 deletions modules/frontend/frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,10 +128,6 @@ func New(cfg Config, next pipeline.RoundTripper, o overrides.Interface, reader t
NativeHistogramMinResetDuration: 1 * time.Hour,
}, []string{"op"})

// Propagate RF1After to search and traceByID sharders
cfg.Search.Sharder.RF1After = cfg.RF1After
cfg.TraceByID.RF1After = cfg.RF1After

adjustEndWareSeconds := pipeline.NewAdjustStartEndWare(cfg.Search.Sharder.QueryBackendAfter, cfg.QueryEndCutoff, false)
adjustEndWareNanos := pipeline.NewAdjustStartEndWare(cfg.Metrics.Sharder.QueryBackendAfter, cfg.QueryEndCutoff, true) // metrics queries work in nanoseconds
retryWare := pipeline.NewRetryWare(cfg.MaxRetries, cfg.Weights.RetryWithWeights, registerer)
Expand Down Expand Up @@ -380,7 +376,7 @@ func blockMetasForSearch(allBlocks []*backend.BlockMeta, start, end time.Time, f
// block start is before or equal to search end AND block end is after or equal to search start
if !m.StartTime.After(end) && // block start <= search end
!m.EndTime.Before(start) && // block end >= search start
filterFn(m) { // This check skips generator blocks (RF=1)
filterFn(m) {
blocks = append(blocks, m)
}
}
Expand Down
28 changes: 14 additions & 14 deletions modules/frontend/search_handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ func runnerRequests(t *testing.T, f *QueryFrontend) {
expectedErr error
}{
{
name: "access 2 blocks x 2 jobs = 4",
name: "access 4 blocks x 2 jobs = 8",
request: &tempopb.SearchRequest{
Query: "{resource.service.name = `test`}",
Start: 1,
Expand All @@ -180,12 +180,12 @@ func runnerRequests(t *testing.T, f *QueryFrontend) {
StartTimeUnixNano: math.MaxUint64,
}},
Metrics: &tempopb.SearchMetrics{
InspectedTraces: 4,
InspectedBytes: 4,
TotalBlocks: 2,
TotalJobs: 4,
TotalBlockBytes: 4 * defaultTargetBytesPerRequest,
CompletedJobs: 4,
InspectedTraces: 8,
InspectedBytes: 8,
TotalBlocks: 4,
TotalJobs: 8,
TotalBlockBytes: 8 * defaultTargetBytesPerRequest,
CompletedJobs: 8,
},
},
},
Expand All @@ -202,7 +202,7 @@ func runnerRequests(t *testing.T, f *QueryFrontend) {
expectedErr: status.Error(codes.InvalidArgument, "invalid TraceQL query: parse error at line 1, col 1: syntax error: unexpected IDENTIFIER"),
},
{
name: "multitenant - 4 jobs x 2 tenants = 8",
name: "multitenant - 8 jobs x 2 tenants = 16",
tenant: "tenant-1|tenant-2",
request: &tempopb.SearchRequest{
Query: "{resource.service.name = `test`}",
Expand All @@ -218,12 +218,12 @@ func runnerRequests(t *testing.T, f *QueryFrontend) {
StartTimeUnixNano: math.MaxUint64,
}},
Metrics: &tempopb.SearchMetrics{
InspectedTraces: 8,
InspectedBytes: 8,
TotalBlocks: 4,
TotalJobs: 8,
TotalBlockBytes: 8 * defaultTargetBytesPerRequest,
CompletedJobs: 8,
InspectedTraces: 16,
InspectedBytes: 16,
TotalBlocks: 8,
TotalJobs: 16,
TotalBlockBytes: 16 * defaultTargetBytesPerRequest,
CompletedJobs: 16,
},
},
},
Expand Down
12 changes: 1 addition & 11 deletions modules/frontend/search_sharder.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,6 @@ type SearchSharderConfig struct {
MostRecentShards int `yaml:"most_recent_shards,omitempty"`
DefaultSpansPerSpanSet uint32 `yaml:"default_spans_per_span_set,omitempty"`
MaxSpansPerSpanSet uint32 `yaml:"max_spans_per_span_set,omitempty"`

// RF1After specifies the time after which RF1 logic is applied, injected by the configuration
// or determined at runtime based on search request parameters.
RF1After time.Time `yaml:"-"`
}

type asyncSearchSharder struct {
Expand Down Expand Up @@ -154,13 +150,7 @@ func (s *asyncSearchSharder) backendRequests(ctx context.Context, tenantID strin
startT := time.Unix(int64(start), 0)
endT := time.Unix(int64(end), 0)

// Use RF1After from the request if it's not zero, otherwise use the config value
rf1After := searchReq.RF1After
if rf1After.IsZero() {
rf1After = s.cfg.RF1After
}

blocks := blockMetasForSearch(s.reader.BlockMetas(tenantID), startT, endT, rf1FilterFn(rf1After))
blocks := blockMetasForSearch(s.reader.BlockMetas(tenantID), startT, endT, acceptAllBlocks)

// calculate metrics to return to the caller
resp.TotalBlocks = len(blocks)
Expand Down
32 changes: 0 additions & 32 deletions modules/frontend/search_sharder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1156,38 +1156,6 @@ func TestBackendShards(t *testing.T) {
}
}

// TestRF1After verifies that backendRequests, using the filter produced by
// rf1FilterFn, selects only blocks consistent with the rf1After cutoff parsed
// from the request query string: RF3 (default) blocks that started before the
// cutoff and RF1 (metrics-generator) blocks that started after it.
func TestRF1After(t *testing.T) {
	// Define a set of block metadata with different replication factors and time ranges
	blockMetas := []*backend.BlockMeta{
		{StartTime: time.Unix(100, 0), EndTime: time.Unix(200, 0), ReplicationFactor: backend.DefaultReplicationFactor},
		{StartTime: time.Unix(100, 0), EndTime: time.Unix(200, 0), ReplicationFactor: backend.MetricsGeneratorReplicationFactor},
		{StartTime: time.Unix(200, 0), EndTime: time.Unix(300, 0), ReplicationFactor: backend.DefaultReplicationFactor},
		{StartTime: time.Unix(200, 0), EndTime: time.Unix(300, 0), ReplicationFactor: backend.MetricsGeneratorReplicationFactor},
	}

	// Create a request for processing, including a query string that specifies `rf1After` as a filter.
	// rf1After=1970-01-01T00:01:30Z is t=90s: RF3 blocks are kept only if they start before it
	// (neither does: both start at t>=100), RF1 blocks only if they start after it (both do),
	// so exactly 2 of the 4 blocks should survive filtering.
	r := httptest.NewRequest("GET", "/?q={}&rf1After=1970-01-01T00:01:30Z&bar&limit=50&start=50&end=300", nil)
	searchReq, err := api.ParseSearchRequest(r)
	require.NoError(t, err)

	ctx, cancelCause := context.WithCancelCause(context.Background())
	pipelineRequest := pipeline.NewHTTPRequest(r)

	// Initialize the search sharder with mock metadata for testing
	s := &asyncSearchSharder{
		cfg: SearchSharderConfig{
			MostRecentShards: defaultMostRecentShards,
		},
	}
	s.reader = &mockReader{metas: blockMetas}

	// Execute backend requests and validate the result.
	// NOTE(review): the jobs channel passed in is unbuffered and never drained here;
	// presumably the block-counting path populates searchJobResponse before any send —
	// confirm backendRequests cannot block on this channel in this configuration.
	searchJobResponse := &combiner.SearchJobResponse{}
	s.backendRequests(ctx, "test", pipelineRequest, searchReq, searchJobResponse, make(chan pipeline.Request), cancelCause)

	require.Equal(t, 2, searchJobResponse.TotalBlocks) // Verify the expected number of blocks after filtering
}

func TestSearchSharderReturnsConsistentShards(t *testing.T) {
now := time.Now()

Expand Down
13 changes: 1 addition & 12 deletions modules/frontend/tag_sharder.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,6 @@ func (r *tagsSearchRequest) keyPrefix() string {
return cacheKeyPrefixSearchTag
}

func (r *tagsSearchRequest) rf1After() time.Time { return r.request.RF1After }

func (r *tagsSearchRequest) newWithRange(start, end uint32) tagSearchReq {
newReq := r.request
newReq.Start = start
Expand Down Expand Up @@ -96,8 +94,6 @@ func (r *tagValueSearchRequest) keyPrefix() string {
return cacheKeyPrefixSearchTagValues
}

func (r *tagValueSearchRequest) rf1After() time.Time { return r.request.RF1After }

func (r *tagValueSearchRequest) newWithRange(start, end uint32) tagSearchReq {
newReq := r.request
newReq.Start = start
Expand Down Expand Up @@ -170,8 +166,6 @@ type tagSearchReq interface {
// should only be based on the content the request is searching for
hash() uint64
keyPrefix() string

rf1After() time.Time
}

type searchTagSharder struct {
Expand Down Expand Up @@ -273,15 +267,10 @@ func (s searchTagSharder) backendRequests(ctx context.Context, tenantID string,
return 0
}

rf1After := searchReq.rf1After()
if rf1After.IsZero() {
rf1After = s.cfg.RF1After
}

// get block metadata of blocks in start, end duration
startT := time.Unix(int64(start), 0)
endT := time.Unix(int64(end), 0)
blocks := blockMetasForSearch(s.reader.BlockMetas(tenantID), startT, endT, rf1FilterFn(rf1After))
blocks := blockMetasForSearch(s.reader.BlockMetas(tenantID), startT, endT, acceptAllBlocks)

targetBytesPerRequest := s.cfg.TargetBytesPerRequest
// the callback function is nil, so we will use it just for counting the total number of jobs
Expand Down
2 changes: 0 additions & 2 deletions modules/frontend/tag_sharder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,6 @@ func (r *fakeReq) keyPrefix() string {
return ""
}

func (r *fakeReq) rf1After() time.Time { return time.Time{} }

func (r *fakeReq) buildSearchTagRequest(subR *http.Request) (*http.Request, error) {
newReq := subR.Clone(subR.Context())
q := subR.URL.Query()
Expand Down
4 changes: 2 additions & 2 deletions modules/frontend/traceid_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func newTraceIDHandler(cfg Config, next pipeline.AsyncRoundTripper[combiner.Pipe
}

// validate start and end parameter
_, _, _, _, _, _, reqErr := api.ValidateAndSanitizeRequest(req)
_, _, _, _, _, reqErr := api.ValidateAndSanitizeRequest(req)
if reqErr != nil {
return httpInvalidRequest(reqErr), nil
}
Expand Down Expand Up @@ -101,7 +101,7 @@ func newTraceIDV2Handler(cfg Config, next pipeline.AsyncRoundTripper[combiner.Pi
}

// validate start and end parameter
_, _, _, _, _, _, reqErr := api.ValidateAndSanitizeRequest(req)
_, _, _, _, _, reqErr := api.ValidateAndSanitizeRequest(req)
if reqErr != nil {
return httpInvalidRequest(reqErr), nil
}
Expand Down
9 changes: 0 additions & 9 deletions modules/frontend/traceid_sharder.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package frontend
import (
"encoding/hex"
"net/http"
"time"

"github.com/go-kit/log" //nolint:all //deprecated
"github.com/grafana/tempo/modules/frontend/combiner"
Expand Down Expand Up @@ -100,13 +99,6 @@ func (s *asyncTraceSharder) buildShardedRequests(parent pipeline.Request) ([]pip
}
reqs = append(reqs, req)

var rf1After string
if val := parent.HTTPRequest().URL.Query().Get(api.URLParamRF1After); val != "" {
rf1After = val
} else if !s.cfg.RF1After.IsZero() {
rf1After = s.cfg.RF1After.Format(time.RFC3339)
}

// Job 1: external job (if enabled)
if s.cfg.ExternalEnabled {
req, err = cloneRequestforQueriers(parent, userID, func(r *http.Request) (*http.Request, error) {
Expand All @@ -130,7 +122,6 @@ func (s *asyncTraceSharder) buildShardedRequests(parent pipeline.Request) ([]pip
params[querier.BlockStartKey] = hex.EncodeToString(s.blockBoundaries[i-1])
params[querier.BlockEndKey] = hex.EncodeToString(s.blockBoundaries[i])
params[querier.QueryModeKey] = querier.QueryModeBlocks
params[api.URLParamRF1After] = rf1After

return api.BuildQueryRequest(r, params), nil
})
Expand Down
12 changes: 1 addition & 11 deletions modules/frontend/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"io"
"net/http"
"strings"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
Expand All @@ -26,13 +25,4 @@ func extractTenant(req *http.Request, logger log.Logger) (string, *http.Response
return tenant, nil
}

// rf1FilterFn builds a block-metadata predicate keyed on the rf1After cutoff.
//
// With a zero cutoff, only blocks at the default replication factor (RF3) are
// accepted. With a non-zero cutoff, RF3 blocks are accepted when they started
// before the cutoff and metrics-generator (RF1) blocks when they started after
// it; blocks at any other replication factor are rejected.
func rf1FilterFn(rf1After time.Time) func(m *backend.BlockMeta) bool {
	if rf1After.IsZero() {
		// No cutoff configured: accept RF3 blocks only.
		return func(m *backend.BlockMeta) bool {
			return m.ReplicationFactor == backend.DefaultReplicationFactor
		}
	}

	return func(m *backend.BlockMeta) bool {
		switch m.ReplicationFactor {
		case backend.DefaultReplicationFactor:
			return m.StartTime.Before(rf1After)
		case backend.MetricsGeneratorReplicationFactor:
			return m.StartTime.After(rf1After)
		default:
			return false
		}
	}
}
func acceptAllBlocks(_ *backend.BlockMeta) bool { return true }
Loading
Loading