Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
* [ENHANCEMENT] Add a configurable prefix for HTTP endpoints. [#631](https://github.com/grafana/tempo/pull/631)
* [ENHANCEMENT] Add kafka receiver. [#613](https://github.com/grafana/tempo/pull/613)
* [ENHANCEMENT] Upgrade OTel collector to `v0.21.0`. [#627](https://github.com/grafana/tempo/pull/627)
* [ENHANCEMENT] Add support for Cortex Background Cache. [#640](https://github.com/grafana/tempo/pull/640)
* [BUGFIX] Fixes permissions errors on startup in GCS. [#554](https://github.com/grafana/tempo/pull/554)
* [BUGFIX] Fixes error where Dell ECS cannot list objects. [#561](https://github.com/grafana/tempo/pull/561)
* [BUGFIX] Fixes listing blocks in S3 when the list is truncated. [#567](https://github.com/grafana/tempo/pull/567)
Expand Down
9 changes: 6 additions & 3 deletions docs/tempo/website/configuration/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -123,13 +123,16 @@ storage:
bucket_name: ops-tools-tracing-ops # store traces in this bucket
blocklist_poll: 5m # how often to repoll the backend for new blocks
blocklist_poll_concurrency: 50 # optional. Number of blocks to process in parallel during polling. Default is 50.
cache: memcached # optional cache configuration
memcached: # optional memcached configuration
cache: memcached # optional. Cache configuration
background_cache: # optional. Background cache configuration. Requires having a cache configured.
writeback_goroutines: 10 # at what concurrency to write back to cache. Default is 10.
writeback_buffer: 10000 # how many key batches to buffer for background write-back. Default is 10000.
memcached: # optional. Memcached configuration
consistent_hash: true
host: memcached
service: memcached-client
timeout: 500ms
redis: # optional redis configuration
redis: # optional. Redis configuration
endpoint: redis
timeout: 500ms
pool: # the worker pool is used primarily when finding traces by id, but is also used by other
Expand Down
5 changes: 5 additions & 0 deletions modules/storage/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"flag"
"time"

cortex_cache "github.com/cortexproject/cortex/pkg/chunk/cache"
"github.com/grafana/tempo/pkg/util"
"github.com/grafana/tempo/tempodb"
"github.com/grafana/tempo/tempodb/backend"
Expand Down Expand Up @@ -62,6 +63,10 @@ func (cfg *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet)
cfg.Trace.Local = &local.Config{}
f.StringVar(&cfg.Trace.Local.Path, util.PrefixConfig(prefix, "trace.local.path"), "", "path to store traces at.")

cfg.Trace.BackgroundCache = &cortex_cache.BackgroundConfig{}
cfg.Trace.BackgroundCache.WriteBackBuffer = 10000
cfg.Trace.BackgroundCache.WriteBackGoroutines = 10

cfg.Trace.Pool = &pool.Config{}
f.IntVar(&cfg.Trace.Pool.MaxWorkers, util.PrefixConfig(prefix, "trace.pool.max-workers"), 50, "Workers in the worker pool.")
f.IntVar(&cfg.Trace.Pool.QueueDepth, util.PrefixConfig(prefix, "trace.pool.queue-depth"), 200, "Work item queue depth.")
Expand Down
25 changes: 10 additions & 15 deletions tempodb/backend/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,25 +4,20 @@ import (
"context"
"io"

cortex_cache "github.com/cortexproject/cortex/pkg/chunk/cache"
"github.com/google/uuid"
"github.com/grafana/tempo/tempodb/backend"
)

// readerWriter wraps a backend.Reader/backend.Writer pair with a
// cortex cache: reads are attempted against the cache first, and
// successful reads/writes are mirrored into it.
type readerWriter struct {
	nextReader backend.Reader
	nextWriter backend.Writer
	cache      cortex_cache.Cache
}

// Client is the legacy single-key cache abstraction.
// NOTE(review): the surrounding change replaces this interface with
// cortex_cache.Cache — confirm there are no remaining callers.
type Client interface {
	// Fetch returns the cached value for key, or nil when the key is absent.
	Fetch(ctx context.Context, key string) []byte
	// Store caches val under key.
	Store(ctx context.Context, key string, val []byte)
	// Shutdown releases any resources held by the client.
	Shutdown()
}
Comment on lines -17 to -21
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Amazing work. Have been wanting to reuse the Cortex Cache interfaces for a long time!


func NewCache(nextReader backend.Reader, nextWriter backend.Writer, client Client) (backend.Reader, backend.Writer, error) {
func NewCache(nextReader backend.Reader, nextWriter backend.Writer, cache cortex_cache.Cache) (backend.Reader, backend.Writer, error) {
rw := &readerWriter{
client: client,
cache: cache,
nextReader: nextReader,
nextWriter: nextWriter,
}
Expand All @@ -48,14 +43,14 @@ func (r *readerWriter) BlockMeta(ctx context.Context, blockID uuid.UUID, tenantI
// Read implements backend.Reader
func (r *readerWriter) Read(ctx context.Context, name string, blockID uuid.UUID, tenantID string) ([]byte, error) {
key := key(blockID, tenantID, name)
val := r.client.Fetch(ctx, key)
if val != nil {
return val, nil
found, vals, _ := r.cache.Fetch(ctx, []string{key})
if len(found) > 0 {
return vals[0], nil
}

val, err := r.nextReader.Read(ctx, name, blockID, tenantID)
if err == nil {
r.client.Store(ctx, key, val)
r.cache.Store(ctx, []string{key}, [][]byte{val})
}

return val, err
Expand All @@ -73,12 +68,12 @@ func (r *readerWriter) ReadRange(ctx context.Context, name string, blockID uuid.
// Shutdown implements backend.Reader. It shuts down the wrapped reader
// and stops the cache (cortex_cache.Cache uses Stop, not Shutdown).
func (r *readerWriter) Shutdown() {
	r.nextReader.Shutdown()
	r.cache.Stop()
}

// Write implements backend.Writer. The buffer is stored in the cache
// first (as a single-element batch, the form the cortex cache interface
// requires) and then forwarded to the next writer.
func (r *readerWriter) Write(ctx context.Context, name string, blockID uuid.UUID, tenantID string, buffer []byte) error {
	r.cache.Store(ctx, []string{key(blockID, tenantID, name)}, [][]byte{buffer})

	return r.nextWriter.Write(ctx, name, blockID, tenantID, buffer)
}
Expand Down
20 changes: 12 additions & 8 deletions tempodb/backend/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"testing"

cortex_cache "github.com/cortexproject/cortex/pkg/chunk/cache"
"github.com/google/uuid"
"github.com/grafana/tempo/tempodb/backend"
"github.com/grafana/tempo/tempodb/backend/util"
Expand All @@ -14,23 +15,26 @@ type mockClient struct {
client map[string][]byte
}

func (m *mockClient) Store(_ context.Context, key string, val []byte) {
m.client[key] = val
func (m *mockClient) Store(_ context.Context, key []string, val [][]byte) {
m.client[key[0]] = val[0]
}

func (m *mockClient) Fetch(_ context.Context, key string) (val []byte) {
val, ok := m.client[key]
func (m *mockClient) Fetch(_ context.Context, key []string) (found []string, bufs [][]byte, missing []string) {
val, ok := m.client[key[0]]
if ok {
return val
found = append(found, key[0])
bufs = append(bufs, val)
} else {
missing = append(missing, key[0])
}
return nil
return
}

func (m *mockClient) Shutdown() {
func (m *mockClient) Stop() {
}

// NewMockClient makes a new mockClient.
func NewMockClient() Client {
func NewMockClient() cortex_cache.Cache {
return &mockClient{
client: map[string][]byte{},
}
Expand Down
31 changes: 3 additions & 28 deletions tempodb/backend/cache/memcached/memcached.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
package memcached

import (
"context"
"time"

"github.com/go-kit/kit/log"
"github.com/prometheus/client_golang/prometheus"

cortex_cache "github.com/cortexproject/cortex/pkg/chunk/cache"
"github.com/grafana/tempo/tempodb/backend/cache"
)

type Config struct {
Expand All @@ -17,11 +15,7 @@ type Config struct {
TTL time.Duration `yaml:"ttl"`
}

type Client struct {
client *cortex_cache.Memcached
}

func NewClient(cfg *Config, logger log.Logger) cache.Client {
func NewClient(cfg *Config, cfgBackground *cortex_cache.BackgroundConfig, logger log.Logger) cortex_cache.Cache {
if cfg.ClientConfig.MaxIdleConns == 0 {
cfg.ClientConfig.MaxIdleConns = 16
}
Expand All @@ -38,26 +32,7 @@ func NewClient(cfg *Config, logger log.Logger) cache.Client {
BatchSize: 0, // we are currently only requesting one key at a time, which is bad. we could restructure Find() to batch request all blooms at once
Parallelism: 0,
}
return &Client{
client: cortex_cache.NewMemcached(memcachedCfg, client, "tempo", prometheus.DefaultRegisterer, logger),
}
}

// Store implements cache.Store
func (m *Client) Store(ctx context.Context, key string, val []byte) {
m.client.Store(ctx, []string{key}, [][]byte{val})
}

// Fetch implements cache.Fetch
func (m *Client) Fetch(ctx context.Context, key string) []byte {
found, vals, _ := m.client.Fetch(ctx, []string{key})
if len(found) > 0 {
return vals[0]
}
return nil
}
cache := cortex_cache.NewMemcached(memcachedCfg, client, "tempo", prometheus.DefaultRegisterer, logger)

// Shutdown implements cache.Shutdown
func (m *Client) Shutdown() {
m.client.Stop()
return cortex_cache.NewBackground("tempo", *cfgBackground, cache, prometheus.DefaultRegisterer)
}
32 changes: 4 additions & 28 deletions tempodb/backend/cache/redis/redis.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
package redis

import (
"context"
"time"

"github.com/go-kit/kit/log"
"github.com/prometheus/client_golang/prometheus"

cortex_cache "github.com/cortexproject/cortex/pkg/chunk/cache"
"github.com/grafana/tempo/tempodb/backend/cache"
)

type Config struct {
Expand All @@ -16,11 +15,7 @@ type Config struct {
TTL time.Duration `yaml:"ttl"`
}

type Client struct {
client *cortex_cache.RedisCache
}

func NewClient(cfg *Config, logger log.Logger) cache.Client {
func NewClient(cfg *Config, cfgBackground *cortex_cache.BackgroundConfig, logger log.Logger) cortex_cache.Cache {
if cfg.ClientConfig.Timeout == 0 {
cfg.ClientConfig.Timeout = 100 * time.Millisecond
}
Expand All @@ -29,26 +24,7 @@ func NewClient(cfg *Config, logger log.Logger) cache.Client {
}

client := cortex_cache.NewRedisClient(&cfg.ClientConfig)
return &Client{
client: cortex_cache.NewRedisCache("tempo", client, logger),
}
}

// Store implements cache.Store
func (r *Client) Store(ctx context.Context, key string, val []byte) {
r.client.Store(ctx, []string{key}, [][]byte{val})
}

// Fetch implements cache.Fetch
func (r *Client) Fetch(ctx context.Context, key string) []byte {
found, vals, _ := r.client.Fetch(ctx, []string{key})
if len(found) > 0 {
return vals[0]
}
return nil
}
cache := cortex_cache.NewRedisCache("tempo", client, logger)

// Shutdown implements cache.Shutdown
func (r *Client) Shutdown() {
r.client.Stop()
return cortex_cache.NewBackground("tempo", *cfgBackground, cache, prometheus.DefaultRegisterer)
}
8 changes: 5 additions & 3 deletions tempodb/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"time"

cortex_cache "github.com/cortexproject/cortex/pkg/chunk/cache"
"github.com/grafana/tempo/tempodb/backend/azure"
"github.com/grafana/tempo/tempodb/backend/cache/memcached"
"github.com/grafana/tempo/tempodb/backend/cache/redis"
Expand Down Expand Up @@ -36,9 +37,10 @@ type Config struct {
Azure *azure.Config `yaml:"azure"`

// caches
Cache string `yaml:"cache"`
Memcached *memcached.Config `yaml:"memcached"`
Redis *redis.Config `yaml:"redis"`
Cache string `yaml:"cache"`
BackgroundCache *cortex_cache.BackgroundConfig `yaml:"background_cache"`
Memcached *memcached.Config `yaml:"memcached"`
Redis *redis.Config `yaml:"redis"`
}

// CompactorConfig contains compaction configuration options
Expand Down
12 changes: 6 additions & 6 deletions tempodb/tempodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,8 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

cortex_cache "github.com/cortexproject/cortex/pkg/chunk/cache"
log_util "github.com/cortexproject/cortex/pkg/util/log"
"github.com/opentracing/opentracing-go"
ot_log "github.com/opentracing/opentracing-go/log"

"github.com/grafana/tempo/pkg/boundedwaitgroup"
"github.com/grafana/tempo/tempodb/backend"
"github.com/grafana/tempo/tempodb/backend/azure"
Expand All @@ -34,6 +32,8 @@ import (
"github.com/grafana/tempo/tempodb/encoding/common"
"github.com/grafana/tempo/tempodb/pool"
"github.com/grafana/tempo/tempodb/wal"
"github.com/opentracing/opentracing-go"
ot_log "github.com/opentracing/opentracing-go/log"
)

const (
Expand Down Expand Up @@ -159,13 +159,13 @@ func New(cfg *Config, logger log.Logger) (Reader, Writer, Compactor, error) {
return nil, nil, nil, err
}

var cacheBackend cache.Client
var cacheBackend cortex_cache.Cache

switch cfg.Cache {
case "redis":
cacheBackend = redis.NewClient(cfg.Redis, logger)
cacheBackend = redis.NewClient(cfg.Redis, cfg.BackgroundCache, logger)
case "memcached":
cacheBackend = memcached.NewClient(cfg.Memcached, logger)
cacheBackend = memcached.NewClient(cfg.Memcached, cfg.BackgroundCache, logger)
}

if cacheBackend != nil {
Expand Down