Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions modules/generator/overrides.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type metricsGeneratorOverrides interface {
MetricsGeneratorProcessorServiceGraphsEnableClientServerPrefix(userID string) bool
MetricsGeneratorProcessorSpanMetricsTargetInfoExcludedDimensions(userID string) []string
DedicatedColumns(userID string) backend.DedicatedColumns
MaxBytesPerTrace(userID string) int
}

var _ metricsGeneratorOverrides = (overrides.Interface)(nil)
5 changes: 5 additions & 0 deletions modules/generator/overrides_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type mockOverrides struct {
localBlocksTraceIdlePeriod time.Duration
localBlocksCompleteBlockTimeout time.Duration
dedicatedColumns backend.DedicatedColumns
maxBytesPerTrace int
}

var _ metricsGeneratorOverrides = (*mockOverrides)(nil)
Expand Down Expand Up @@ -129,3 +130,7 @@ func (m *mockOverrides) MetricsGeneratorProcessorSpanMetricsTargetInfoExcludedDi
// DedicatedColumns returns the canned dedicated-column configuration for any
// tenant; the tenant ID argument is ignored by this mock.
func (m *mockOverrides) DedicatedColumns(string) backend.DedicatedColumns {
	return m.dedicatedColumns
}

// MaxBytesPerTrace returns the mock's configured per-trace byte limit for any
// tenant; the tenant ID argument is ignored by this mock.
func (m *mockOverrides) MaxBytesPerTrace(string) int {
	return m.maxBytesPerTrace
}
18 changes: 5 additions & 13 deletions modules/generator/processor/localblocks/livetraces.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,13 @@
package localblocks

import (
"errors"
"hash"
"hash/fnv"
"time"

v1 "github.com/grafana/tempo/pkg/tempopb/trace/v1"
)

var errMaxExceeded = errors.New("asdf")

type liveTrace struct {
id []byte
timestamp time.Time
Expand Down Expand Up @@ -39,21 +36,16 @@ func (l *liveTraces) Len() uint64 {
return uint64(len(l.traces))
}

func (l *liveTraces) Push(batch *v1.ResourceSpans, max uint64) error {
if len(batch.ScopeSpans) == 0 || len(batch.ScopeSpans[0].Spans) == 0 {
return nil
}

traceID := batch.ScopeSpans[0].Spans[0].TraceId
func (l *liveTraces) Push(traceID []byte, batches []*v1.ResourceSpans, maxTraces uint64) bool {
Comment thread
mdisibio marked this conversation as resolved.
Outdated
token := l.token(traceID)

tr := l.traces[token]
if tr == nil {

// Before adding this check against max
// Zero means no limit
if max > 0 && uint64(len(l.traces)) >= max {
return errMaxExceeded
if maxTraces > 0 && uint64(len(l.traces)) >= maxTraces {
return false
}

tr = &liveTrace{
Expand All @@ -62,9 +54,9 @@ func (l *liveTraces) Push(batch *v1.ResourceSpans, max uint64) error {
l.traces[token] = tr
}

tr.Batches = append(tr.Batches, batch)
tr.Batches = append(tr.Batches, batches...)
tr.timestamp = time.Now()
return nil
return true
}

func (l *liveTraces) CutIdle(idleSince time.Time) []*liveTrace {
Expand Down
7 changes: 7 additions & 0 deletions modules/generator/processor/localblocks/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ const (
subsystem = "metrics_generator_processor_local_blocks"

reasonLiveTracesExceeded = "live_traces_exceeded"
reasonTraceSizeExceeded = "trace_too_large"
)

var (
Expand All @@ -25,6 +26,12 @@ var (
Name: "spans_total",
Help: "Total number of spans after filtering",
}, []string{"tenant"})
metricDroppedSpans = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "spans_dropped_total",
Help: "Number of spans dropped",
}, []string{"tenant", "reason"})
metricLiveTraces = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
Expand Down
40 changes: 31 additions & 9 deletions modules/generator/processor/localblocks/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ const timeBuffer = 5 * time.Minute
// ProcessorOverrides is just the set of overrides needed here.
type ProcessorOverrides interface {
DedicatedColumns(string) backend.DedicatedColumns
MaxBytesPerTrace(string) int
}

type Processor struct {
Expand All @@ -53,6 +54,7 @@ type Processor struct {

liveTracesMtx sync.Mutex
liveTraces *liveTraces
traceSizes *traceSizes
}

var _ gen.Processor = (*Processor)(nil)
Expand All @@ -79,6 +81,7 @@ func New(cfg Config, tenant string, wal *wal.WAL, overrides ProcessorOverrides)
walBlocks: map[uuid.UUID]common.WALBlock{},
completeBlocks: map[uuid.UUID]common.BackendBlock{},
liveTraces: newLiveTraces(),
traceSizes: newTraceSizes(),
closeCh: make(chan struct{}),
wg: sync.WaitGroup{},
cache: lru.New(100),
Expand All @@ -103,21 +106,37 @@ func (*Processor) Name() string {
}

func (p *Processor) PushSpans(_ context.Context, req *tempopb.PushSpansRequest) {
p.liveTracesMtx.Lock()
defer p.liveTracesMtx.Unlock()

before := p.liveTraces.Len()
// All requests contain only a single trace so this is sufficient.
if len(req.Batches) == 0 || len(req.Batches[0].ScopeSpans) == 0 || len(req.Batches[0].ScopeSpans[0].Spans) == 0 {
return
}
traceID := req.Batches[0].ScopeSpans[0].Spans[0].TraceId

// Metric total spans regardless of outcome
numSpans := 0
for _, batch := range req.Batches {
err := p.liveTraces.Push(batch, p.Cfg.MaxLiveTraces)
if errors.Is(err, errMaxExceeded) {
metricDroppedTraces.WithLabelValues(p.tenant, reasonLiveTracesExceeded).Inc()
}
for _, ss := range batch.ScopeSpans {
metricTotalSpans.WithLabelValues(p.tenant).Add(float64(len(ss.Spans)))
numSpans += len(ss.Spans)
}
}
metricTotalSpans.WithLabelValues(p.tenant).Add(float64(numSpans))

// Check max trace size
maxSz := p.overrides.MaxBytesPerTrace(p.tenant)
if maxSz > 0 && !p.traceSizes.Allow(traceID, req.Size(), maxSz) {
metricDroppedSpans.WithLabelValues(p.tenant, reasonTraceSizeExceeded).Add(float64(numSpans))
return
}

// Live traces
p.liveTracesMtx.Lock()
defer p.liveTracesMtx.Unlock()

before := p.liveTraces.Len()
if !p.liveTraces.Push(traceID, req.Batches, p.Cfg.MaxLiveTraces) {
metricDroppedTraces.WithLabelValues(p.tenant, reasonLiveTracesExceeded).Inc()
return
}
after := p.liveTraces.Len()

// Number of new traces is the delta
Expand Down Expand Up @@ -599,6 +618,9 @@ func (p *Processor) cutBlocks(immediate bool) error {
return nil
}

// Clear historical trace sizes for traces that weren't seen in this block.
p.traceSizes.ClearIdle(p.lastCutTime)

// Final flush
err := p.headBlock.Flush()
if err != nil {
Expand Down
4 changes: 4 additions & 0 deletions modules/generator/processor/localblocks/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ func (m *mockOverrides) DedicatedColumns(string) backend.DedicatedColumns {
return nil
}

// MaxBytesPerTrace returns 0, which disables the per-trace size limit
// (zero means no limit), so processor tests are unaffected by it.
func (m *mockOverrides) MaxBytesPerTrace(string) int {
	return 0
}

func TestProcessorDoesNotRace(t *testing.T) {
wal, err := wal.New(&wal.Config{
Filepath: t.TempDir(),
Expand Down
65 changes: 65 additions & 0 deletions modules/generator/processor/localblocks/traceSizes.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package localblocks

import (
"hash"
"hash/fnv"
"sync"
"time"
)

// traceSizes tracks the cumulative observed byte size of live traces so a
// per-trace size limit can be enforced across multiple pushes.
type traceSizes struct {
	mtx   sync.Mutex
	hash  hash.Hash64 // shared stateful hasher; guarded by mtx
	sizes map[uint64]*traceSize
}

// traceSize is the running total for a single trace.
type traceSize struct {
	size      int       // cumulative bytes observed for this trace
	timestamp time.Time // last time this trace was seen, used by ClearIdle
}

func newTraceSizes() *traceSizes {
return &traceSizes{
hash: fnv.New64(),
sizes: map[uint64]*traceSize{},
Comment thread
mdisibio marked this conversation as resolved.
Outdated
}
}

// token hashes a trace ID down to the uint64 key used in the sizes map.
// Callers must hold s.mtx: the fnv hasher is shared and stateful
// (Reset/Write/Sum64 would race otherwise). Allow locks before calling.
func (s *traceSizes) token(traceID []byte) uint64 {
	s.hash.Reset()
	s.hash.Write(traceID)
	return s.hash.Sum64()
}

// Allow returns true if the historical total plus incoming size is less than
// or equal to the max. The historical total is kept alive and incremented even
// if not allowed, so that long-running traces are cutoff as expected.
// Allow records an incoming sz bytes against the given trace and returns true
// if the historical total plus incoming size is less than or equal to max.
// The total keeps accumulating even when not allowed, so that long-running
// traces remain cut off as expected.
func (s *traceSizes) Allow(traceID []byte, sz, max int) bool {
	s.mtx.Lock()
	defer s.mtx.Unlock()

	token := s.token(traceID)
	tr := s.sizes[token]
	if tr == nil {
		// Start from zero: the increment below adds sz exactly once.
		// Initializing size to sz here double-counted the first push,
		// cutting traces off at roughly half the intended limit.
		tr = &traceSize{}
		s.sizes[token] = tr
	}

	tr.timestamp = time.Now()
	tr.size += sz

	return tr.size <= max
}

// ClearIdle drops the accumulated totals for traces whose last activity was
// before idleSince, bounding memory held for traces no longer being pushed.
func (s *traceSizes) ClearIdle(idleSince time.Time) {
	s.mtx.Lock()
	defer s.mtx.Unlock()

	for key, entry := range s.sizes {
		if entry.timestamp.Before(idleSince) {
			delete(s.sizes, key)
		}
	}
}