Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
* [ENHANCEMENT] Update poller to make use of previous results and reduce backend load. [#2652](https://github.com/grafana/tempo/pull/2652) (@zalegrala)
* [ENHANCEMENT] Improve TraceQL regex performance in certain queries. [#3139](https://github.com/grafana/tempo/pull/3139) (@joe-elliott)
* [ENHANCEMENT] Improve TraceQL performance in complex queries. [#3113](https://github.com/grafana/tempo/pull/3113) (@joe-elliott)
* [ENHANCEMENT] Added a `frontend-search` cache role for job search caching. [#3225](https://github.com/grafana/tempo/pull/3225) (@joe-elliott)
* [BUGFIX] Prevent building parquet iterators that would loop forever. [#3159](https://github.com/grafana/tempo/pull/3159) (@mapno)
* [BUGFIX] Sanitize name in mapped dimensions in span-metrics processor [#3171](https://github.com/grafana/tempo/pull/3171) (@mapno)

Expand Down
2 changes: 1 addition & 1 deletion cmd/tempo/app/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ func (t *App) initQueryFrontend() (services.Service, error) {
t.frontend = v1

// create query frontend
queryFrontend, err := frontend.New(t.cfg.Frontend, cortexTripper, t.Overrides, t.store, t.cfg.HTTPAPIPrefix, log.Logger, prometheus.DefaultRegisterer)
queryFrontend, err := frontend.New(t.cfg.Frontend, cortexTripper, t.Overrides, t.store, t.cacheProvider, t.cfg.HTTPAPIPrefix, log.Logger, prometheus.DefaultRegisterer)
if err != nil {
return nil, err
}
Expand Down
1 change: 1 addition & 0 deletions docs/sources/tempo/configuration/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -1502,6 +1502,7 @@ cache:
# parquet-footer - Parquet footer values. Useful for search and trace by id lookup.
# parquet-column-idx - Parquet column index values. Useful for search and trace by id lookup.
# parquet-offset-idx - Parquet offset index values. Useful for search and trace by id lookup.
# frontend-search - Frontend search job results.

- roles:
- <role1>
Expand Down
1 change: 1 addition & 0 deletions modules/cache/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ func allRoles() map[cache.Role]struct{} {
cache.RoleParquetColumnIdx,
cache.RoleParquetOffsetIdx,
cache.RoleTraceIDIdx,
cache.RoleFrontendSearch,
}

roles := map[cache.Role]struct{}{}
Expand Down
69 changes: 69 additions & 0 deletions modules/frontend/cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package frontend

import (
"bytes"
"context"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/gogo/protobuf/jsonpb"
"github.com/gogo/protobuf/proto"
"github.com/grafana/tempo/pkg/cache"
)

// frontendCache wraps an optional cache.Cache for use by the query frontend.
// A nil inner cache (c == nil) disables caching: store/fetch become no-ops.
type frontendCache struct {
	c cache.Cache
}

// newFrontendCache builds a frontendCache for the given role. If the provider
// is nil, or the provider has no cache configured for the role, the returned
// frontendCache is non-nil but disabled (all operations are no-ops).
func newFrontendCache(cacheProvider cache.Provider, role cache.Role, logger log.Logger) *frontendCache {
	fc := &frontendCache{}
	if cacheProvider != nil {
		fc.c = cacheProvider.CacheFor(role)
	}

	level.Info(logger).Log("msg", "init frontend cache", "role", role, "enabled", fc.c != nil)

	return fc
}

// store writes buffer to the cache under key. It is a no-op when caching is
// disabled, key is empty, or buffer is empty. Errors from the underlying
// cache are not surfaced; caching is best-effort.
func (c *frontendCache) store(ctx context.Context, key string, buffer []byte) {
	// caching disabled, nothing addressable, or nothing to store
	if c.c == nil || key == "" || len(buffer) == 0 {
		return
	}

	c.c.Store(ctx, []string{key}, [][]byte{buffer})
}

// fetch looks up key in the cache and, on a hit, unmarshals the cached JSON
// payload into pb. It returns true only when a value was found and
// unmarshaled successfully; it returns false when caching is disabled, the
// key is empty, the key is missing, or the payload fails to unmarshal.
func (c *frontendCache) fetch(key string, pb proto.Message) bool {
	if c.c == nil || len(key) == 0 {
		return false
	}

	// NOTE(review): uses context.Background() rather than the request context —
	// consider plumbing ctx through so cache lookups are cancellable.
	_, bufs, _ := c.c.Fetch(context.Background(), []string{key})
	if len(bufs) != 1 {
		return false
	}

	// AllowUnknownFields tolerates payloads written by newer/older builds.
	err := (&jsonpb.Unmarshaler{AllowUnknownFields: true}).Unmarshal(bytes.NewReader(bufs[0]), pb)
	return err == nil
}
60 changes: 60 additions & 0 deletions modules/frontend/cache_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package frontend

import (
"bytes"
"context"
"io"
"net/http/httptest"
"testing"

"github.com/go-kit/log"
"github.com/gogo/protobuf/jsonpb"
"github.com/grafana/tempo/pkg/cache"
"github.com/grafana/tempo/pkg/tempopb"
"github.com/grafana/tempo/pkg/util/test"
"github.com/stretchr/testify/require"
)

// TestNilProvider verifies that a frontendCache built from a nil provider is
// usable but inert: store succeeds silently and fetch reports a miss.
func TestNilProvider(t *testing.T) {
	testKey := "key"

	c := newFrontendCache(nil, cache.RoleBloom, log.NewNopLogger())
	require.NotNil(t, c)

	rec := httptest.NewRecorder()
	resp := rec.Result()
	defer resp.Body.Close() // Result() bodies must be closed by the caller

	bodyBuffer, err := io.ReadAll(resp.Body)
	require.NoError(t, err)

	c.store(context.Background(), testKey, bodyBuffer)
	found := c.fetch(testKey, &tempopb.SearchResponse{})

	require.False(t, found)
}

// TestCacheCaches verifies the store/fetch round trip against a mock cache
// provider: a marshaled response stored under a key is fetched back intact.
func TestCacheCaches(t *testing.T) {
	expected := &tempopb.SearchTagsResponse{
		TagNames: []string{"foo", "bar"},
	}

	// marshal message to bytes
	buf := bytes.NewBuffer([]byte{})
	err := (&jsonpb.Marshaler{}).Marshal(buf, expected)
	require.NoError(t, err)

	testKey := "key"
	testData := buf.Bytes()

	p := test.NewMockProvider()
	c := newFrontendCache(p, cache.RoleBloom, log.NewNopLogger())
	require.NotNil(t, c)

	// store the marshaled payload, then fetch it back into a fresh message
	c.store(context.Background(), testKey, testData)

	actual := &tempopb.SearchTagsResponse{}
	found := c.fetch(testKey, actual)

	require.True(t, found)
	require.Equal(t, expected, actual)
}
21 changes: 14 additions & 7 deletions modules/frontend/frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (

"github.com/grafana/tempo/modules/overrides"
"github.com/grafana/tempo/pkg/api"
"github.com/grafana/tempo/pkg/cache"
"github.com/grafana/tempo/pkg/tempopb"
"github.com/grafana/tempo/tempodb"
)
Expand All @@ -27,12 +28,13 @@ type streamingSearchHandler func(req *tempopb.SearchRequest, srv tempopb.Streami

type QueryFrontend struct {
TraceByIDHandler, SearchHandler, SearchTagsHandler, SpanMetricsSummaryHandler, SearchWSHandler http.Handler
cacheProvider cache.Provider
streamingSearch streamingSearchHandler
logger log.Logger
}

// New returns a new QueryFrontend
func New(cfg Config, next http.RoundTripper, o overrides.Interface, reader tempodb.Reader, apiPrefix string, logger log.Logger, registerer prometheus.Registerer) (*QueryFrontend, error) {
func New(cfg Config, next http.RoundTripper, o overrides.Interface, reader tempodb.Reader, cacheProvider cache.Provider, apiPrefix string, logger log.Logger, registerer prometheus.Registerer) (*QueryFrontend, error) {
level.Info(logger).Log("msg", "creating middleware in query frontend")

if cfg.TraceByID.QueryShards < minQueryShards || cfg.TraceByID.QueryShards > maxQueryShards {
Expand All @@ -53,9 +55,12 @@ func New(cfg Config, next http.RoundTripper, o overrides.Interface, reader tempo

retryWare := newRetryWare(cfg.MaxRetries, registerer)

// tracebyid middleware
// cache
searchCache := newFrontendCache(cacheProvider, cache.RoleFrontendSearch, logger)

// middleware
traceByIDMiddleware := MergeMiddlewares(newTraceByIDMiddleware(cfg, o, logger), retryWare)
searchMiddleware := MergeMiddlewares(newSearchMiddleware(cfg, o, reader, logger), retryWare)
searchMiddleware := MergeMiddlewares(newSearchMiddleware(cfg, o, reader, searchCache, logger), retryWare)
searchTagsMiddleware := MergeMiddlewares(newSearchTagsMiddleware(), retryWare)

spanMetricsMiddleware := MergeMiddlewares(newSpanMetricsMiddleware(), retryWare)
Expand All @@ -70,8 +75,9 @@ func New(cfg Config, next http.RoundTripper, o overrides.Interface, reader tempo
SearchHandler: newHandler(search, searchSLOPostHook(cfg.Search.SLO), searchSLOPreHook, logger),
SearchTagsHandler: newHandler(searchTags, nil, nil, logger),
SpanMetricsSummaryHandler: newHandler(metrics, nil, nil, logger),
SearchWSHandler: newSearchStreamingWSHandler(cfg, o, retryWare.Wrap(next), reader, apiPrefix, logger),
streamingSearch: newSearchStreamingGRPCHandler(cfg, o, retryWare.Wrap(next), reader, apiPrefix, logger),
SearchWSHandler: newSearchStreamingWSHandler(cfg, o, retryWare.Wrap(next), reader, searchCache, apiPrefix, logger),
cacheProvider: cacheProvider,
streamingSearch: newSearchStreamingGRPCHandler(cfg, o, retryWare.Wrap(next), reader, searchCache, apiPrefix, logger),
logger: logger,
}, nil
}
Expand Down Expand Up @@ -170,9 +176,10 @@ func newTraceByIDMiddleware(cfg Config, o overrides.Interface, logger log.Logger
}

// newSearchMiddleware creates a new frontend middleware to handle search and search tags requests.
func newSearchMiddleware(cfg Config, o overrides.Interface, reader tempodb.Reader, logger log.Logger) Middleware {
func newSearchMiddleware(cfg Config, o overrides.Interface, reader tempodb.Reader, c *frontendCache, logger log.Logger) Middleware {
return MiddlewareFunc(func(next http.RoundTripper) http.RoundTripper {
searchRT := NewRoundTripper(next, newSearchSharder(reader, o, cfg.Search.Sharder, newSearchProgress, logger))
ss := newSearchSharder(reader, o, cfg.Search.Sharder, newSearchProgress, c, logger)
searchRT := NewRoundTripper(next, ss)

return RoundTripperFunc(func(r *http.Request) (*http.Response, error) {
// backend search queries require sharding, so we pass through a special roundtripper
Expand Down
12 changes: 6 additions & 6 deletions modules/frontend/frontend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func TestFrontendRoundTripsSearch(t *testing.T) {
},
SLO: testSLOcfg,
},
}, next, nil, nil, "", log.NewNopLogger(), nil)
}, next, nil, nil, nil, "", log.NewNopLogger(), nil)
require.NoError(t, err)

req := httptest.NewRequest("GET", "/", nil)
Expand Down Expand Up @@ -69,7 +69,7 @@ func TestFrontendBadConfigFails(t *testing.T) {
},
SLO: testSLOcfg,
},
}, nil, nil, nil, "", log.NewNopLogger(), nil)
}, nil, nil, nil, nil, "", log.NewNopLogger(), nil)
assert.EqualError(t, err, "frontend query shards should be between 2 and 100000 (both inclusive)")

assert.Nil(t, f)
Expand All @@ -86,7 +86,7 @@ func TestFrontendBadConfigFails(t *testing.T) {
},
SLO: testSLOcfg,
},
}, nil, nil, nil, "", log.NewNopLogger(), nil)
}, nil, nil, nil, nil, "", log.NewNopLogger(), nil)
assert.EqualError(t, err, "frontend query shards should be between 2 and 100000 (both inclusive)")
assert.Nil(t, f)

Expand All @@ -102,7 +102,7 @@ func TestFrontendBadConfigFails(t *testing.T) {
},
SLO: testSLOcfg,
},
}, nil, nil, nil, "", log.NewNopLogger(), nil)
}, nil, nil, nil, nil, "", log.NewNopLogger(), nil)
assert.EqualError(t, err, "frontend search concurrent requests should be greater than 0")
assert.Nil(t, f)

Expand All @@ -118,7 +118,7 @@ func TestFrontendBadConfigFails(t *testing.T) {
},
SLO: testSLOcfg,
},
}, nil, nil, nil, "", log.NewNopLogger(), nil)
}, nil, nil, nil, nil, "", log.NewNopLogger(), nil)
assert.EqualError(t, err, "frontend search target bytes per request should be greater than 0")
assert.Nil(t, f)

Expand All @@ -136,7 +136,7 @@ func TestFrontendBadConfigFails(t *testing.T) {
},
SLO: testSLOcfg,
},
}, nil, nil, nil, "", log.NewNopLogger(), nil)
}, nil, nil, nil, nil, "", log.NewNopLogger(), nil)
assert.EqualError(t, err, "query backend after should be less than or equal to query ingester until")
assert.Nil(t, f)
}
10 changes: 7 additions & 3 deletions modules/frontend/search_streaming.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,13 +96,14 @@ func (p *diffSearchProgress) finalResult() *shardedSearchResults {
}

// newSearchStreamingGRPCHandler returns a handler that streams results from the HTTP handler
func newSearchStreamingGRPCHandler(cfg Config, o overrides.Interface, downstream http.RoundTripper, reader tempodb.Reader, apiPrefix string, logger log.Logger) streamingSearchHandler {
func newSearchStreamingGRPCHandler(cfg Config, o overrides.Interface, downstream http.RoundTripper, reader tempodb.Reader, searchCache *frontendCache, apiPrefix string, logger log.Logger) streamingSearchHandler {
searcher := streamingSearcher{
logger: logger,
downstream: downstream,
reader: reader,
postSLOHook: searchSLOPostHook(cfg.Search.SLO),
o: o,
searchCache: searchCache,
cfg: &cfg,
}

Expand All @@ -129,13 +130,14 @@ func newSearchStreamingGRPCHandler(cfg Config, o overrides.Interface, downstream
}
}

func newSearchStreamingWSHandler(cfg Config, o overrides.Interface, downstream http.RoundTripper, reader tempodb.Reader, apiPrefix string, logger log.Logger) http.Handler {
func newSearchStreamingWSHandler(cfg Config, o overrides.Interface, downstream http.RoundTripper, reader tempodb.Reader, searchCache *frontendCache, apiPrefix string, logger log.Logger) http.Handler {
searcher := streamingSearcher{
logger: logger,
downstream: downstream,
reader: reader,
postSLOHook: searchSLOPostHook(cfg.Search.SLO),
o: o,
searchCache: searchCache,
cfg: &cfg,
}

Expand Down Expand Up @@ -231,6 +233,7 @@ type streamingSearcher struct {
reader tempodb.Reader
postSLOHook handlerPostHook
o overrides.Interface
searchCache *frontendCache
cfg *Config
}

Expand All @@ -254,7 +257,8 @@ func (s *streamingSearcher) handle(r *http.Request, forwardResults func(*tempopb
return p
}
// build roundtripper
rt := NewRoundTripper(s.downstream, newSearchSharder(s.reader, s.o, s.cfg.Search.Sharder, fn, s.logger))
ss := newSearchSharder(s.reader, s.o, s.cfg.Search.Sharder, fn, s.searchCache, s.logger)
rt := NewRoundTripper(s.downstream, ss)

type roundTripResult struct {
resp *http.Response
Expand Down
2 changes: 1 addition & 1 deletion modules/frontend/search_streaming_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ func testHandler(t *testing.T, next http.RoundTripper) streamingSearchHandler {
BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000000"),
},
},
}, "", log.NewNopLogger())
}, &frontendCache{}, "", log.NewNopLogger())

return handler
}
Expand Down
Loading