Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
* [FEATURE] Flush and query RF1 blocks for TraceQL metric queries [#3628](https://github.com/grafana/tempo/pull/3628) [#3691](https://github.com/grafana/tempo/pull/3691) [#3723](https://github.com/grafana/tempo/pull/3723) (@mapno)
* [ENHANCEMENT] Tag value lookup use protobuf internally for improved latency [#3731](https://github.com/grafana/tempo/pull/3731) (@mdisibio)
* [ENHANCEMENT] TraceQL metrics queries use protobuf internally for improved latency [#3745](https://github.com/grafana/tempo/pull/3745) (@mdisibio)
* [ENHANCEMENT] Add local disk caching of metrics queries in local-blocks processor [#3799](https://github.com/grafana/tempo/pull/3799) (@mdisibio)
* [ENHANCEMENT] Improve use of OTEL semantic conventions on the service graph [#3711](https://github.com/grafana/tempo/pull/3711) (@zalegrala)
* [ENHANCEMENT] Performance improvement for `rate() by ()` queries [#3719](https://github.com/grafana/tempo/pull/3719) (@mapno)
* [ENHANCEMENT] Use multiple goroutines to unmarshal responses in parallel in the query frontend. [#3713](https://github.com/grafana/tempo/pull/3713) (@joe-elliott)
Expand Down
12 changes: 6 additions & 6 deletions modules/frontend/metrics_query_range_sharder.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func (s queryRangeSharder) RoundTrip(r *http.Request) (pipeline.Responses[combin
return pipeline.NewBadRequest(errors.New("invalid interval specified: 0")), nil
}

generatorReq := s.generatorRequest(*req, r, tenantID, now, samplingRate)
generatorReq := s.generatorRequest(*req, r, tenantID, now)
reqCh := make(chan *http.Request, 2) // buffer of 2 allows us to insert generatorReq and metrics

if generatorReq != nil {
Expand Down Expand Up @@ -367,10 +367,12 @@ func (s *queryRangeSharder) buildBackendRequests(ctx context.Context, tenantID s
continue
}

start, end := traceql.TrimToOverlap(searchReq.Start, searchReq.End, searchReq.Step, uint64(m.StartTime.UnixNano()), uint64(m.EndTime.UnixNano()))

queryRangeReq := &tempopb.QueryRangeRequest{
Query: searchReq.Query,
Start: max(searchReq.Start, uint64(m.StartTime.UnixNano())),
End: min(searchReq.End, uint64(m.EndTime.UnixNano())),
Start: start,
End: end,
Step: searchReq.Step,
// ShardID: uint32, // No sharding with RF=1
// ShardCount: uint32, // No sharding with RF=1
Expand All @@ -385,8 +387,6 @@ func (s *queryRangeSharder) buildBackendRequests(ctx context.Context, tenantID s
FooterSize: m.FooterSize,
DedicatedColumns: dc,
}
alignTimeRange(queryRangeReq)
queryRangeReq.End += queryRangeReq.Step

subR = api.BuildQueryRangeRequest(subR, queryRangeReq)
subR.Header.Set(api.HeaderAccept, api.HeaderAcceptProtobuf)
Expand Down Expand Up @@ -418,7 +418,7 @@ func (s *queryRangeSharder) backendRange(now time.Time, start, end uint64, query
return start, end
}

func (s *queryRangeSharder) generatorRequest(searchReq tempopb.QueryRangeRequest, parent *http.Request, tenantID string, now time.Time, samplingRate float64) *http.Request {
func (s *queryRangeSharder) generatorRequest(searchReq tempopb.QueryRangeRequest, parent *http.Request, tenantID string, now time.Time) *http.Request {
cutoff := uint64(now.Add(-s.cfg.QueryBackendAfter).UnixNano())

// if there's no overlap between the query and ingester range just return nil
Expand Down
1 change: 1 addition & 0 deletions modules/generator/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ func (cfg *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet)
cfg.Registry.RegisterFlagsAndApplyDefaults(prefix, f)
cfg.Storage.RegisterFlagsAndApplyDefaults(prefix, f)
cfg.TracesWAL.Version = encoding.DefaultEncoding().Version()
cfg.TracesWAL.IngestionSlack = 2 * time.Minute

// setting default for max span age before discarding to 30s
cfg.MetricsIngestionSlack = 30 * time.Second
Expand Down
130 changes: 23 additions & 107 deletions modules/generator/processor/localblocks/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,8 @@ import (
"github.com/grafana/tempo/modules/ingester"
"github.com/grafana/tempo/pkg/flushqueues"
"github.com/grafana/tempo/tempodb"
"github.com/opentracing/opentracing-go"
"go.uber.org/atomic"

gen "github.com/grafana/tempo/modules/generator/processor"
"github.com/grafana/tempo/pkg/boundedwaitgroup"
"github.com/grafana/tempo/pkg/model"
"github.com/grafana/tempo/pkg/tempopb"
v1 "github.com/grafana/tempo/pkg/tempopb/trace/v1"
Expand All @@ -48,6 +45,8 @@ type Processor struct {
logger kitlog.Logger
Cfg Config
wal *wal.WAL
walR backend.Reader
walW backend.Writer
closeCh chan struct{}
wg sync.WaitGroup
cacheMtx sync.RWMutex
Expand Down Expand Up @@ -90,6 +89,8 @@ func New(cfg Config, tenant string, wal *wal.WAL, writer tempodb.Writer, overrid
logger: log.WithUserID(tenant, log.Logger),
Cfg: cfg,
wal: wal,
walR: backend.NewReader(wal.LocalBackend()),
walW: backend.NewWriter(wal.LocalBackend()),
overrides: overrides,
enc: enc,
walBlocks: map[uuid.UUID]common.WALBlock{},
Expand Down Expand Up @@ -494,108 +495,6 @@ func (p *Processor) GetMetrics(ctx context.Context, req *tempopb.SpanMetricsRequ
return resp, nil
}

// QueryRange evaluates a TraceQL metrics query (rate, histogram, etc.) over
// every block this processor currently holds — the mutable head block, any
// cut-but-unflushed WAL blocks, and the locally complete blocks — and returns
// the merged series set. Blocks are queried concurrently up to a configurable
// limit; the first per-block error aborts the whole query.
//
// The request's start time must fall within the retention window
// (CompleteBlockTimeout plus a small buffer), otherwise an error is returned.
// A nil, nil return means there are no blocks to query at all.
func (p *Processor) QueryRange(ctx context.Context, req *tempopb.QueryRangeRequest) (traceql.SeriesSet, error) {
	// Derive a cancellable context so all per-block goroutines are torn down
	// when this function returns (success or failure).
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	// Hold the read lock for the full evaluation so the block set cannot be
	// mutated (flushed/deleted) while goroutines are reading from it.
	p.blocksMtx.RLock()
	defer p.blocksMtx.RUnlock()

	// Reject queries that reach further back than the data this processor
	// retains; timeBuffer pads the window to tolerate clock skew/flush lag.
	cutoff := time.Now().Add(-p.Cfg.CompleteBlockTimeout).Add(-timeBuffer)
	if req.Start < uint64(cutoff.UnixNano()) {
		return nil, fmt.Errorf("time range must be within last %v", p.Cfg.CompleteBlockTimeout)
	}

	// Blocks to check: head block first, then WAL blocks, then complete blocks.
	blocks := make([]common.BackendBlock, 0, 1+len(p.walBlocks)+len(p.completeBlocks))
	if p.headBlock != nil {
		blocks = append(blocks, p.headBlock)
	}
	for _, b := range p.walBlocks {
		blocks = append(blocks, b)
	}
	for _, b := range p.completeBlocks {
		blocks = append(blocks, b)
	}
	if len(blocks) == 0 {
		// Nothing to query; callers treat nil/nil as an empty result.
		return nil, nil
	}

	// Parse the query up front — needed both to validate it and to read any
	// query hints embedded in the expression.
	expr, err := traceql.Parse(req.Query)
	if err != nil {
		return nil, fmt.Errorf("compiling query: %w", err)
	}

	// Whether this tenant is allowed to use unsafe query hints.
	unsafe := p.overrides.UnsafeQueryHints(p.tenant)

	// Per-query hint may override the configured time-overlap cutoff, but only
	// within the valid [0, 1] range.
	timeOverlapCutoff := p.Cfg.Metrics.TimeOverlapCutoff
	if v, ok := expr.Hints.GetFloat(traceql.HintTimeOverlapCutoff, unsafe); ok && v >= 0 && v <= 1.0 {
		timeOverlapCutoff = v
	}

	// Per-query hint may override block concurrency, clamped to (0, 100).
	concurrency := p.Cfg.Metrics.ConcurrentBlocks
	if v, ok := expr.Hints.GetInt(traceql.HintConcurrentBlocks, unsafe); ok && v > 0 && v < 100 {
		concurrency = uint(v)
	}

	// Compile the sharded version of the query
	eval, err := traceql.NewEngine().CompileMetricsQueryRange(req, false, timeOverlapCutoff, unsafe)
	if err != nil {
		return nil, err
	}

	var (
		// Bounded fan-out across blocks; jobErr records the first failure.
		wg     = boundedwaitgroup.New(concurrency)
		jobErr = atomic.Error{}
	)

	for _, b := range blocks {
		// If a job errored then quit immediately.
		if err := jobErr.Load(); err != nil {
			return nil, err
		}

		// Skip blocks whose [start, end] does not intersect the query range.
		start := uint64(b.BlockMeta().StartTime.UnixNano())
		end := uint64(b.BlockMeta().EndTime.UnixNano())
		if start > req.End || end < req.Start {
			// Out of time range
			continue
		}

		wg.Add(1)
		// b is passed as an argument to avoid loop-variable capture across
		// goroutines (pre-Go-1.22 semantics).
		go func(b common.BackendBlock) {
			defer wg.Done()

			m := b.BlockMeta()

			// Trace each per-block evaluation with its id and size for
			// observability.
			span, ctx := opentracing.StartSpanFromContext(ctx, "Processor.QueryRange.Block", opentracing.Tags{
				"block":     m.BlockID,
				"blockSize": m.Size,
			})
			defer span.Finish()

			// TODO - caching
			f := traceql.NewSpansetFetcherWrapper(func(ctx context.Context, req traceql.FetchSpansRequest) (traceql.FetchSpansResponse, error) {
				return b.Fetch(ctx, req, common.DefaultSearchOptions())
			})

			// Evaluate this block over its own meta time bounds; on failure
			// record the error (last writer wins) for the caller to surface.
			err := eval.Do(ctx, f, uint64(m.StartTime.UnixNano()), uint64(m.EndTime.UnixNano()))
			if err != nil {
				jobErr.Store(err)
			}
		}(b)
	}

	// Wait for all block evaluations before reading combined results.
	wg.Wait()

	if err := jobErr.Load(); err != nil {
		return nil, err
	}

	return eval.Results(), nil
}

func (p *Processor) metricsCacheGet(key string) *traceqlmetrics.MetricsResults {
p.cacheMtx.RLock()
defer p.cacheMtx.RUnlock()
Expand Down Expand Up @@ -713,9 +612,26 @@ func (p *Processor) writeHeadBlock(id common.ID, tr *tempopb.Trace) error {
}
}

now := uint32(time.Now().Unix())
// Get trace timestamp bounds
var start, end uint64
for _, b := range tr.Batches {
for _, ss := range b.ScopeSpans {
for _, s := range ss.Spans {
if start == 0 || s.StartTimeUnixNano < start {
start = s.StartTimeUnixNano
}
if s.EndTimeUnixNano > end {
end = s.EndTimeUnixNano
}
}
}
}

// Convert from unix nanos to unix seconds
startSeconds := uint32(start / uint64(time.Second))
endSeconds := uint32(end / uint64(time.Second))

err := p.headBlock.AppendTrace(id, tr, now, now)
err := p.headBlock.AppendTrace(id, tr, startSeconds, endSeconds)
if err != nil {
return err
}
Expand Down
Loading