1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -13,6 +13,7 @@
* [CHANGE] Set `autocomplete_filtering_enabled` to `true` by default [#3178](https://github.com/grafana/tempo/pull/3178) (@mapno)
* [CHANGE] Update Alpine image version to 3.19 [#3289](https://github.com/grafana/tempo/pull/3289) (@zalegrala)
* [CHANGE] Introduce localblocks process config option to select only server spans [#3303](https://github.com/grafana/tempo/pull/3303) (@zalegrala)
* [CHANGE] Localblocks processor honors the tenant max trace size limit [#3305](https://github.com/grafana/tempo/pull/3305) (@mdisibio)
* [CHANGE] Major cache refactor to allow multiple role based caches to be configured [#3166](https://github.com/grafana/tempo/pull/3166).
**BREAKING CHANGE** Deprecate the following fields. These have all been migrated to a top level "cache:" field.
```
1 change: 1 addition & 0 deletions modules/generator/overrides.go
@@ -33,6 +33,7 @@ type metricsGeneratorOverrides interface {
MetricsGeneratorProcessorServiceGraphsEnableClientServerPrefix(userID string) bool
MetricsGeneratorProcessorSpanMetricsTargetInfoExcludedDimensions(userID string) []string
DedicatedColumns(userID string) backend.DedicatedColumns
MaxBytesPerTrace(userID string) int
}

var _ metricsGeneratorOverrides = (overrides.Interface)(nil)
5 changes: 5 additions & 0 deletions modules/generator/overrides_test.go
@@ -28,6 +28,7 @@ type mockOverrides struct {
localBlocksTraceIdlePeriod time.Duration
localBlocksCompleteBlockTimeout time.Duration
dedicatedColumns backend.DedicatedColumns
maxBytesPerTrace int
}

var _ metricsGeneratorOverrides = (*mockOverrides)(nil)
@@ -129,3 +130,7 @@ func (m *mockOverrides) MetricsGeneratorProcessorSpanMetricsTargetInfoExcludedDi
func (m *mockOverrides) DedicatedColumns(string) backend.DedicatedColumns {
return m.dedicatedColumns
}

func (m *mockOverrides) MaxBytesPerTrace(string) int {
return m.maxBytesPerTrace
}
16 changes: 4 additions & 12 deletions modules/generator/processor/localblocks/livetraces.go
@@ -1,16 +1,13 @@
package localblocks

import (
"errors"
"hash"
"hash/fnv"
"time"

v1 "github.com/grafana/tempo/pkg/tempopb/trace/v1"
)

var errMaxExceeded = errors.New("asdf")

type liveTrace struct {
id []byte
timestamp time.Time
@@ -25,7 +22,7 @@ type liveTraces struct {
func newLiveTraces() *liveTraces {
return &liveTraces{
hash: fnv.New64(),
traces: map[uint64]*liveTrace{},
traces: make(map[uint64]*liveTrace),
}
}

@@ -39,12 +36,7 @@ func (l *liveTraces) Len() uint64 {
return uint64(len(l.traces))
}

func (l *liveTraces) Push(batch *v1.ResourceSpans, max uint64) error {
if len(batch.ScopeSpans) == 0 || len(batch.ScopeSpans[0].Spans) == 0 {
return nil
}

traceID := batch.ScopeSpans[0].Spans[0].TraceId
func (l *liveTraces) Push(traceID []byte, batch *v1.ResourceSpans, max uint64) bool {
token := l.token(traceID)

tr := l.traces[token]
@@ -53,7 +45,7 @@ func (l *liveTraces) Push(batch *v1.ResourceSpans, max uint64) error {
// Before adding this check against max
// Zero means no limit
if max > 0 && uint64(len(l.traces)) >= max {
return errMaxExceeded
return false
}

tr = &liveTrace{
@@ -64,7 +56,7 @@ func (l *liveTraces) Push(batch *v1.ResourceSpans, max uint64) error {

tr.Batches = append(tr.Batches, batch)
tr.timestamp = time.Now()
return nil
return true
}

func (l *liveTraces) CutIdle(idleSince time.Time) []*liveTrace {
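With the new signature, `Push` takes the trace ID explicitly and reports admission as a boolean instead of returning `errMaxExceeded`. A hypothetical test sketch (not part of this PR) that could sit in the `localblocks` package illustrates the contract:

```go
package localblocks

import (
	"testing"

	v1 "github.com/grafana/tempo/pkg/tempopb/trace/v1"
	"github.com/stretchr/testify/require"
)

// Hypothetical example, not code from the PR: once the number of live traces
// reaches max, batches for new trace IDs are rejected, while traces that are
// already tracked keep accepting batches.
func TestLiveTracesPushSketch(t *testing.T) {
	l := newLiveTraces()
	batch := &v1.ResourceSpans{} // batch contents do not affect admission

	require.True(t, l.Push([]byte{0x01}, batch, 1))  // first trace admitted
	require.False(t, l.Push([]byte{0x02}, batch, 1)) // new trace rejected: live-trace cap of 1 reached
	require.True(t, l.Push([]byte{0x01}, batch, 1))  // existing trace still accepts more batches
}
```

Callers only need to know whether the batch was admitted, which is why the sentinel error and the `errors` import are removed above.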
7 changes: 7 additions & 0 deletions modules/generator/processor/localblocks/metrics.go
@@ -10,6 +10,7 @@ const (
subsystem = "metrics_generator_processor_local_blocks"

reasonLiveTracesExceeded = "live_traces_exceeded"
reasonTraceSizeExceeded = "trace_too_large"
)

var (
@@ -25,6 +26,12 @@ var (
Name: "spans_total",
Help: "Total number of spans after filtering",
}, []string{"tenant"})
metricDroppedSpans = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "spans_dropped_total",
Help: "Number of spans dropped",
}, []string{"tenant", "reason"})
metricLiveTraces = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
105 changes: 63 additions & 42 deletions modules/generator/processor/localblocks/processor.go
@@ -34,6 +34,7 @@ const timeBuffer = 5 * time.Minute
// ProcessorOverrides is just the set of overrides needed here.
type ProcessorOverrides interface {
DedicatedColumns(string) backend.DedicatedColumns
MaxBytesPerTrace(string) int
}

type Processor struct {
@@ -55,6 +56,7 @@ type Processor struct {

liveTracesMtx sync.Mutex
liveTraces *liveTraces
traceSizes *traceSizes
}

var _ gen.Processor = (*Processor)(nil)
@@ -81,6 +83,7 @@ func New(cfg Config, tenant string, wal *wal.WAL, overrides ProcessorOverrides)
walBlocks: map[uuid.UUID]common.WALBlock{},
completeBlocks: map[uuid.UUID]common.BackendBlock{},
liveTraces: newLiveTraces(),
traceSizes: newTraceSizes(),
closeCh: make(chan struct{}),
wg: sync.WaitGroup{},
cache: lru.New(100),
@@ -108,30 +111,43 @@ func (p *Processor) PushSpans(_ context.Context, req *tempopb.PushSpansRequest)
p.liveTracesMtx.Lock()
defer p.liveTracesMtx.Unlock()

var count int
before := p.liveTraces.Len()

// A quick way to reduce the number of spans we have to process
maxSz := p.overrides.MaxBytesPerTrace(p.tenant)

batches := req.Batches
if p.Cfg.FilterServerSpans {
for _, batch := range req.Batches {
if batch, count = filterBatch(batch); batch != nil {
err := p.liveTraces.Push(batch, p.Cfg.MaxLiveTraces)
if errors.Is(err, errMaxExceeded) {
metricDroppedTraces.WithLabelValues(p.tenant, reasonLiveTracesExceeded).Inc()
}
metricTotalSpans.WithLabelValues(p.tenant).Add(float64(count))
}
batches = filterBatches(batches)
}

for _, batch := range batches {

// Spans in the batch are for the same trace.
// We use the first one.
if len(batch.ScopeSpans) == 0 || len(batch.ScopeSpans[0].Spans) == 0 {
return
}
} else {
for _, batch := range req.Batches {
err := p.liveTraces.Push(batch, p.Cfg.MaxLiveTraces)
if errors.Is(err, errMaxExceeded) {
metricDroppedTraces.WithLabelValues(p.tenant, reasonLiveTracesExceeded).Inc()
}
for _, ss := range batch.ScopeSpans {
metricTotalSpans.WithLabelValues(p.tenant).Add(float64(len(ss.Spans)))
}
traceID := batch.ScopeSpans[0].Spans[0].TraceId

// Metric total spans regardless of outcome
numSpans := 0
for _, ss := range batch.ScopeSpans {
numSpans += len(ss.Spans)
}
metricTotalSpans.WithLabelValues(p.tenant).Add(float64(numSpans))

// Check max trace size
if maxSz > 0 && !p.traceSizes.Allow(traceID, batch.Size(), maxSz) {
metricDroppedSpans.WithLabelValues(p.tenant, reasonTraceSizeExceeded).Add(float64(numSpans))
continue
}

// Live traces
if !p.liveTraces.Push(traceID, batch, p.Cfg.MaxLiveTraces) {
metricDroppedTraces.WithLabelValues(p.tenant, reasonLiveTracesExceeded).Inc()
continue
}

}

after := p.liveTraces.Len()
@@ -615,6 +631,9 @@ func (p *Processor) cutBlocks(immediate bool) error {
return nil
}

// Clear historical trace sizes for traces that weren't seen in this block.
p.traceSizes.ClearIdle(p.lastCutTime)

// Final flush
err := p.headBlock.Flush()
if err != nil {
@@ -741,36 +760,38 @@ func metricSeriesToProto(series traceqlmetrics.MetricSeries) []*tempopb.KeyValue
return r
}

// filterBatch to only root spans or kind==server. Does not modify the input
// filterBatches to only root spans or kind==server. Does not modify the input
// but returns a new struct referencing the same input pointers. Returns nil
// if there were no matching spans.
func filterBatch(batch *v1.ResourceSpans) (*v1.ResourceSpans, int) {
var keep int
var keepSS []*v1.ScopeSpans
for _, ss := range batch.ScopeSpans {

var keepSpans []*v1.Span
for _, s := range ss.Spans {
if s.Kind == v1.Span_SPAN_KIND_SERVER || len(s.ParentSpanId) == 0 {
keepSpans = append(keepSpans, s)
func filterBatches(batches []*v1.ResourceSpans) []*v1.ResourceSpans {
keep := make([]*v1.ResourceSpans, 0, len(batches))

for _, batch := range batches {
var keepSS []*v1.ScopeSpans
for _, ss := range batch.ScopeSpans {

var keepSpans []*v1.Span
for _, s := range ss.Spans {
if s.Kind == v1.Span_SPAN_KIND_SERVER || len(s.ParentSpanId) == 0 {
keepSpans = append(keepSpans, s)
}
}

if len(keepSpans) > 0 {
keepSS = append(keepSS, &v1.ScopeSpans{
Scope: ss.Scope,
Spans: keepSpans,
})
}
}

if len(keepSpans) > 0 {
keepSS = append(keepSS, &v1.ScopeSpans{
Scope: ss.Scope,
Spans: keepSpans,
if len(keepSS) > 0 {
keep = append(keep, &v1.ResourceSpans{
Resource: batch.Resource,
ScopeSpans: keepSS,
})
keep += len(keepSpans)
}
}

if len(keepSS) > 0 {
return &v1.ResourceSpans{
Resource: batch.Resource,
ScopeSpans: keepSS,
}, keep
}

return nil, 0
return keep
}
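Read straight through, the new admission path in `PushSpans` condenses to the sketch below. This is a paraphrase, not code from the PR: `pushSpansSketch` is a hypothetical name, locking and the before/after live-trace accounting are omitted, and every other identifier comes from the diff above (it assumes the surrounding `localblocks` package for imports and types).

```go
// Paraphrased sketch only — not code from the PR.
func (p *Processor) pushSpansSketch(req *tempopb.PushSpansRequest) {
	maxSz := p.overrides.MaxBytesPerTrace(p.tenant)

	batches := req.Batches
	if p.Cfg.FilterServerSpans {
		batches = filterBatches(batches)
	}

	for _, batch := range batches {
		// All spans in a batch belong to the same trace; the ID is read from the first span.
		if len(batch.ScopeSpans) == 0 || len(batch.ScopeSpans[0].Spans) == 0 {
			return
		}
		traceID := batch.ScopeSpans[0].Spans[0].TraceId

		// Count every span up front, regardless of outcome.
		numSpans := 0
		for _, ss := range batch.ScopeSpans {
			numSpans += len(ss.Spans)
		}
		metricTotalSpans.WithLabelValues(p.tenant).Add(float64(numSpans))

		// Tenant max trace size: traceSizes keeps a running byte total per trace ID
		// and rejects the batch once the total would exceed the limit.
		if maxSz > 0 && !p.traceSizes.Allow(traceID, batch.Size(), maxSz) {
			metricDroppedSpans.WithLabelValues(p.tenant, reasonTraceSizeExceeded).Add(float64(numSpans))
			continue
		}

		// Live-trace cap: Push now reports admission as a bool instead of an error.
		if !p.liveTraces.Push(traceID, batch, p.Cfg.MaxLiveTraces) {
			metricDroppedTraces.WithLabelValues(p.tenant, reasonLiveTracesExceeded).Inc()
			continue
		}
	}
}
```

The size check runs before the live-trace cap, so a batch for an oversized trace is counted under the `trace_too_large` reason and never occupies a live-trace slot.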
4 changes: 4 additions & 0 deletions modules/generator/processor/localblocks/processor_test.go
@@ -25,6 +25,10 @@ func (m *mockOverrides) DedicatedColumns(string) backend.DedicatedColumns {
return nil
}

func (m *mockOverrides) MaxBytesPerTrace(string) int {
return 0
}

func TestProcessorDoesNotRace(t *testing.T) {
wal, err := wal.New(&wal.Config{
Filepath: t.TempDir(),
65 changes: 65 additions & 0 deletions modules/generator/processor/localblocks/traceSizes.go
@@ -0,0 +1,65 @@
package localblocks

import (
"hash"
"hash/fnv"
"sync"
"time"
)

type traceSizes struct {
mtx sync.Mutex
hash hash.Hash64
sizes map[uint64]*traceSize
}

type traceSize struct {
size int
timestamp time.Time
}

func newTraceSizes() *traceSizes {
return &traceSizes{
hash: fnv.New64(),
sizes: make(map[uint64]*traceSize),
}
}

func (s *traceSizes) token(traceID []byte) uint64 {
s.hash.Reset()
s.hash.Write(traceID)
return s.hash.Sum64()
}

// Allow returns true if the historical total plus incoming size is less than
// or equal to the max. The historical total is kept alive and incremented even
// if not allowed, so that long-running traces are cut off as expected.
func (s *traceSizes) Allow(traceID []byte, sz, max int) bool {
s.mtx.Lock()
defer s.mtx.Unlock()

token := s.token(traceID)
tr := s.sizes[token]
if tr == nil {
tr = &traceSize{
size: sz,
}
s.sizes[token] = tr
}

tr.timestamp = time.Now()
tr.size += sz

return tr.size <= max
}

func (s *traceSizes) ClearIdle(idleSince time.Time) {
s.mtx.Lock()
defer s.mtx.Unlock()

for token, tr := range s.sizes {
if tr.timestamp.Before(idleSince) {
delete(s.sizes, token)
}
}
}
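For reference, a hypothetical test sketch (not part of this PR) exercising `Allow` and `ClearIdle` from within the `localblocks` package; the sizes are chosen so the fourth batch pushes the running total past the limit:

```go
package localblocks

import (
	"testing"
	"time"

	"github.com/stretchr/testify/require"
)

// Hypothetical example, not code from the PR: a trace accumulates bytes across
// calls, Allow rejects a batch once the running total goes over the limit, and
// ClearIdle forgets traces that have not been seen since the given cutoff.
func TestTraceSizesSketch(t *testing.T) {
	s := newTraceSizes()
	id := []byte{0x0a, 0x0b}

	require.True(t, s.Allow(id, 1, 250))    // small first batch, well under the limit
	require.True(t, s.Allow(id, 100, 250))  // still under the limit
	require.True(t, s.Allow(id, 100, 250))  // still under the limit
	require.False(t, s.Allow(id, 100, 250)) // running total now exceeds 250 bytes

	// A cutoff in the future clears every entry, so the trace starts from zero.
	s.ClearIdle(time.Now().Add(time.Minute))
	require.True(t, s.Allow(id, 100, 250))
}
```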