Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion modules/blockbuilder/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func (c *Config) Validate() error {
return fmt.Errorf("block config validation failed: %w", err)
}

if err := wal.ValidateConfig(&c.WAL); err != nil {
if err := c.WAL.Validate(); err != nil {
return fmt.Errorf("wal config validation failed: %w", err)
}

Expand Down
25 changes: 20 additions & 5 deletions modules/generator/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,12 @@ const (

// Config for a generator.
type Config struct {
Ring RingConfig `yaml:"ring"`
Processor ProcessorConfig `yaml:"processor"`
Registry registry.Config `yaml:"registry"`
Storage storage.Config `yaml:"storage"`
TracesWAL wal.Config `yaml:"traces_storage"`
Ring RingConfig `yaml:"ring"`
Processor ProcessorConfig `yaml:"processor"`
Registry registry.Config `yaml:"registry"`
Storage storage.Config `yaml:"storage"`
TracesWAL wal.Config `yaml:"traces_storage"`
TracesQueryWAL wal.Config `yaml:"traces_query_storage"`
// MetricsIngestionSlack is the max amount of time passed since a span's end time
// for the span to be considered in metrics generation
MetricsIngestionSlack time.Duration `yaml:"metrics_ingestion_time_range_slack"`
Expand All @@ -54,6 +55,8 @@ func (cfg *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet)
cfg.Storage.RegisterFlagsAndApplyDefaults(prefix, f)
cfg.TracesWAL.RegisterFlags(f)
cfg.TracesWAL.Version = encoding.DefaultEncoding().Version()
cfg.TracesQueryWAL.RegisterFlags(f)
cfg.TracesQueryWAL.Version = encoding.DefaultEncoding().Version()

// setting default for max span age before discarding to 30s
cfg.MetricsIngestionSlack = 30 * time.Second
Expand All @@ -76,6 +79,18 @@ func (cfg *Config) Validate() error {
}
}

// Only validate if being used
if cfg.TracesWAL.Filepath != "" {
if err := cfg.TracesWAL.Validate(); err != nil {
return err
}
}
if cfg.TracesQueryWAL.Filepath != "" {
if err := cfg.TracesQueryWAL.Validate(); err != nil {
return err
}
}

return nil
}

Expand Down
23 changes: 16 additions & 7 deletions modules/generator/generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ func (g *Generator) stopping(_ error) error {
// Mark as read-only after we have removed ourselves from the ring
g.stopIncomingRequests()

// Stop reading from queue and wait for oustanding data to be processed and committed
// Stop reading from queue and wait for outstanding data to be processed and committed
if g.cfg.Ingest.Enabled {
g.stopKafka()
}
Expand Down Expand Up @@ -319,22 +319,31 @@ func (g *Generator) createInstance(id string) (*instance, error) {
}

// Create traces wal if configured
var tracesWAL *tempodb_wal.WAL
var tracesWAL, tracesQueryWAL *tempodb_wal.WAL

if g.cfg.TracesWAL.Filepath != "" {
// Create separate wals per tenant by prefixing path with tenant ID
cfg := g.cfg.TracesWAL
cfg.Filepath = path.Join(cfg.Filepath, id)
tracesWAL, err = tempodb_wal.New(&cfg)
if err != nil {
_ = wal.Close()
return nil, err
}
}

tracesWALCfg := g.cfg.TracesWAL
tracesWALCfg.Filepath = path.Join(tracesWALCfg.Filepath, id)

tracesWAL, err = tempodb_wal.New(&tracesWALCfg)
if g.cfg.TracesQueryWAL.Filepath != "" {
// Create separate wals per tenant by prefixing path with tenant ID
cfg := g.cfg.TracesQueryWAL
cfg.Filepath = path.Join(cfg.Filepath, id)
tracesQueryWAL, err = tempodb_wal.New(&cfg)
if err != nil {
_ = wal.Close()
return nil, err
}
}

inst, err := newInstance(g.cfg, id, g.overrides, wal, reg, g.logger, tracesWAL, g.store)
inst, err := newInstance(g.cfg, id, g.overrides, wal, reg, g.logger, tracesWAL, tracesQueryWAL, g.store)
if err != nil {
_ = wal.Close()
return nil, err
Expand Down
4 changes: 2 additions & 2 deletions modules/generator/generator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ func BenchmarkPushSpans(b *testing.B) {
wal, err := storage.New(walcfg, o, tenant, reg, log)
require.NoError(b, err)

inst, err := newInstance(cfg, tenant, o, wal, reg, log, nil, nil)
inst, err := newInstance(cfg, tenant, o, wal, reg, log, nil, nil, nil)
require.NoError(b, err)
defer inst.shutdown()

Expand Down Expand Up @@ -234,7 +234,7 @@ func BenchmarkCollect(b *testing.B) {
wal, err := storage.New(walcfg, o, tenant, reg, log)
require.NoError(b, err)

inst, err := newInstance(cfg, tenant, o, wal, reg, log, nil, nil)
inst, err := newInstance(cfg, tenant, o, wal, reg, log, nil, nil, nil)
require.NoError(b, err)
defer inst.shutdown()

Expand Down
111 changes: 93 additions & 18 deletions modules/generator/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/grafana/tempo/modules/generator/storage"
"github.com/grafana/tempo/pkg/tempopb"
v1 "github.com/grafana/tempo/pkg/tempopb/trace/v1"
"github.com/grafana/tempo/pkg/traceql"
"github.com/grafana/tempo/tempodb/wal"

"go.uber.org/atomic"
Expand Down Expand Up @@ -74,33 +75,36 @@ type instance struct {
registry *registry.ManagedRegistry
wal storage.Storage

traceWAL *wal.WAL
writer tempodb.Writer
traceWAL *wal.WAL
traceQueryWAL *wal.WAL
writer tempodb.Writer

// processorsMtx protects the processors map, not the processors itself
processorsMtx sync.RWMutex
// processors is a map of processor name -> processor, only one instance of a processor can be
// active at any time
processors map[string]processor.Processor
processors map[string]processor.Processor
queuebasedLocalBlocks *localblocks.Processor

shutdownCh chan struct{}

reg prometheus.Registerer
logger log.Logger
}

func newInstance(cfg *Config, instanceID string, overrides metricsGeneratorOverrides, wal storage.Storage, reg prometheus.Registerer, logger log.Logger, traceWAL *wal.WAL, writer tempodb.Writer) (*instance, error) {
func newInstance(cfg *Config, instanceID string, overrides metricsGeneratorOverrides, wal storage.Storage, reg prometheus.Registerer, logger log.Logger, traceWAL, rf1TraceWAL *wal.WAL, writer tempodb.Writer) (*instance, error) {
logger = log.With(logger, "tenant", instanceID)

i := &instance{
cfg: cfg,
instanceID: instanceID,
overrides: overrides,

registry: registry.New(&cfg.Registry, overrides, instanceID, wal, logger),
wal: wal,
traceWAL: traceWAL,
writer: writer,
registry: registry.New(&cfg.Registry, overrides, instanceID, wal, logger),
wal: wal,
traceWAL: traceWAL,
traceQueryWAL: rf1TraceWAL,
writer: writer,

processors: make(map[string]processor.Processor),

Expand Down Expand Up @@ -304,6 +308,16 @@ func (i *instance) addProcessor(processorName string, cfg ProcessorConfig) error
return err
}
newProcessor = p

// Add the non-flushing alternate if configured
if i.traceQueryWAL != nil {
nonFlushingConfig := cfg.LocalBlocks
nonFlushingConfig.FlushToStorage = false
i.queuebasedLocalBlocks, err = localblocks.New(nonFlushingConfig, i.instanceID, i.traceQueryWAL, i.writer, i.overrides)
if err != nil {
return err
}
}
default:
level.Error(i.logger).Log(
"msg", fmt.Sprintf("processor does not exist, supported processors: [%s]", strings.Join(SupportedProcessors, ", ")),
Expand Down Expand Up @@ -335,6 +349,11 @@ func (i *instance) removeProcessor(processorName string) {
delete(i.processors, processorName)

deletedProcessor.Shutdown(context.Background())

if processorName == localblocks.Name && i.queuebasedLocalBlocks != nil {
i.queuebasedLocalBlocks.Shutdown(context.Background())
i.queuebasedLocalBlocks = nil
}
}

// updateProcessorMetrics updates the active processor metrics. Must be called under a read lock.
Expand Down Expand Up @@ -370,6 +389,11 @@ func (i *instance) pushSpansFromQueue(ctx context.Context, req *tempopb.PushSpan
}
processor.PushSpans(ctx, req)
}

// Now we push to the non-flushing local blocks if present
if i.queuebasedLocalBlocks != nil {
i.queuebasedLocalBlocks.PushSpans(ctx, req)
}
}

func (i *instance) preprocessSpans(req *tempopb.PushSpansRequest) {
Expand Down Expand Up @@ -418,23 +442,74 @@ func (i *instance) GetMetrics(ctx context.Context, req *tempopb.SpanMetricsReque
}

func (i *instance) QueryRange(ctx context.Context, req *tempopb.QueryRangeRequest) (resp *tempopb.QueryRangeResponse, err error) {
var processors []*localblocks.Processor

i.processorsMtx.RLock()
for _, processor := range i.processors {
switch p := processor.(type) {
case *localblocks.Processor:
r, err := p.QueryRange(ctx, req)
if err != nil {
return resp, err
}
processors = append(processors, p)
}
}

rr := r.ToProto(req)
return &tempopb.QueryRangeResponse{
Series: rr,
}, nil
default:
if i.queuebasedLocalBlocks != nil {
processors = append(processors, i.queuebasedLocalBlocks)
}

i.processorsMtx.RUnlock()

if len(processors) == 0 {
return resp, fmt.Errorf("localblocks processor not found")
}

ctx, cancel := context.WithCancel(ctx)
defer cancel()

expr, err := traceql.Parse(req.Query)
if err != nil {
return nil, fmt.Errorf("compiling query: %w", err)
}

unsafe := i.overrides.UnsafeQueryHints(i.instanceID)

timeOverlapCutoff := i.cfg.Processor.LocalBlocks.Metrics.TimeOverlapCutoff
if v, ok := expr.Hints.GetFloat(traceql.HintTimeOverlapCutoff, unsafe); ok && v >= 0 && v <= 1.0 {
timeOverlapCutoff = v
}

e := traceql.NewEngine()

// Compile the raw version of the query for head and wal blocks
// These aren't cached and we put them all into the same evaluator
// for efficiency.
rawEval, err := e.CompileMetricsQueryRange(req, int(req.Exemplars), timeOverlapCutoff, unsafe)
if err != nil {
return nil, err
}

// This is a summation version of the query for complete blocks
// which can be cached. They are timeseries, so they need the job-level evaluator.
jobEval, err := traceql.NewEngine().CompileMetricsQueryRangeNonRaw(req, traceql.AggregateModeSum)
if err != nil {
return nil, err
}

for _, p := range processors {
err = p.QueryRange(ctx, req, rawEval, jobEval)
if err != nil {
return nil, err
}
}

return resp, fmt.Errorf("localblocks processor not found")
// Combine the raw results into the job results
walResults := rawEval.Results().ToProto(req)
jobEval.ObserveSeries(walResults)

r := jobEval.Results()
rr := r.ToProto(req)
return &tempopb.QueryRangeResponse{
Series: rr,
}, nil
}

func (i *instance) updatePushMetrics(bytesIngested int, spanCount int, expiredSpanCount int) {
Expand Down
6 changes: 3 additions & 3 deletions modules/generator/instance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,10 @@ func Test_instance_concurrency(t *testing.T) {
servicegraphs.Name: {},
}

instance1, err := newInstance(&Config{}, "test", overrides, &noopStorage{}, prometheus.DefaultRegisterer, log.NewNopLogger(), nil, nil)
instance1, err := newInstance(&Config{}, "test", overrides, &noopStorage{}, prometheus.DefaultRegisterer, log.NewNopLogger(), nil, nil, nil)
assert.NoError(t, err)

instance2, err := newInstance(&Config{}, "test", overrides, &noopStorage{}, prometheus.DefaultRegisterer, log.NewNopLogger(), nil, nil)
instance2, err := newInstance(&Config{}, "test", overrides, &noopStorage{}, prometheus.DefaultRegisterer, log.NewNopLogger(), nil, nil, nil)
assert.NoError(t, err)

end := make(chan struct{})
Expand Down Expand Up @@ -87,7 +87,7 @@ func Test_instance_updateProcessors(t *testing.T) {
logger := log.NewLogfmtLogger(log.NewSyncWriter(os.Stdout))
overrides := mockOverrides{}

instance, err := newInstance(&cfg, "test", &overrides, &noopStorage{}, prometheus.DefaultRegisterer, logger, nil, nil)
instance, err := newInstance(&cfg, "test", &overrides, &noopStorage{}, prometheus.DefaultRegisterer, logger, nil, nil, nil)
assert.NoError(t, err)

// stop the update goroutine
Expand Down
20 changes: 14 additions & 6 deletions modules/generator/processor/localblocks/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,11 +93,24 @@ func TestProcessorDoesNotRace(t *testing.T) {
},
}
overrides = &mockOverrides{}
e = traceql.NewEngine()
)

p, err := New(cfg, tenant, wal, &mockWriter{}, overrides)
require.NoError(t, err)

qr := &tempopb.QueryRangeRequest{
Query: "{} | rate() by (resource.service.name)",
Start: uint64(time.Now().Add(-5 * time.Minute).UnixNano()),
End: uint64(time.Now().UnixNano()),
Step: uint64(30 * time.Second),
}
me, err := e.CompileMetricsQueryRange(qr, 0, 0, false)
require.NoError(t, err)

je, err := e.CompileMetricsQueryRangeNonRaw(qr, traceql.AggregateModeSum)
require.NoError(t, err)

var (
end = make(chan struct{})
wg = sync.WaitGroup{}
Expand Down Expand Up @@ -176,12 +189,7 @@ func TestProcessorDoesNotRace(t *testing.T) {
})

go concurrent(func() {
_, err := p.QueryRange(ctx, &tempopb.QueryRangeRequest{
Query: "{} | rate() by (resource.service.name)",
Start: uint64(time.Now().Add(-5 * time.Minute).UnixNano()),
End: uint64(time.Now().UnixNano()),
Step: uint64(30 * time.Second),
})
err := p.QueryRange(ctx, qr, me, je)
require.NoError(t, err)
})

Expand Down
Loading