Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
* [ENHANCEMENT] Do deep validation for filter policies in user configurable overrides API [#6407](https://github.com/grafana/tempo/pull/6407) (@electron0zero)
* [ENHANCEMENT] Allow span_name_sanitization to be set via user-configurable overrides API [#6411](https://github.com/grafana/tempo/pull/6411) (@Logiraptor)
* [ENHANCEMENT] Add `fail_on_high_lag` parameter to allow live-store to fail if it is lagged [#6363](https://github.com/grafana/tempo/pull/6363) (@ruslan-mikhailov)
* [ENHANCEMENT] Add support for per-tenant left-padding of trace IDs [#6489](https://github.com/grafana/tempo/pull/6489) (@mapno)
* [BUGFIX] Force live-store to rehydrate from Kafka lookback period when local data is missing (e.g. PVC wipe, new node) instead of resuming from the committed consumer group offset [#6428](https://github.com/grafana/tempo/pull/6428) (@oleg-kozlyuk-grafana)
* [ENHANCEMENT] Add new metric for generator ring size: `tempo_distributor_metrics_generator_tenant_ring_size` [#5686](https://github.com/grafana/tempo/pull/5686) (@zalegrala)
* [BUGFIX] Fix query-frontend unable to convert dedicated column blob option [#6377](https://github.com/grafana/tempo/pull/6377) (@stoewer)
Expand Down
5 changes: 5 additions & 0 deletions docs/sources/tempo/configuration/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -1910,6 +1910,11 @@ overrides:
# in the front-end configuration is used.
[max_metrics_duration: <duration> | default = 0s]

# Per-user option to left-pad trace IDs with zeros to 32 hex characters in search API responses.
# When enabled, trace IDs like "8efff798038103d269b633813fc703" will be returned as
# "008efff798038103d269b633813fc703" to comply with the OpenTelemetry and W3C Trace Context specifications.
[left_pad_trace_ids: <bool> | default = false]

# Compaction related overrides
compaction:
# Per-user block retention. If this value is set to 0 (default),
Expand Down
20 changes: 17 additions & 3 deletions modules/frontend/combiner/search.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/grafana/tempo/pkg/search"
"github.com/grafana/tempo/pkg/tempopb"
"github.com/grafana/tempo/pkg/traceql"
"github.com/grafana/tempo/pkg/util"
)

// SearchJobResponse wraps shardtracker.JobMetadata and implements PipelineResponse.
Expand All @@ -32,7 +33,7 @@ var _ PipelineResponse = (*SearchJobResponse)(nil)
var _ GRPCCombiner[*tempopb.SearchResponse] = (*genericCombiner[*tempopb.SearchResponse])(nil)

// NewSearch returns a search combiner
func NewSearch(limit int, keepMostRecent bool, marshalingFormat api.MarshallingFormat) Combiner {
func NewSearch(limit int, keepMostRecent bool, marshalingFormat api.MarshallingFormat, padTraceIDs bool) Combiner {
metadataCombiner := traceql.NewMetadataCombiner(limit, keepMostRecent)
diffTraces := map[string]struct{}{}
completedThroughTracker := &shardtracker.CompletionTracker{}
Expand Down Expand Up @@ -80,6 +81,9 @@ func NewSearch(limit int, keepMostRecent bool, marshalingFormat api.MarshallingF
final.Traces = metadataCombiner.Metadata()
final.Metrics = metricsCombiner.Metrics
addRootSpanNotReceivedText(final.Traces)
if padTraceIDs {
padTraceIDsInResponse(final.Traces)
}
return final, nil
},
diff: func(current *tempopb.SearchResponse) (*tempopb.SearchResponse, error) {
Expand Down Expand Up @@ -113,6 +117,9 @@ func NewSearch(limit int, keepMostRecent bool, marshalingFormat api.MarshallingF
}

addRootSpanNotReceivedText(diff.Traces)
if padTraceIDs {
padTraceIDsInResponse(diff.Traces)
}

return diff, nil
},
Expand All @@ -138,6 +145,13 @@ func addRootSpanNotReceivedText(results []*tempopb.TraceSearchMetadata) {
}
}

func NewTypedSearch(limit int, keepMostRecent bool, marshalingFormat api.MarshallingFormat) GRPCCombiner[*tempopb.SearchResponse] {
return NewSearch(limit, keepMostRecent, marshalingFormat).(GRPCCombiner[*tempopb.SearchResponse])
func NewTypedSearch(limit int, keepMostRecent bool, marshalingFormat api.MarshallingFormat, padTraceIDs bool) GRPCCombiner[*tempopb.SearchResponse] {
return NewSearch(limit, keepMostRecent, marshalingFormat, padTraceIDs).(GRPCCombiner[*tempopb.SearchResponse])
}

// padTraceIDsInResponse rewrites each trace ID in the given search metadata,
// left-padding it with zeros to the full 32 hex characters expected by the
// W3C Trace Context / OpenTelemetry trace-id format.
func padTraceIDsInResponse(traces []*tempopb.TraceSearchMetadata) {
	for i := range traces {
		traces[i].TraceID = util.PadTraceIDString(traces[i].TraceID)
	}
}
112 changes: 97 additions & 15 deletions modules/frontend/combiner/search_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,33 +26,33 @@ func TestSearchProgressShouldQuitAnyProtobuf(t *testing.T) {

func testSearchProgressShouldQuitAny(t *testing.T, marshalingFormat api.MarshallingFormat) {
// new combiner should not quit
c := NewSearch(0, false, marshalingFormat)
c := NewSearch(0, false, marshalingFormat, false)
should := c.ShouldQuit()
require.False(t, should)

// 500 response should quit
c = NewSearch(0, false, marshalingFormat)
c = NewSearch(0, false, marshalingFormat, false)
err := c.AddResponse(toHTTPResponseWithFormat(t, &tempopb.SearchResponse{}, 500, nil, marshalingFormat))
require.NoError(t, err)
should = c.ShouldQuit()
require.True(t, should)

// 429 response should quit
c = NewSearch(0, false, marshalingFormat)
c = NewSearch(0, false, marshalingFormat, false)
err = c.AddResponse(toHTTPResponseWithFormat(t, &tempopb.SearchResponse{}, 429, nil, marshalingFormat))
require.NoError(t, err)
should = c.ShouldQuit()
require.True(t, should)

// unparseable body should not quit, but should return an error
c = NewSearch(0, false, marshalingFormat)
c = NewSearch(0, false, marshalingFormat, false)
err = c.AddResponse(&testPipelineResponse{r: &http.Response{Body: io.NopCloser(strings.NewReader("foo")), StatusCode: 200}})
require.Error(t, err)
should = c.ShouldQuit()
require.False(t, should)

// under limit should not quit
c = NewSearch(2, false, marshalingFormat)
c = NewSearch(2, false, marshalingFormat, false)
err = c.AddResponse(toHTTPResponseWithFormat(t, &tempopb.SearchResponse{
Traces: []*tempopb.TraceSearchMetadata{
{
Expand All @@ -65,7 +65,7 @@ func testSearchProgressShouldQuitAny(t *testing.T, marshalingFormat api.Marshall
require.False(t, should)

// over limit should quit
c = NewSearch(1, false, marshalingFormat)
c = NewSearch(1, false, marshalingFormat, false)
err = c.AddResponse(toHTTPResponseWithFormat(t, &tempopb.SearchResponse{
Traces: []*tempopb.TraceSearchMetadata{
{
Expand All @@ -91,33 +91,33 @@ func TestSearchProgressShouldQuitMostRecentProtobuf(t *testing.T) {

func testSearchProgressShouldQuitMostRecent(t *testing.T, marshalingFormat api.MarshallingFormat) {
// new combiner should not quit
c := NewSearch(0, true, marshalingFormat)
c := NewSearch(0, true, marshalingFormat, false)
should := c.ShouldQuit()
require.False(t, should)

// 500 response should quit
c = NewSearch(0, true, marshalingFormat)
c = NewSearch(0, true, marshalingFormat, false)
err := c.AddResponse(toHTTPResponseWithFormat(t, &tempopb.SearchResponse{}, 500, nil, marshalingFormat))
require.NoError(t, err)
should = c.ShouldQuit()
require.True(t, should)

// 429 response should quit
c = NewSearch(0, true, marshalingFormat)
c = NewSearch(0, true, marshalingFormat, false)
err = c.AddResponse(toHTTPResponseWithFormat(t, &tempopb.SearchResponse{}, 429, nil, marshalingFormat))
require.NoError(t, err)
should = c.ShouldQuit()
require.True(t, should)

// unparseable body should not quit, but should return an error
c = NewSearch(0, true, marshalingFormat)
c = NewSearch(0, true, marshalingFormat, false)
err = c.AddResponse(&testPipelineResponse{r: &http.Response{Body: io.NopCloser(strings.NewReader("foo")), StatusCode: 200}})
require.Error(t, err)
should = c.ShouldQuit()
require.False(t, should)

// under limit should not quit
c = NewSearch(2, true, marshalingFormat)
c = NewSearch(2, true, marshalingFormat, false)
err = c.AddResponse(toHTTPResponseWithFormat(t, &tempopb.SearchResponse{
Traces: []*tempopb.TraceSearchMetadata{
{
Expand All @@ -130,7 +130,7 @@ func testSearchProgressShouldQuitMostRecent(t *testing.T, marshalingFormat api.M
require.False(t, should)

// over limit but no search job response, should not quit
c = NewSearch(1, true, marshalingFormat)
c = NewSearch(1, true, marshalingFormat, false)
err = c.AddResponse(toHTTPResponseWithFormat(t, &tempopb.SearchResponse{
Traces: []*tempopb.TraceSearchMetadata{
{
Expand Down Expand Up @@ -198,7 +198,7 @@ func testSearchCombinesResults(t *testing.T, marshalingFormat api.MarshallingFor
start := time.Date(1, 2, 3, 4, 5, 6, 7, time.UTC)
traceID := "traceID"

c := NewSearch(10, keepMostRecent, marshalingFormat)
c := NewSearch(10, keepMostRecent, marshalingFormat, false)
sr := toHTTPResponseWithFormat(t, &tempopb.SearchResponse{
Traces: []*tempopb.TraceSearchMetadata{
{
Expand Down Expand Up @@ -420,7 +420,7 @@ func testSearchResponseCombiner(t *testing.T, marshalingFormat api.MarshallingFo

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
combiner := NewTypedSearch(20, keepMostRecent, marshalingFormat)
combiner := NewTypedSearch(20, keepMostRecent, marshalingFormat, false)

err := combiner.AddResponse(tc.response1)
require.NoError(t, err)
Expand Down Expand Up @@ -691,7 +691,7 @@ func testCombinerShards(t *testing.T, marshalingFormat api.MarshallingFormat) {

// apply tests one at a time to the combiner and check expected results

combiner := NewTypedSearch(5, true, marshalingFormat)
combiner := NewTypedSearch(5, true, marshalingFormat, false)
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
if tc.pipelineResponse != nil {
Expand All @@ -705,3 +705,85 @@ func testCombinerShards(t *testing.T, marshalingFormat api.MarshallingFormat) {
})
}
}

// TestSearchCombinerPadTraceIDs verifies that the search combiner left-pads
// trace IDs to 32 hex characters when padding is enabled and leaves them
// untouched when it is disabled, across both marshalling formats and both
// the final and diff result paths.
func TestSearchCombinerPadTraceIDs(t *testing.T) {
	testCases := []struct {
		name     string
		pad      bool
		format   api.MarshallingFormat
		inputIDs []string
		wantIDs  []string
		viaDiff  bool
	}{
		{
			name:     "padding enabled with JSON format",
			pad:      true,
			format:   api.MarshallingFormatJSON,
			inputIDs: []string{"8efff798038103d269b633813fc703"},
			wantIDs:  []string{"008efff798038103d269b633813fc703"},
		},
		{
			name:     "padding enabled with JSON format - 64-bit trace ID",
			pad:      true,
			format:   api.MarshallingFormatJSON,
			inputIDs: []string{"1234567890abcdef"},
			wantIDs:  []string{"00000000000000001234567890abcdef"},
		},
		{
			name:     "padding disabled with JSON format",
			pad:      false,
			format:   api.MarshallingFormatJSON,
			inputIDs: []string{"8efff798038103d269b633813fc703"},
			wantIDs:  []string{"8efff798038103d269b633813fc703"},
		},
		{
			name:     "padding enabled with protobuf format",
			pad:      true,
			format:   api.MarshallingFormatProtobuf,
			inputIDs: []string{"8efff798038103d269b633813fc703"},
			wantIDs:  []string{"008efff798038103d269b633813fc703"},
		},
		{
			name:     "padding enabled - already 32 chars",
			pad:      true,
			format:   api.MarshallingFormatJSON,
			inputIDs: []string{"1234567890abcdef1234567890abcdef"},
			wantIDs:  []string{"1234567890abcdef1234567890abcdef"},
		},
		{
			name:     "padding enabled via diff",
			pad:      true,
			format:   api.MarshallingFormatJSON,
			inputIDs: []string{"8efff798038103d269b633813fc703"},
			wantIDs:  []string{"008efff798038103d269b633813fc703"},
			viaDiff:  true,
		},
	}

	for _, tt := range testCases {
		t.Run(tt.name, func(t *testing.T) {
			// Build the upstream search response carrying the raw trace IDs.
			input := &tempopb.SearchResponse{}
			for _, id := range tt.inputIDs {
				input.Traces = append(input.Traces, &tempopb.TraceSearchMetadata{TraceID: id})
			}

			combiner := NewTypedSearch(10, false, tt.format, tt.pad)
			require.NoError(t, combiner.AddResponse(toHTTPResponseWithFormat(t, input, 200, nil, tt.format)))

			// Exercise whichever result path the case targets; padding must
			// be applied on both.
			var (
				resp *tempopb.SearchResponse
				err  error
			)
			if tt.viaDiff {
				resp, err = combiner.GRPCDiff()
			} else {
				resp, err = combiner.GRPCFinal()
			}
			require.NoError(t, err)
			require.NotNil(t, resp)
			require.Len(t, resp.Traces, len(tt.wantIDs))

			for i, want := range tt.wantIDs {
				require.Equal(t, want, resp.Traces[i].TraceID)
			}
		})
	}
}
4 changes: 2 additions & 2 deletions modules/frontend/frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ func New(cfg Config, next pipeline.RoundTripper, o overrides.Interface, reader t

traces := newTraceIDHandler(cfg, tracePipeline, o, combiner.NewTypedTraceByID, logger, dataAccessController)
tracesV2 := newTraceIDV2Handler(cfg, tracePipeline, o, combiner.NewTypedTraceByIDV2, logger, dataAccessController)
search := newSearchHTTPHandler(cfg, searchPipeline, logger, dataAccessController)
search := newSearchHTTPHandler(cfg, searchPipeline, o, logger, dataAccessController)
searchTags := newTagsHTTPHandler(cfg, searchTagsPipeline, o, logger, dataAccessController)
searchTagsV2 := newTagsV2HTTPHandler(cfg, searchTagsPipeline, o, logger, dataAccessController)
searchTagValues := newTagValuesHTTPHandler(cfg, searchTagValuesPipeline, o, logger, dataAccessController)
Expand All @@ -281,7 +281,7 @@ func New(cfg Config, next pipeline.RoundTripper, o overrides.Interface, reader t
MetricsQueryRangeHandler: newHandler(cfg.Config.LogQueryRequestHeaders, queryRange, logger),

// grpc/streaming
streamingSearch: newSearchStreamingGRPCHandler(cfg, searchPipeline, apiPrefix, logger, dataAccessController),
streamingSearch: newSearchStreamingGRPCHandler(cfg, searchPipeline, apiPrefix, o, logger, dataAccessController),
streamingTags: newTagsStreamingGRPCHandler(cfg, searchTagsPipeline, apiPrefix, o, logger, dataAccessController),
streamingTagsV2: newTagsV2StreamingGRPCHandler(cfg, searchTagsPipeline, apiPrefix, o, logger, dataAccessController),
streamingTagValues: newTagValuesStreamingGRPCHandler(cfg, searchTagValuesPipeline, apiPrefix, o, logger, dataAccessController),
Expand Down
8 changes: 4 additions & 4 deletions modules/frontend/pipeline/responses_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ func TestAsyncResponsesDoesNotLeak(t *testing.T) {
bridge := &pipelineBridge{
next: tc.finalRT(cancel),
}
httpCollector := NewHTTPCollector(sharder{next: bridge}, 0, combiner.NewSearch(0, false, api.HeaderAcceptJSON))
httpCollector := NewHTTPCollector(sharder{next: bridge}, 0, combiner.NewSearch(0, false, api.HeaderAcceptJSON, false))

_, _ = httpCollector.RoundTrip(req)

Expand All @@ -327,7 +327,7 @@ func TestAsyncResponsesDoesNotLeak(t *testing.T) {
bridge := &pipelineBridge{
next: tc.finalRT(cancel),
}
grpcCollector := NewGRPCCollector[*tempopb.SearchResponse](sharder{next: bridge}, 0, combiner.NewTypedSearch(0, false, api.HeaderAcceptJSON), func(_ *tempopb.SearchResponse) error { return nil })
grpcCollector := NewGRPCCollector[*tempopb.SearchResponse](sharder{next: bridge}, 0, combiner.NewTypedSearch(0, false, api.HeaderAcceptJSON, false), func(_ *tempopb.SearchResponse) error { return nil })

_ = grpcCollector.RoundTrip(req)

Expand All @@ -351,7 +351,7 @@ func TestAsyncResponsesDoesNotLeak(t *testing.T) {
}

s := sharder{next: sharder{next: bridge}, funcSharder: true}
grpcCollector := NewGRPCCollector[*tempopb.SearchResponse](s, 0, combiner.NewTypedSearch(0, false, api.HeaderAcceptJSON), func(_ *tempopb.SearchResponse) error { return nil })
grpcCollector := NewGRPCCollector[*tempopb.SearchResponse](s, 0, combiner.NewTypedSearch(0, false, api.HeaderAcceptJSON, false), func(_ *tempopb.SearchResponse) error { return nil })

_ = grpcCollector.RoundTrip(req)

Expand All @@ -374,7 +374,7 @@ func TestAsyncResponsesDoesNotLeak(t *testing.T) {
}

s := sharder{next: sharder{next: bridge, funcSharder: true}}
grpcCollector := NewGRPCCollector[*tempopb.SearchResponse](s, 0, combiner.NewTypedSearch(0, false, api.HeaderAcceptJSON), func(_ *tempopb.SearchResponse) error { return nil })
grpcCollector := NewGRPCCollector[*tempopb.SearchResponse](s, 0, combiner.NewTypedSearch(0, false, api.HeaderAcceptJSON, false), func(_ *tempopb.SearchResponse) error { return nil })

_ = grpcCollector.RoundTrip(req)

Expand Down
13 changes: 7 additions & 6 deletions modules/frontend/search_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,14 @@ import (
"github.com/grafana/tempo/modules/frontend/pipeline"
"google.golang.org/grpc/codes"

"github.com/grafana/tempo/modules/overrides"
"github.com/grafana/tempo/pkg/api"
"github.com/grafana/tempo/pkg/tempopb"
"github.com/grafana/tempo/pkg/traceql"
)

// newSearchStreamingGRPCHandler returns a handler that streams results from the HTTP handler
func newSearchStreamingGRPCHandler(cfg Config, next pipeline.AsyncRoundTripper[combiner.PipelineResponse], apiPrefix string, logger log.Logger, dataAccessController DataAccessController) streamingSearchHandler {
func newSearchStreamingGRPCHandler(cfg Config, next pipeline.AsyncRoundTripper[combiner.PipelineResponse], apiPrefix string, o overrides.Interface, logger log.Logger, dataAccessController DataAccessController) streamingSearchHandler {
postSLOHook := searchSLOPostHook(cfg.Search.SLO)
downstreamPath := path.Join(apiPrefix, api.PathSearch)

Expand Down Expand Up @@ -54,7 +55,7 @@ func newSearchStreamingGRPCHandler(cfg Config, next pipeline.AsyncRoundTripper[c
tenant, _ := user.ExtractOrgID(ctx)
start := time.Now()

comb, err := newCombiner(req, cfg.Search.Sharder, api.MarshallingFormatProtobuf)
comb, err := newCombiner(req, cfg.Search.Sharder, api.MarshallingFormatProtobuf, o.LeftPadTraceIDs(tenant))
if err != nil {
level.Error(logger).Log("msg", "search streaming: could not create combiner", "err", err)
return status.Error(codes.InvalidArgument, err.Error())
Expand Down Expand Up @@ -82,7 +83,7 @@ func newSearchStreamingGRPCHandler(cfg Config, next pipeline.AsyncRoundTripper[c
}

// newSearchHTTPHandler returns a handler that returns a single response from the HTTP handler
func newSearchHTTPHandler(cfg Config, next pipeline.AsyncRoundTripper[combiner.PipelineResponse], logger log.Logger, dataAccessController DataAccessController) http.RoundTripper {
func newSearchHTTPHandler(cfg Config, next pipeline.AsyncRoundTripper[combiner.PipelineResponse], o overrides.Interface, logger log.Logger, dataAccessController DataAccessController) http.RoundTripper {
postSLOHook := searchSLOPostHook(cfg.Search.SLO)

return RoundTripperFunc(func(req *http.Request) (*http.Response, error) {
Expand All @@ -109,7 +110,7 @@ func newSearchHTTPHandler(cfg Config, next pipeline.AsyncRoundTripper[combiner.P
// check marshalling format
marshallingFormat := api.MarshalingFormatFromAcceptHeader(req.Header)

comb, err := newCombiner(searchReq, cfg.Search.Sharder, marshallingFormat)
comb, err := newCombiner(searchReq, cfg.Search.Sharder, marshallingFormat, o.LeftPadTraceIDs(tenant))
if err != nil {
level.Error(logger).Log("msg", "search: could not create combiner", "err", err)
return httpInvalidRequest(err), nil
Expand All @@ -136,7 +137,7 @@ func newSearchHTTPHandler(cfg Config, next pipeline.AsyncRoundTripper[combiner.P
})
}

func newCombiner(req *tempopb.SearchRequest, cfg SearchSharderConfig, marshalingFormat api.MarshallingFormat) (combiner.GRPCCombiner[*tempopb.SearchResponse], error) {
func newCombiner(req *tempopb.SearchRequest, cfg SearchSharderConfig, marshalingFormat api.MarshallingFormat, padTraceIDs bool) (combiner.GRPCCombiner[*tempopb.SearchResponse], error) {
limit, err := adjustLimit(req.Limit, cfg.DefaultLimit, cfg.MaxLimit)
if err != nil {
return nil, err
Expand All @@ -155,7 +156,7 @@ func newCombiner(req *tempopb.SearchRequest, cfg SearchSharderConfig, marshaling
}
}

return combiner.NewTypedSearch(int(limit), mostRecent, marshalingFormat), nil
return combiner.NewTypedSearch(int(limit), mostRecent, marshalingFormat, padTraceIDs), nil
}

// adjusts the limit based on provided config
Expand Down
Loading
Loading