Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ configurable via the throughput_bytes_slo field, and it will populate op="traces
* [BUGFIX] Fix memcached settings for docker compose example [#4346](https://github.com/grafana/tempo/pull/4695) (@ruslan-mikhailov)
* [BUGFIX] Fix setting processors in user configurations overrides via API [#4741](https://github.com/grafana/tempo/pull/4741) (@ruslan-mikhailov)
* [BUGFIX] Fix panic on startup [#4744](https://github.com/grafana/tempo/pull/4744) (@ruslan-mikhailov)
* [BUGFIX] Fix intrinsic tag lookups dropped when max tag lookup response size is exceeded [#4784](https://github.com/grafana/tempo/pull/4784) (@mdisibio)
* [BUGFIX] Correctly cache frontend jobs for query range (TraceQL Metrics). [#4771](https://github.com/grafana/tempo/pull/4771) (@joe-elliott)

# v2.7.1
Expand Down
14 changes: 7 additions & 7 deletions integration/e2e/api/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (

"github.com/grafana/e2e"
"github.com/grafana/tempo/integration/util"
"github.com/grafana/tempo/pkg/search"
"github.com/grafana/tempo/pkg/tempopb"
"github.com/stretchr/testify/require"
"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -652,11 +653,12 @@ func callSearchTagValuesV2AndAssert(t *testing.T, svc *e2e.HTTPService, tagName,
func callSearchTagsV2AndAssert(t *testing.T, svc *e2e.HTTPService, scope, query string, expected searchTagsV2Response, start, end int64) {
urlPath := fmt.Sprintf(`/api/v2/search/tags?scope=%s&q=%s`, scope, url.QueryEscape(query))

// expected will not have the intrinsic scope since it's the same every time, add it here.
// Expected will not have the intrinsic results to make the tests simpler,
// they are added here based on the scope.
if scope == "none" || scope == "" || scope == "intrinsic" {
expected.Scopes = append(expected.Scopes, ScopedTags{
Name: "intrinsic",
Tags: []string{"duration", "event:name", "event:timeSinceStart", "instrumentation:name", "instrumentation:version", "kind", "name", "rootName", "rootServiceName", "span:duration", "span:kind", "span:name", "span:status", "span:statusMessage", "status", "statusMessage", "trace:duration", "trace:rootName", "trace:rootService", "traceDuration"},
Tags: search.GetVirtualIntrinsicValues(),
})
}
sort.Slice(expected.Scopes, func(i, j int) bool { return expected.Scopes[i].Name < expected.Scopes[j].Name })
Expand Down Expand Up @@ -776,9 +778,7 @@ func callSearchTagsAndAssert(t *testing.T, svc *e2e.HTTPService, expected search
// parse response
var response searchTagsResponse
require.NoError(t, json.Unmarshal(body, &response))
sort.Strings(response.TagNames)
sort.Strings(expected.TagNames)
require.Equal(t, expected.TagNames, response.TagNames)
require.ElementsMatch(t, expected.TagNames, response.TagNames)
assertMetrics(t, response.Metrics, len(response.TagNames))

// streaming
Expand Down Expand Up @@ -808,8 +808,8 @@ func callSearchTagsAndAssert(t *testing.T, svc *e2e.HTTPService, expected search
if grpcResp.TagNames == nil {
grpcResp.TagNames = []string{}
}
sort.Slice(grpcResp.TagNames, func(i, j int) bool { return grpcResp.TagNames[i] < grpcResp.TagNames[j] })
require.Equal(t, expected.TagNames, grpcResp.TagNames)

require.ElementsMatch(t, expected.TagNames, grpcResp.TagNames)
// assert metrics, and make sure it's non-zero when response is non-empty
if len(grpcResp.TagNames) > 0 {
require.Greater(t, grpcResp.Metrics.InspectedBytes, uint64(100))
Expand Down
4 changes: 2 additions & 2 deletions integration/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -476,12 +476,12 @@ func SearchAndAssertTraceBackend(t *testing.T, client *httpclient.Client, info *
// by passing a time range and using a query_ingesters_until/backend_after of 0 we can force the queriers
// to look in the backend blocks
func SearchAndAsserTagsBackend(t *testing.T, client *httpclient.Client, start, end int64) {
// There are no tags in recent data
resp, err := client.SearchTags()
require.NoError(t, err)

require.Equal(t, len(resp.TagNames), 0)

// verify trace can be found using attribute and time range
// There are additional tags in the backend
resp, err = client.SearchTagsWithRange(start, end)
require.NoError(t, err)
require.True(t, len(resp.TagNames) > 0)
Expand Down
7 changes: 7 additions & 0 deletions modules/frontend/combiner/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,13 @@ func (c *genericCombiner[T]) AddResponse(r PipelineResponse) error {
return nil
}

func (c *genericCombiner[T]) AddTypedResponse(r T) error {
c.mu.Lock()
defer c.mu.Unlock()

return c.combine(r, c.current, nil)
}

// HTTPFinal, GRPCComplete, and GRPCDiff are all responsible for returning something
// usable in grpc streaming/http response.
// NOTE: returning error is reserved for unexpected errors, HTTP errors will be returned
Expand Down
5 changes: 5 additions & 0 deletions modules/frontend/combiner/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,13 @@ type Combiner interface {
HTTPFinal() (*http.Response, error)
}

type TypedCombiner[T TResponse] interface {
AddTypedResponse(r T) error
}

type GRPCCombiner[T TResponse] interface {
Combiner
TypedCombiner[T]

GRPCFinal() (T, error)
GRPCDiff() (T, error)
Expand Down
3 changes: 2 additions & 1 deletion modules/frontend/metrics_query_range_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/gogo/protobuf/jsonpb"
"github.com/gogo/protobuf/proto"
"github.com/grafana/dskit/user"
"github.com/grafana/tempo/modules/overrides"
"github.com/grafana/tempo/pkg/api"
"github.com/grafana/tempo/pkg/cache"
"github.com/grafana/tempo/pkg/tempopb"
Expand Down Expand Up @@ -52,7 +53,7 @@ func TestQueryRangeHandlerSucceeds(t *testing.T) {
responseFn: func() proto.Message {
return resp
},
}, nil, nil, nil, func(c *Config) {
}, nil, nil, nil, func(c *Config, _ *overrides.Config) {
c.Metrics.Sharder.Interval = time.Hour
})

Expand Down
8 changes: 5 additions & 3 deletions modules/frontend/search_handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -718,7 +718,7 @@ func BenchmarkSearchPipeline(b *testing.B) {
// frontendWithSettings returns a new frontend with the given settings. any nil options
// are given "happy path" defaults
func frontendWithSettings(t require.TestingT, next pipeline.RoundTripper, rdr tempodb.Reader, cfg *Config, cacheProvider cache.Provider,
opts ...func(*Config),
opts ...func(*Config, *overrides.Config),
) *QueryFrontend {
if next == nil {
next = &mockRoundTripper{
Expand Down Expand Up @@ -802,11 +802,13 @@ func frontendWithSettings(t require.TestingT, next pipeline.RoundTripper, rdr te
}
}

overridesCfg := &overrides.Config{}

for _, o := range opts {
o(cfg)
o(cfg, overridesCfg)
}

o, err := overrides.NewOverrides(overrides.Config{}, nil, prometheus.DefaultRegisterer)
o, err := overrides.NewOverrides(*overridesCfg, nil, prometheus.DefaultRegisterer)
require.NoError(t, err)

f, err := New(*cfg, next, o, rdr, cacheProvider, "", log.NewNopLogger(), nil)
Expand Down
70 changes: 70 additions & 0 deletions modules/frontend/tag_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ import (
"github.com/grafana/tempo/modules/frontend/pipeline"
"github.com/grafana/tempo/modules/overrides"
"github.com/grafana/tempo/pkg/api"
"github.com/grafana/tempo/pkg/search"
"github.com/grafana/tempo/pkg/tempopb"
"github.com/grafana/tempo/pkg/traceql"
"google.golang.org/grpc/codes"
)

Expand Down Expand Up @@ -53,6 +55,19 @@ func newTagsStreamingGRPCHandler(cfg Config, next pipeline.AsyncRoundTripper[com
return srv.Send(res)
})

// Add intrinsics first so that they aren't dropped by the response size limit
// NOTE - V1 tag lookup only returns intrinsics when scope is set explicitly.
if req.Scope == api.ParamScopeIntrinsic {
err := comb.AddTypedResponse(&tempopb.SearchTagsResponse{
TagNames: search.GetVirtualIntrinsicValues(),
})
if err != nil {
return err
}
// TODO: Exit early here, no need to issue more requests downstream, but some
// work needed to ensure things are still logged/metriced correctly.
}

start := time.Now()
logTagsRequest(logger, tenant, "SearchTagsStreaming", req.Scope, req.End-req.Start)
err = collector.RoundTrip(httpReq)
Expand Down Expand Up @@ -86,6 +101,26 @@ func newTagsV2StreamingGRPCHandler(cfg Config, next pipeline.AsyncRoundTripper[c
return srv.Send(res)
})

// Add intrinsics first so that they aren't dropped by the response size limit
// NOTE - V2 tag lookup returns intrinsics for both unscoped and explicit scope requests.
if req.Scope == "" ||
req.Scope == api.ParamScopeIntrinsic ||
req.Scope == traceql.AttributeScopeNone.String() {
err := comb.AddTypedResponse(&tempopb.SearchTagsV2Response{
Scopes: []*tempopb.SearchTagsV2Scope{
{
Name: api.ParamScopeIntrinsic,
Tags: search.GetVirtualIntrinsicValues(),
},
},
})
if err != nil {
return err
}
// TODO: For intrinsic scope only, exit early here, no need to issue more requests downstream, but some
// work needed to ensure things are still logged/metriced correctly.
}

start := time.Now()
logTagsRequest(logger, tenant, "SearchTagsV2Streaming", req.Scope, req.End-req.Start)
err = collector.RoundTrip(httpReq)
Expand Down Expand Up @@ -190,6 +225,20 @@ func newTagsHTTPHandler(cfg Config, next pipeline.AsyncRoundTripper[combiner.Pip
scope, _, rangeDur, maxTagsPerScope, staleValueThreshold := parseParams(req)
// build and use round tripper
comb := combiner.NewTypedSearchTags(o.MaxBytesPerTagValuesQuery(tenant), maxTagsPerScope, staleValueThreshold)

// Add intrinsics first so that they aren't dropped by the response size limit
// NOTE - V1 tag lookup only returns intrinsics when scope is set explicitly.
if scope == api.ParamScopeIntrinsic {
err := comb.AddTypedResponse(&tempopb.SearchTagsResponse{
TagNames: search.GetVirtualIntrinsicValues(),
})
if err != nil {
return nil, err
}
// TODO: Exit early here, no need to issue more requests downstream, but some
// work needed to ensure things are still logged/metriced correctly.
}

rt := pipeline.NewHTTPCollector(next, cfg.ResponseConsumers, comb)
start := time.Now()
logTagsRequest(logger, tenant, "SearchTags", scope, rangeDur)
Expand Down Expand Up @@ -224,6 +273,27 @@ func newTagsV2HTTPHandler(cfg Config, next pipeline.AsyncRoundTripper[combiner.P
scope, _, rangeDur, maxTagsPerScope, staleValueThreshold := parseParams(req)
// build and use round tripper
comb := combiner.NewTypedSearchTagsV2(o.MaxBytesPerTagValuesQuery(tenant), maxTagsPerScope, staleValueThreshold)

// Add intrinsics first so that they aren't dropped by the response size limit
// NOTE - V2 tag lookup returns intrinsics for both unscoped and explicit scope requests.
if scope == "" ||
scope == api.ParamScopeIntrinsic ||
scope == traceql.AttributeScopeNone.String() {
err := comb.AddTypedResponse(&tempopb.SearchTagsV2Response{
Scopes: []*tempopb.SearchTagsV2Scope{
{
Name: api.ParamScopeIntrinsic,
Tags: search.GetVirtualIntrinsicValues(),
},
},
})
if err != nil {
return nil, err
}
// TODO: For intrinsic scope only, exit early here, no need to issue more requests downstream, but some
// work needed to ensure things are still logged/metriced correctly.
}

rt := pipeline.NewHTTPCollector(next, cfg.ResponseConsumers, comb)
start := time.Now()
logTagsRequest(logger, tenant, "SearchTagsV2", scope, rangeDur)
Expand Down
98 changes: 98 additions & 0 deletions modules/frontend/tag_handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"net/http/httptest"
"net/url"
"regexp"
"sort"
"testing"
"time"

Expand All @@ -18,7 +19,10 @@ import (
"github.com/gogo/status"
"github.com/gorilla/mux"
"github.com/grafana/dskit/user"
"github.com/grafana/tempo/modules/overrides"
"github.com/grafana/tempo/pkg/api"
"github.com/grafana/tempo/pkg/cache"
"github.com/grafana/tempo/pkg/search"
"github.com/grafana/tempo/pkg/tempopb"
"github.com/grafana/tempo/pkg/util/test"
"github.com/grafana/tempo/tempodb/backend"
Expand Down Expand Up @@ -167,6 +171,100 @@ func runnerTagValuesV2ClientCancelContext(t *testing.T, f *QueryFrontend) {
require.Equal(t, status.Error(codes.Canceled, "context canceled"), err)
}

func TestSearchTagsV2Intrinsics(t *testing.T) {
mockScope := "span"
mockTags := []string{"foo", "bar"}

tcs := []struct {
name string
maxTagBytes int
expected *tempopb.SearchTagsV2Response
}{
{
name: "unlimited",
maxTagBytes: 0,
expected: &tempopb.SearchTagsV2Response{
Scopes: []*tempopb.SearchTagsV2Scope{
{
Name: api.ParamScopeIntrinsic,
Tags: search.GetVirtualIntrinsicValues(),
},
{
Name: mockScope,
Tags: mockTags,
},
},
},
},
{
name: "when_limited_intrinsics_first",
maxTagBytes: 100,
expected: &tempopb.SearchTagsV2Response{
Scopes: []*tempopb.SearchTagsV2Scope{
{
// Only a subset of intrinsic tags will fit
Name: api.ParamScopeIntrinsic,
Tags: search.GetVirtualIntrinsicValues()[0:10],
},
},
},
},
}

for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
// This is the mocked data returned by querier/ingester jobs downstream.
next := &mockRoundTripper{
responseFn: func() proto.Message {
return &tempopb.SearchTagsV2Response{
Scopes: []*tempopb.SearchTagsV2Scope{
{
Name: mockScope,
Tags: mockTags,
},
},
}
},
}

f := frontendWithSettings(t, next, nil, nil, nil, func(_ *Config, overridesCfg *overrides.Config) {
overridesCfg.Defaults.Read.MaxBytesPerTagValuesQuery = tc.maxTagBytes
})

// http
httpReq := httptest.NewRequest("GET", "/api/v2/search/tags", nil)
httpResp := httptest.NewRecorder()

ctx, cancel := context.WithCancel(httpReq.Context())
defer cancel()
ctx = user.InjectOrgID(ctx, "tenant")
httpReq = httpReq.WithContext(ctx)

f.SearchTagsV2Handler.ServeHTTP(httpResp, httpReq)
require.Equal(t, http.StatusOK, httpResp.Code)

resp := &tempopb.SearchTagsV2Response{}
bytesResp, err := io.ReadAll(httpResp.Body)
require.NoError(t, err)
err = jsonpb.Unmarshal(bytes.NewReader(bytesResp), resp)

require.NoError(t, err)

// Sort scopes to give stable comparison
sort.Slice(tc.expected.Scopes, func(i, j int) bool {
return tc.expected.Scopes[i].Name < tc.expected.Scopes[j].Name
})
sort.Slice(resp.Scopes, func(i, j int) bool {
return resp.Scopes[i].Name < resp.Scopes[j].Name
})
require.Equal(t, len(tc.expected.Scopes), len(resp.Scopes))
for i := range tc.expected.Scopes {
require.ElementsMatch(t, tc.expected.Scopes[i].Tags, resp.Scopes[i].Tags)
}
})
}
}

// todo: a lot of code is replicated between all of these "failure propagates from queriers" tests. we should refactor
// to a framework that tests this against all endpoints
func TestSearchTagsV2FailurePropagatesFromQueriers(t *testing.T) {
Expand Down
Loading