Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions modules/blockbuilder/blockbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,15 @@ func (b *BlockBuilder) consumePartition(ctx context.Context, partition int32, ov
startOffset = kgo.NewOffset().AtStart()
}

ends, err := b.kadm.ListEndOffsets(ctx, topic)
if err != nil {
return false, err
}
if err := ends.Error(); err != nil {
return false, err
}
lastPossibleMessage, lastPossibleMessageFound := ends.Lookup(topic, partition)

level.Info(b.logger).Log(
"msg", "consuming partition",
"partition", partition,
Expand Down Expand Up @@ -344,6 +353,11 @@ outer:
}

lastRec = rec

if lastPossibleMessageFound && lastRec.Offset >= lastPossibleMessage.Offset-1 {
// We reached the end so break now and avoid another poll which is expected to be empty.
break outer
}
}
}

Expand Down
3 changes: 0 additions & 3 deletions modules/blockbuilder/blockbuilder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -510,7 +510,6 @@ func BenchmarkBlockBuilder(b *testing.B) {
for i := 0; i < b.N; i++ {

var records []*kgo.Record

for i := 0; i < 1000; i++ {
records = append(records, sendReq(b, ctx, client)...)
}
Expand All @@ -520,8 +519,6 @@ func BenchmarkBlockBuilder(b *testing.B) {
size += len(r.Value)
}

b.ResetTimer()

err = bb.consume(ctx)
require.NoError(b, err)

Expand Down
9 changes: 1 addition & 8 deletions modules/blockbuilder/partition_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,12 @@ package blockbuilder

import (
"context"
"fmt"
"strings"
"sync"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/gogo/protobuf/proto"
"github.com/grafana/tempo/pkg/tempopb"
"github.com/grafana/tempo/pkg/util"
"github.com/grafana/tempo/tempodb"
Expand Down Expand Up @@ -64,12 +62,7 @@ func (p *writer) pushBytes(ts time.Time, tenant string, req *tempopb.PushBytesRe
}

for j, trace := range req.Traces {
tr := new(tempopb.Trace) // TODO - Pool?
if err := proto.Unmarshal(trace.Slice, tr); err != nil {
return fmt.Errorf("failed to unmarshal trace: %w", err)
}

if err := i.AppendTrace(req.Ids[j], tr, ts); err != nil {
if err := i.AppendTrace(req.Ids[j], trace.Slice, ts); err != nil {
return err
}
}
Expand Down
104 changes: 74 additions & 30 deletions modules/blockbuilder/tenant_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ type tenantStore struct {
blocksMtx sync.Mutex
walBlocks []common.WALBlock

liveTraces *livetraces.LiveTraces
liveTraces *livetraces.LiveTraces[[]byte]
traceSizes *tracesizes.Tracker
}

Expand All @@ -73,7 +73,7 @@ func newTenantStore(tenantID string, partitionID, endTimestamp uint64, cfg Block
headBlockMtx: sync.Mutex{},
blocksMtx: sync.Mutex{},
enc: enc,
liveTraces: livetraces.New(),
liveTraces: livetraces.New[[]byte](func(b []byte) uint64 { return uint64(len(b)) }),
traceSizes: tracesizes.New(),
}

Expand Down Expand Up @@ -122,61 +122,105 @@ func (s *tenantStore) resetHeadBlock() error {
return nil
}

func (s *tenantStore) AppendTrace(traceID []byte, tr *tempopb.Trace, ts time.Time) error {
func (s *tenantStore) AppendTrace(traceID []byte, tr []byte, ts time.Time) error {
maxSz := s.overrides.MaxBytesPerTrace(s.tenantID)

for _, b := range tr.ResourceSpans {
if maxSz > 0 && !s.traceSizes.Allow(traceID, b.Size(), maxSz) {
// Record dropped spans due to trace too large
count := 0
if maxSz > 0 && !s.traceSizes.Allow(traceID, len(tr), maxSz) {
// Record dropped spans due to trace too large
// We have to unmarshal to count the number of spans.
// TODO - There might be a better way
t := &tempopb.Trace{}
if err := t.Unmarshal(tr); err != nil {
return err
}
count := 0
for _, b := range t.ResourceSpans {
for _, ss := range b.ScopeSpans {
count += len(ss.Spans)
}
overrides.RecordDiscardedSpans(count, reasonTraceTooLarge, s.tenantID)
continue
}

s.liveTraces.PushWithTimestamp(ts, traceID, b, 0)
overrides.RecordDiscardedSpans(count, reasonTraceTooLarge, s.tenantID)
return nil
}

s.liveTraces.PushWithTimestamp(ts, traceID, tr, 0)

return nil
}

func (s *tenantStore) CutIdle(since time.Time, immediate bool) error {
idle := s.liveTraces.CutIdle(since, immediate)

slices.SortFunc(idle, func(a, b *livetraces.LiveTrace) int {
slices.SortFunc(idle, func(a, b *livetraces.LiveTrace[[]byte]) int {
return bytes.Compare(a.ID, b.ID)
})

for _, e := range idle {
tr := &tempopb.Trace{
ResourceSpans: e.Batches,
}
var (
unmarshalWg = sync.WaitGroup{}
unmarshalErr = atomic.NewError(nil)
unmarshaled = make([]*tempopb.Trace, len(idle))
starts = make([]uint32, len(idle))
ends = make([]uint32, len(idle))
)

// Get trace timestamp bounds
var start, end uint64
for _, b := range tr.ResourceSpans {
for _, ss := range b.ScopeSpans {
for _, s := range ss.Spans {
if start == 0 || s.StartTimeUnixNano < start {
start = s.StartTimeUnixNano
// Unmarshal and process in parallel, each goroutine handles 1/Nth
for i := 0; i < len(idle) && i < flushConcurrency; i++ {
unmarshalWg.Add(1)
go func(i int) {
defer unmarshalWg.Done()

for j := i; j < len(idle); j += flushConcurrency {
tr := new(tempopb.Trace)

for _, b := range idle[j].Batches {
// This unmarshal appends the batches onto the existing tempopb.Trace
// so we don't need to allocate another container temporarily
err := tr.Unmarshal(b)
if err != nil {
unmarshalErr.Store(err)
return
}
if s.EndTimeUnixNano > end {
end = s.EndTimeUnixNano
}

// Get trace timestamp bounds
var start, end uint64
for _, b := range tr.ResourceSpans {
for _, ss := range b.ScopeSpans {
for _, s := range ss.Spans {
if start == 0 || s.StartTimeUnixNano < start {
start = s.StartTimeUnixNano
}
if s.EndTimeUnixNano > end {
end = s.EndTimeUnixNano
}
}
}
}

// Convert from unix nanos to unix seconds
starts[j] = uint32(start / uint64(time.Second))
ends[j] = uint32(end / uint64(time.Second))
unmarshaled[j] = tr
}
}
}(i)
}

// Convert from unix nanos to unix seconds
startSeconds := uint32(start / uint64(time.Second))
endSeconds := uint32(end / uint64(time.Second))
unmarshalWg.Wait()
if err := unmarshalErr.Load(); err != nil {
return err
}

if err := s.headBlock.AppendTrace(e.ID, tr, startSeconds, endSeconds); err != nil {
for i, tr := range unmarshaled {
if err := s.headBlock.AppendTrace(idle[i].ID, tr, starts[i], ends[i]); err != nil {
return err
}
}

// Return prealloc slices to the pool
for _, i := range idle {
tempopb.ReuseByteSlices(i.Batches)
}

err := s.headBlock.Flush()
if err != nil {
return err
Expand Down
4 changes: 2 additions & 2 deletions modules/generator/processor/localblocks/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ type Processor struct {
flushqueue *flushqueues.PriorityQueue

liveTracesMtx sync.Mutex
liveTraces *livetraces.LiveTraces
liveTraces *livetraces.LiveTraces[*v1.ResourceSpans]
traceSizes *tracesizes.Tracker

writer tempodb.Writer
Expand Down Expand Up @@ -104,7 +104,7 @@ func New(cfg Config, tenant string, wal *wal.WAL, writer tempodb.Writer, overrid
walBlocks: map[uuid.UUID]common.WALBlock{},
completeBlocks: map[uuid.UUID]*ingester.LocalBlock{},
flushqueue: flushqueues.NewPriorityQueue(metricFlushQueueSize.WithLabelValues(tenant)),
liveTraces: livetraces.New(),
liveTraces: livetraces.New[*v1.ResourceSpans](func(rs *v1.ResourceSpans) uint64 { return uint64(rs.Size()) }),
traceSizes: tracesizes.New(),
closeCh: make(chan struct{}),
wg: sync.WaitGroup{},
Expand Down
4 changes: 3 additions & 1 deletion pkg/ingest/encoding.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,9 @@ func (d *Decoder) Decode(data []byte) (*tempopb.PushBytesRequest, error) {
}

func (d *Decoder) Reset() {
d.req.Reset()
// Retain slice capacity
d.req.Ids = d.req.Ids[:0]
d.req.Traces = d.req.Traces[:0]
}

// sovPush calculates the size of varint-encoded uint64.
Expand Down
40 changes: 23 additions & 17 deletions pkg/livetraces/livetraces.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,47 +8,53 @@ import (
v1 "github.com/grafana/tempo/pkg/tempopb/trace/v1"
)

type LiveTrace struct {
// LiveTraceBatchT is the type constraint for the batch elements held by
// LiveTrace and LiveTraces. It admits exactly two types: *v1.ResourceSpans
// and []byte. Because these types share no common size method, the size of a
// batch is computed by the function passed to New.
type LiveTraceBatchT interface {
*v1.ResourceSpans | []byte
}

type LiveTrace[T LiveTraceBatchT] struct {
ID []byte
timestamp time.Time
Batches []*v1.ResourceSpans
Batches []T

sz uint64
}

type LiveTraces struct {
type LiveTraces[T LiveTraceBatchT] struct {
hash hash.Hash64
Traces map[uint64]*LiveTrace
Traces map[uint64]*LiveTrace[T]

sz uint64
sz uint64
szFunc func(T) uint64
}

func New() *LiveTraces {
return &LiveTraces{
func New[T LiveTraceBatchT](sizeFunc func(T) uint64) *LiveTraces[T] {
return &LiveTraces[T]{
hash: fnv.New64(),
Traces: make(map[uint64]*LiveTrace),
Traces: make(map[uint64]*LiveTrace[T]),
szFunc: sizeFunc,
}
}

func (l *LiveTraces) token(traceID []byte) uint64 {
func (l *LiveTraces[T]) token(traceID []byte) uint64 {
l.hash.Reset()
l.hash.Write(traceID)
return l.hash.Sum64()
}

func (l *LiveTraces) Len() uint64 {
func (l *LiveTraces[T]) Len() uint64 {
return uint64(len(l.Traces))
}

func (l *LiveTraces) Size() uint64 {
func (l *LiveTraces[T]) Size() uint64 {
return l.sz
}

func (l *LiveTraces) Push(traceID []byte, batch *v1.ResourceSpans, max uint64) bool {
func (l *LiveTraces[T]) Push(traceID []byte, batch T, max uint64) bool {
return l.PushWithTimestamp(time.Now(), traceID, batch, max)
}

func (l *LiveTraces) PushWithTimestamp(ts time.Time, traceID []byte, batch *v1.ResourceSpans, max uint64) bool {
func (l *LiveTraces[T]) PushWithTimestamp(ts time.Time, traceID []byte, batch T, max uint64) bool {
token := l.token(traceID)

tr := l.Traces[token]
Expand All @@ -60,13 +66,13 @@ func (l *LiveTraces) PushWithTimestamp(ts time.Time, traceID []byte, batch *v1.R
return false
}

tr = &LiveTrace{
tr = &LiveTrace[T]{
ID: traceID,
}
l.Traces[token] = tr
}

sz := uint64(batch.Size())
sz := l.szFunc(batch)
tr.sz += sz
l.sz += sz

Expand All @@ -75,8 +81,8 @@ func (l *LiveTraces) PushWithTimestamp(ts time.Time, traceID []byte, batch *v1.R
return true
}

func (l *LiveTraces) CutIdle(idleSince time.Time, immediate bool) []*LiveTrace {
res := []*LiveTrace{}
func (l *LiveTraces[T]) CutIdle(idleSince time.Time, immediate bool) []*LiveTrace[T] {
res := []*LiveTrace[T]{}

for k, tr := range l.Traces {
if tr.timestamp.Before(idleSince) || immediate {
Expand Down
3 changes: 2 additions & 1 deletion pkg/livetraces/livetraces_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@ import (
"testing"
"time"

v1 "github.com/grafana/tempo/pkg/tempopb/trace/v1"
"github.com/grafana/tempo/pkg/util/test"
"github.com/stretchr/testify/require"
)

func TestLiveTracesSizesAndLen(t *testing.T) {
lt := New()
lt := New[*v1.ResourceSpans](func(rs *v1.ResourceSpans) uint64 { return uint64(rs.Size()) })

expectedSz := uint64(0)
expectedLen := uint64(0)
Expand Down