Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
* [ENHANCEMENT] Add a configurable prefix for HTTP endpoints. [#631](https://github.com/grafana/tempo/pull/631)
* [ENHANCEMENT] Add kafka receiver. [#613](https://github.com/grafana/tempo/pull/613)
* [ENHANCEMENT] Upgrade OTel collector to `v0.21.0`. [#613](https://github.com/grafana/tempo/pull/627)
* [ENHANCEMENT] Add support for Cortex Background Cache. [#640](https://github.com/grafana/tempo/pull/640)
* [BUGFIX] Fixes permissions errors on startup in GCS. [#554](https://github.com/grafana/tempo/pull/554)
* [BUGFIX] Fixes error where Dell ECS cannot list objects. [#561](https://github.com/grafana/tempo/pull/561)
* [BUGFIX] Fixes listing blocks in S3 when the list is truncated. [#567](https://github.com/grafana/tempo/pull/567)
Expand Down
9 changes: 6 additions & 3 deletions docs/tempo/website/configuration/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -123,13 +123,16 @@ storage:
bucket_name: ops-tools-tracing-ops # store traces in this bucket
blocklist_poll: 5m # how often to repoll the backend for new blocks
blocklist_poll_concurrency: 50 # optional. Number of blocks to process in parallel during polling. Default is 50.
cache: memcached # optional cache configuration
memcached: # optional memcached configuration
cache: memcached # optional. Cache configuration
background_cache: # optional. Background cache configuration
Comment thread
annanay25 marked this conversation as resolved.
Outdated
writeback_goroutines: 10 # at what concurrency to write back to cache. Default is 10.
writeback_buffer: 10000 # how many key batches to buffer for background write-back. Default is 10000.
memcached: # optional. Memcached configuration
consistent_hash: true
host: memcached
service: memcached-client
timeout: 500ms
redis: # optional redis configuration
redis: # optional. Redis configuration
endpoint: redis
timeout: 500ms
pool: # the worker pool is used primarily when finding traces by id, but is also used by other
Expand Down
5 changes: 5 additions & 0 deletions modules/storage/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"flag"
"time"

cortex_cache "github.com/cortexproject/cortex/pkg/chunk/cache"
"github.com/grafana/tempo/pkg/util"
"github.com/grafana/tempo/tempodb"
"github.com/grafana/tempo/tempodb/backend"
Expand Down Expand Up @@ -62,6 +63,10 @@ func (cfg *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet)
cfg.Trace.Local = &local.Config{}
f.StringVar(&cfg.Trace.Local.Path, util.PrefixConfig(prefix, "trace.local.path"), "", "path to store traces at.")

cfg.Trace.BackgroundCache = &cortex_cache.BackgroundConfig{}
f.IntVar(&cfg.Trace.BackgroundCache.WriteBackBuffer, util.PrefixConfig(prefix, "trace.background_cache.writeback_buffer"), 10000, "Key batches to buffer for background write-back.")
f.IntVar(&cfg.Trace.BackgroundCache.WriteBackGoroutines, util.PrefixConfig(prefix, "trace.background_cache.writeback_goroutines"), 10, "Concurrency to write back to cache.")
Comment thread
dgzlopes marked this conversation as resolved.
Outdated

cfg.Trace.Pool = &pool.Config{}
f.IntVar(&cfg.Trace.Pool.MaxWorkers, util.PrefixConfig(prefix, "trace.pool.max-workers"), 50, "Workers in the worker pool.")
f.IntVar(&cfg.Trace.Pool.QueueDepth, util.PrefixConfig(prefix, "trace.pool.queue-depth"), 200, "Work item queue depth.")
Expand Down
25 changes: 10 additions & 15 deletions tempodb/backend/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,25 +4,20 @@ import (
"context"
"io"

cortex_cache "github.com/cortexproject/cortex/pkg/chunk/cache"
"github.com/google/uuid"
"github.com/grafana/tempo/tempodb/backend"
)

type readerWriter struct {
nextReader backend.Reader
nextWriter backend.Writer
client Client
cache cortex_cache.Cache
}

type Client interface {
Fetch(ctx context.Context, key string) []byte
Store(ctx context.Context, key string, val []byte)
Shutdown()
}
Comment on lines -17 to -21
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Amazing work. Have been wanting to reuse the Cortex Cache interfaces for a long time!


func NewCache(nextReader backend.Reader, nextWriter backend.Writer, client Client) (backend.Reader, backend.Writer, error) {
func NewCache(nextReader backend.Reader, nextWriter backend.Writer, cache cortex_cache.Cache) (backend.Reader, backend.Writer, error) {
rw := &readerWriter{
client: client,
cache: cache,
nextReader: nextReader,
nextWriter: nextWriter,
}
Expand All @@ -48,14 +43,14 @@ func (r *readerWriter) BlockMeta(ctx context.Context, blockID uuid.UUID, tenantI
// Read implements backend.Reader
func (r *readerWriter) Read(ctx context.Context, name string, blockID uuid.UUID, tenantID string) ([]byte, error) {
key := key(blockID, tenantID, name)
val := r.client.Fetch(ctx, key)
if val != nil {
return val, nil
found, vals, _ := r.cache.Fetch(ctx, []string{key})
if len(found) > 0 {
return vals[0], nil
}

val, err := r.nextReader.Read(ctx, name, blockID, tenantID)
if err == nil {
r.client.Store(ctx, key, val)
r.cache.Store(ctx, []string{key}, [][]byte{val})
}

return val, err
Expand All @@ -73,12 +68,12 @@ func (r *readerWriter) ReadRange(ctx context.Context, name string, blockID uuid.
// Shutdown implements backend.Reader
func (r *readerWriter) Shutdown() {
r.nextReader.Shutdown()
r.client.Shutdown()
r.cache.Stop()
}

// Write implements backend.Writer
func (r *readerWriter) Write(ctx context.Context, name string, blockID uuid.UUID, tenantID string, buffer []byte) error {
r.client.Store(ctx, key(blockID, tenantID, name), buffer)
r.cache.Store(ctx, []string{key(blockID, tenantID, name)}, [][]byte{buffer})

return r.nextWriter.Write(ctx, name, blockID, tenantID, buffer)
}
Expand Down
20 changes: 12 additions & 8 deletions tempodb/backend/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"testing"

cortex_cache "github.com/cortexproject/cortex/pkg/chunk/cache"
"github.com/google/uuid"
"github.com/grafana/tempo/tempodb/backend"
"github.com/grafana/tempo/tempodb/backend/util"
Expand All @@ -14,23 +15,26 @@ type mockClient struct {
client map[string][]byte
}

func (m *mockClient) Store(_ context.Context, key string, val []byte) {
m.client[key] = val
func (m *mockClient) Store(_ context.Context, key []string, val [][]byte) {
m.client[key[0]] = val[0]
}

func (m *mockClient) Fetch(_ context.Context, key string) (val []byte) {
val, ok := m.client[key]
func (m *mockClient) Fetch(_ context.Context, key []string) (found []string, bufs [][]byte, missing []string) {
val, ok := m.client[key[0]]
if ok {
return val
found = append(found, key[0])
bufs = append(bufs, val)
} else {
missing = append(missing, key[0])
}
return nil
return
}

func (m *mockClient) Shutdown() {
func (m *mockClient) Stop() {
}

// NewMockClient makes a new mockClient.
func NewMockClient() Client {
func NewMockClient() cortex_cache.Cache {
return &mockClient{
client: map[string][]byte{},
}
Expand Down
31 changes: 3 additions & 28 deletions tempodb/backend/cache/memcached/memcached.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
package memcached

import (
"context"
"time"

"github.com/go-kit/kit/log"
"github.com/prometheus/client_golang/prometheus"

cortex_cache "github.com/cortexproject/cortex/pkg/chunk/cache"
"github.com/grafana/tempo/tempodb/backend/cache"
)

type Config struct {
Expand All @@ -17,11 +15,7 @@ type Config struct {
TTL time.Duration `yaml:"ttl"`
}

type Client struct {
client *cortex_cache.Memcached
}

func NewClient(cfg *Config, logger log.Logger) cache.Client {
func NewClient(cfg *Config, cfgBackground *cortex_cache.BackgroundConfig, logger log.Logger) cortex_cache.Cache {
if cfg.ClientConfig.MaxIdleConns == 0 {
cfg.ClientConfig.MaxIdleConns = 16
}
Expand All @@ -38,26 +32,7 @@ func NewClient(cfg *Config, logger log.Logger) cache.Client {
BatchSize: 0, // we are currently only requesting one key at a time, which is bad. we could restructure Find() to batch request all blooms at once
Parallelism: 0,
}
return &Client{
client: cortex_cache.NewMemcached(memcachedCfg, client, "tempo", prometheus.DefaultRegisterer, logger),
}
}

// Store implements cache.Store
func (m *Client) Store(ctx context.Context, key string, val []byte) {
m.client.Store(ctx, []string{key}, [][]byte{val})
}

// Fetch implements cache.Fetch
func (m *Client) Fetch(ctx context.Context, key string) []byte {
found, vals, _ := m.client.Fetch(ctx, []string{key})
if len(found) > 0 {
return vals[0]
}
return nil
}
cache := cortex_cache.NewMemcached(memcachedCfg, client, "tempo", prometheus.DefaultRegisterer, logger)

// Shutdown implements cache.Shutdown
func (m *Client) Shutdown() {
m.client.Stop()
return cortex_cache.NewBackground("tempo", *cfgBackground, cache, prometheus.DefaultRegisterer)
}
32 changes: 4 additions & 28 deletions tempodb/backend/cache/redis/redis.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
package redis

import (
"context"
"time"

"github.com/go-kit/kit/log"
"github.com/prometheus/client_golang/prometheus"

cortex_cache "github.com/cortexproject/cortex/pkg/chunk/cache"
"github.com/grafana/tempo/tempodb/backend/cache"
)

type Config struct {
Expand All @@ -16,11 +15,7 @@ type Config struct {
TTL time.Duration `yaml:"ttl"`
}

type Client struct {
client *cortex_cache.RedisCache
}

func NewClient(cfg *Config, logger log.Logger) cache.Client {
func NewClient(cfg *Config, cfgBackground *cortex_cache.BackgroundConfig, logger log.Logger) cortex_cache.Cache {
if cfg.ClientConfig.Timeout == 0 {
cfg.ClientConfig.Timeout = 100 * time.Millisecond
}
Expand All @@ -29,26 +24,7 @@ func NewClient(cfg *Config, logger log.Logger) cache.Client {
}

client := cortex_cache.NewRedisClient(&cfg.ClientConfig)
return &Client{
client: cortex_cache.NewRedisCache("tempo", client, logger),
}
}

// Store implements cache.Store
func (r *Client) Store(ctx context.Context, key string, val []byte) {
r.client.Store(ctx, []string{key}, [][]byte{val})
}

// Fetch implements cache.Fetch
func (r *Client) Fetch(ctx context.Context, key string) []byte {
found, vals, _ := r.client.Fetch(ctx, []string{key})
if len(found) > 0 {
return vals[0]
}
return nil
}
cache := cortex_cache.NewRedisCache("tempo", client, logger)

// Shutdown implements cache.Shutdown
func (r *Client) Shutdown() {
r.client.Stop()
return cortex_cache.NewBackground("tempo", *cfgBackground, cache, prometheus.DefaultRegisterer)
}
8 changes: 5 additions & 3 deletions tempodb/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"time"

cortex_cache "github.com/cortexproject/cortex/pkg/chunk/cache"
"github.com/grafana/tempo/tempodb/backend/azure"
"github.com/grafana/tempo/tempodb/backend/cache/memcached"
"github.com/grafana/tempo/tempodb/backend/cache/redis"
Expand Down Expand Up @@ -36,9 +37,10 @@ type Config struct {
Azure *azure.Config `yaml:"azure"`

// caches
Cache string `yaml:"cache"`
Memcached *memcached.Config `yaml:"memcached"`
Redis *redis.Config `yaml:"redis"`
Cache string `yaml:"cache"`
BackgroundCache *cortex_cache.BackgroundConfig `yaml:"background_cache"`
Memcached *memcached.Config `yaml:"memcached"`
Redis *redis.Config `yaml:"redis"`
}

// CompactorConfig contains compaction configuration options
Expand Down
12 changes: 6 additions & 6 deletions tempodb/tempodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,8 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

cortex_cache "github.com/cortexproject/cortex/pkg/chunk/cache"
log_util "github.com/cortexproject/cortex/pkg/util/log"
"github.com/opentracing/opentracing-go"
ot_log "github.com/opentracing/opentracing-go/log"

"github.com/grafana/tempo/pkg/boundedwaitgroup"
"github.com/grafana/tempo/tempodb/backend"
"github.com/grafana/tempo/tempodb/backend/azure"
Expand All @@ -34,6 +32,8 @@ import (
"github.com/grafana/tempo/tempodb/encoding/common"
"github.com/grafana/tempo/tempodb/pool"
"github.com/grafana/tempo/tempodb/wal"
"github.com/opentracing/opentracing-go"
ot_log "github.com/opentracing/opentracing-go/log"
)

const (
Expand Down Expand Up @@ -159,13 +159,13 @@ func New(cfg *Config, logger log.Logger) (Reader, Writer, Compactor, error) {
return nil, nil, nil, err
}

var cacheBackend cache.Client
var cacheBackend cortex_cache.Cache

switch cfg.Cache {
case "redis":
cacheBackend = redis.NewClient(cfg.Redis, logger)
cacheBackend = redis.NewClient(cfg.Redis, cfg.BackgroundCache, logger)
case "memcached":
cacheBackend = memcached.NewClient(cfg.Memcached, logger)
cacheBackend = memcached.NewClient(cfg.Memcached, cfg.BackgroundCache, logger)
}

if cacheBackend != nil {
Expand Down