Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
## main / unreleased

* [BUGFIX] Return Bad Request from frontend if the provided tag is invalid in SearchTagValuesV2 endpoint [#5493](https://github.com/grafana/tempo/pull/5493/) (@carles-grafana)
* [CHANGE] Add test for cost_attribution in user-configurable overrides API [#5481](https://github.com/grafana/tempo/pull/5481/) (@electron0zero)
* [CHANGE] Return Bad Request from all frontend endpoints if the tenant can't be extracted [#5480](https://github.com/grafana/tempo/pull/5480) (@carles-grafana)
* [CHANGE] **BREAKING CHANGE** Migrated Tempo Vulture and Integration Tests from the deprecated Jaeger agent/exporter to the standard OTLP exporter. Vulture now push traces to Tempo OTLP GRCP endpoint.[#5058](https://github.com/grafana/tempo/pull/5058) (@iamrajiv, @javiermolinar)
Expand Down
43 changes: 43 additions & 0 deletions integration/e2e/api/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -560,6 +560,49 @@ func TestStreamingSearch_badRequest(t *testing.T) {
require.Equal(t, codes.InvalidArgument, st.Code())
}

func TestSearchTagValuesV2_badRequest(t *testing.T) {
s, err := e2e.NewScenario("tempo_e2e")
require.NoError(t, err)
defer s.Close()

require.NoError(t, util.CopyFileToSharedDir(s, configAllInOneLocal, "config.yaml"))
tempo := util.NewTempoAllInOne()
require.NoError(t, s.StartAndWaitReady(tempo))

// Test HTTP endpoint returns 400 for invalid tagName
invalidTagName := "app.user.id" // not a valid scoped attribute
req, err := http.NewRequest(http.MethodGet,
fmt.Sprintf("http://%s/api/v2/search/tag/%s/values", tempo.Endpoint(tempoPort), invalidTagName), nil)
require.NoError(t, err)

res, err := http.DefaultClient.Do(req)
require.NoError(t, err)
require.Equal(t, http.StatusBadRequest, res.StatusCode)

body, err := io.ReadAll(res.Body)
require.NoError(t, err)
defer res.Body.Close()

require.Contains(t, string(body), "tag name is not valid intrinsic or scoped attribute")

// Test gRPC endpoint returns InvalidArgument for invalid tagName
grpcClient, err := util.NewSearchGRPCClient(context.Background(), tempo.Endpoint(tempoPort))
require.NoError(t, err)

stream, err := grpcClient.SearchTagValuesV2(context.Background(), &tempopb.SearchTagValuesRequest{
TagName: invalidTagName,
})
require.NoError(t, err)

_, err = stream.Recv()
require.Error(t, err)

st, ok := status.FromError(err)
require.True(t, ok)
require.Equal(t, codes.InvalidArgument, st.Code())
require.Contains(t, st.Message(), "tag name is not valid intrinsic or scoped attribute")
}

func callSearchTagValuesV2AndAssert(t *testing.T, svc *e2e.HTTPService, tagName, query string, expected searchTagValuesV2Response, start, end int64) {
urlPath := fmt.Sprintf(`/api/v2/search/tag/%s/values?q=%s`, tagName, url.QueryEscape(query))

Expand Down
15 changes: 13 additions & 2 deletions modules/frontend/frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,17 @@ func New(cfg Config, next pipeline.RoundTripper, o overrides.Interface, reader t
[]pipeline.Middleware{cacheWare, statusCodeWare, retryWare},
next)

searchTagValuesV2Pipeline := pipeline.Build(
[]pipeline.AsyncMiddleware[combiner.PipelineResponse]{
headerStripWare,
urlDenyListWare,
pipeline.NewWeightRequestWare(pipeline.Default, cfg.Weights),
multiTenantMiddleware(cfg, logger),
newAsyncTagSharder(reader, o, cfg.Search.Sharder, parseTagValuesRequestV2, logger),
},
[]pipeline.Middleware{cacheWare, statusCodeWare, retryWare},
next)

// metrics summary
metricsPipeline := pipeline.Build(
[]pipeline.AsyncMiddleware[combiner.PipelineResponse]{
Expand Down Expand Up @@ -199,7 +210,7 @@ func New(cfg Config, next pipeline.RoundTripper, o overrides.Interface, reader t
searchTags := newTagsHTTPHandler(cfg, searchTagsPipeline, o, logger)
searchTagsV2 := newTagsV2HTTPHandler(cfg, searchTagsPipeline, o, logger)
searchTagValues := newTagValuesHTTPHandler(cfg, searchTagValuesPipeline, o, logger)
searchTagValuesV2 := newTagValuesV2HTTPHandler(cfg, searchTagValuesPipeline, o, logger)
searchTagValuesV2 := newTagValuesV2HTTPHandler(cfg, searchTagValuesV2Pipeline, o, logger)
metrics := newMetricsSummaryHandler(metricsPipeline, logger)
queryInstant := newMetricsQueryInstantHTTPHandler(cfg, queryInstantPipeline, logger) // Reuses the same pipeline
queryRange := newMetricsQueryRangeHTTPHandler(cfg, queryRangePipeline, logger)
Expand All @@ -222,7 +233,7 @@ func New(cfg Config, next pipeline.RoundTripper, o overrides.Interface, reader t
streamingTags: newTagsStreamingGRPCHandler(cfg, searchTagsPipeline, apiPrefix, o, logger),
streamingTagsV2: newTagsV2StreamingGRPCHandler(cfg, searchTagsPipeline, apiPrefix, o, logger),
streamingTagValues: newTagValuesStreamingGRPCHandler(cfg, searchTagValuesPipeline, apiPrefix, o, logger),
streamingTagValuesV2: newTagValuesV2StreamingGRPCHandler(cfg, searchTagValuesPipeline, apiPrefix, o, logger),
streamingTagValuesV2: newTagValuesV2StreamingGRPCHandler(cfg, searchTagValuesV2Pipeline, apiPrefix, o, logger),
streamingQueryRange: newQueryRangeStreamingGRPCHandler(cfg, queryRangePipeline, apiPrefix, logger),
streamingQueryInstant: newQueryInstantStreamingGRPCHandler(cfg, queryRangePipeline, apiPrefix, logger), // Reuses the same pipeline

Expand Down
36 changes: 27 additions & 9 deletions modules/frontend/tag_handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ func TestFrontendTags(t *testing.T) {
runnerTagsV2BadRequestOnOrgID,
runnerTagValuesBadRequestOnOrgID,
runnerTagValuesV2BadRequestOnOrgID,
runnerTagValuesV2BadRequestOnInvalidTagName,
runnerTagsV2ClientCancelContext,
runnerTagValuesV2ClientCancelContext,
}
Expand Down Expand Up @@ -95,8 +96,8 @@ func runnerTagValuesBadRequestOnOrgID(t *testing.T, f *QueryFrontend) {

func runnerTagValuesV2BadRequestOnOrgID(t *testing.T, f *QueryFrontend) {
// http
httpReq := httptest.NewRequest("GET", "/api/v2/search/tag/foo/values", nil)
httpReq = mux.SetURLVars(httpReq, map[string]string{"tagName": "foo"})
httpReq := httptest.NewRequest("GET", "/api/v2/search/tag/span.name/values", nil)
httpReq = mux.SetURLVars(httpReq, map[string]string{"tagName": "span.name"})
httpResp := httptest.NewRecorder()
f.SearchTagsValuesV2Handler.ServeHTTP(httpResp, httpReq)
require.Equal(t, "no org id", httpResp.Body.String())
Expand All @@ -108,6 +109,23 @@ func runnerTagValuesV2BadRequestOnOrgID(t *testing.T, f *QueryFrontend) {
require.Equal(t, status.Error(codes.InvalidArgument, "no org id"), err)
}

func runnerTagValuesV2BadRequestOnInvalidTagName(t *testing.T, f *QueryFrontend) {
Comment thread
joe-elliott marked this conversation as resolved.
// http
httpReq := httptest.NewRequest("GET", "/api/v2/search/tag/app.user.id/values", nil)
httpReq = mux.SetURLVars(httpReq, map[string]string{"tagName": "app.user.id"})
httpReq = httpReq.WithContext(user.InjectOrgID(httpReq.Context(), "tenant"))
httpResp := httptest.NewRecorder()
f.SearchTagsValuesV2Handler.ServeHTTP(httpResp, httpReq)

require.Equal(t, http.StatusBadRequest, httpResp.Code)
require.Contains(t, httpResp.Body.String(), "tag name is not valid intrinsic or scoped attribute: app.user.id")

// grpc
grpcReq := &tempopb.SearchTagValuesRequest{TagName: "app.user.id"}
err := f.streamingTagValuesV2(grpcReq, newMockStreamingServer[*tempopb.SearchTagValuesV2Response]("tenant", nil))
require.Equal(t, status.Error(codes.InvalidArgument, "please provide a valid tagName: tag name is not valid intrinsic or scoped attribute: app.user.id"), err)
}

func runnerTagsV2ClientCancelContext(t *testing.T, f *QueryFrontend) {
// http
httpReq := httptest.NewRequest("GET", "/api/v2/search/tags", nil)
Expand Down Expand Up @@ -140,8 +158,8 @@ func runnerTagsV2ClientCancelContext(t *testing.T, f *QueryFrontend) {

func runnerTagValuesV2ClientCancelContext(t *testing.T, f *QueryFrontend) {
// http
httpReq := httptest.NewRequest("GET", "/api/v2/search/tag/foo/values", nil)
httpReq = mux.SetURLVars(httpReq, map[string]string{"tagName": "foo"})
httpReq := httptest.NewRequest("GET", "/api/v2/search/tag/span.name/values", nil)
httpReq = mux.SetURLVars(httpReq, map[string]string{"tagName": "span.name"})
httpResp := httptest.NewRecorder()

ctx, cancel := context.WithCancel(httpReq.Context())
Expand All @@ -165,7 +183,7 @@ func runnerTagValuesV2ClientCancelContext(t *testing.T, f *QueryFrontend) {
cancel()
}()
grpcReq := &tempopb.SearchTagValuesRequest{
TagName: "foo",
TagName: "span.name",
}
err := f.streamingTagValuesV2(grpcReq, srv)
require.Equal(t, status.Error(codes.Canceled, "context canceled"), err)
Expand Down Expand Up @@ -478,8 +496,8 @@ func TestSearchTagValuesV2FailurePropagatesFromQueriers(t *testing.T) {
},
}, nil)

httpReq := httptest.NewRequest("GET", "/api/v2/search/tag/foo/values?start=1&end=10000", nil)
httpReq = mux.SetURLVars(httpReq, map[string]string{"tagName": "foo"})
httpReq := httptest.NewRequest("GET", "/api/v2/search/tag/span.name/values?start=1&end=10000", nil)
httpReq = mux.SetURLVars(httpReq, map[string]string{"tagName": "span.name"})
httpResp := httptest.NewRecorder()

ctx := user.InjectOrgID(httpReq.Context(), "foo")
Expand Down Expand Up @@ -528,7 +546,7 @@ func TestSearchTagValuesV2FailurePropagatesFromQueriers(t *testing.T) {
// grpc
srv := newMockStreamingServer[*tempopb.SearchTagValuesV2Response]("bar", nil)
grpcReq := &tempopb.SearchTagValuesRequest{
TagName: "foo",
TagName: "span.name",
}
err := f.streamingTagValuesV2(grpcReq, srv)
require.Equal(t, tc.expectedErr, err)
Expand Down Expand Up @@ -691,7 +709,7 @@ func TestTagValuesCachedMetrics(t *testing.T) {

// setup query
tenant := "foo"
tagName := "service.name"
tagName := "resource.service.name"
hash := fnv1a.HashString64(tagName)
start := uint32(10)
end := uint32(20)
Expand Down
10 changes: 10 additions & 0 deletions modules/frontend/tag_sharder.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,16 @@ func parseTagValuesRequest(r *http.Request) (tagSearchReq, error) {
}, nil
}

func parseTagValuesRequestV2(r *http.Request) (tagSearchReq, error) {
searchReq, err := api.ParseSearchTagValuesRequestV2(r)
if err != nil {
return nil, err
}
return &tagValueSearchRequest{
request: *searchReq,
}, nil
}

type parseRequestFunction func(r *http.Request) (tagSearchReq, error)

type tagSearchReq interface {
Expand Down
Loading