Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
* [ENHANCEMENT] Used frontend MaxExemplars config as single source of truth for exemplar limits. Added a safety cap at the traceql engine entry points. [#6515](https://github.com/grafana/tempo/pull/6515) (@zhxiaogg)
* [CHANGE] Set default `max_result_limit` for search to 256*1024 [#6525](https://github.com/grafana/tempo/pull/6525) (@zhxiaogg)
* [CHANGE] **BREAKING CHANGE** Remove Opencensus receiver [#6523](https://github.com/grafana/tempo/pull/6523) (@javiermolinar)
* [ENHANCEMENT] Block builder: deduplicate spans within traces during block creation and track removed duplicates via `tempo_block_builder_spans_deduped_total` metric [#6539](https://github.com/grafana/tempo/pull/6539) (@zhxiaogg)
* [CHANGE] Upgrade Tempo to Go 1.26.0 [#6443](https://github.com/grafana/tempo/pull/6443) (@stoewer)
* [CHANGE] Allow duplicate dimensions for span metrics and service graphs. This is a valid use case if using different instrumentation libraries, with spans having "deployment.environment" and others "deployment_environment", for example. [#6288](https://github.com/grafana/tempo/pull/6288) (@carles-grafana)
* [CHANGE] Update default max duration for traceql metrics queries to one day [#6285](https://github.com/grafana/tempo/pull/6285) (@javiermolinar)
Expand Down
6 changes: 6 additions & 0 deletions modules/blockbuilder/blockbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,12 @@ var (
Name: "owned_partitions",
Help: "Indicates partition ownership by this block-builder (1 = owned).",
}, []string{"partition", "state"})
	// metricDedupedSpans counts, per tenant, the duplicate spans removed while
	// building blocks from live traces (incremented from liveTracesIter results).
	// Exported as tempo_block_builder_spans_deduped_total.
	metricDedupedSpans = promauto.NewCounterVec(prometheus.CounterOpts{
		Namespace: "tempo",
		Subsystem: "block_builder",
		Name:      "spans_deduped_total",
		Help:      "Total number of duplicate spans removed during block building.",
	}, []string{"tenant"})

tracer = otel.Tracer("modules/blockbuilder")

Expand Down
52 changes: 43 additions & 9 deletions modules/blockbuilder/live_traces_iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@ package blockbuilder
import (
"bytes"
"context"
"errors"
"slices"
"sync"

"github.com/grafana/tempo/pkg/livetraces"
"github.com/grafana/tempo/pkg/tempopb"
"github.com/grafana/tempo/pkg/util"
"github.com/grafana/tempo/tempodb/encoding/common"
)

Expand All @@ -28,12 +30,14 @@ type chEntry struct {
// Tracks the min/max timestamps seen across all traces that can be accessed
// once all traces are iterated (unmarshaled), since this can't be known upfront.
type liveTracesIter struct {
mtx sync.Mutex
liveTraces *livetraces.LiveTraces[[]byte]
ch chan []chEntry
chBuf []chEntry
cancel func()
start, end uint64
mtx sync.Mutex
liveTraces *livetraces.LiveTraces[[]byte]
ch chan []chEntry
chBuf []chEntry
cancel func()
start, end uint64
dedupedSpans uint32
exhausted bool
}

func newLiveTracesIter(liveTraces *livetraces.LiveTraces[[]byte]) *liveTracesIter {
Expand Down Expand Up @@ -88,6 +92,12 @@ func (i *liveTracesIter) iter(ctx context.Context) {
return bytes.Compare(a.id, b.id)
})

// h and buffer are reused across all spans to avoid repeated allocations.
h := util.NewTokenHasher()
buffer := make([]byte, 4)
// seen is reused across traces to avoid repeated allocations.
seen := make(map[uint64]struct{}, 1024)

// Begin sending to channel in chunks to reduce channel overhead.
seq := slices.Chunk(entries, 10)
for entries := range seq {
Expand All @@ -109,19 +119,29 @@ func (i *liveTracesIter) iter(ctx context.Context) {
}
}

// Update block timestamp bounds
for _, b := range tr.ResourceSpans {
for _, ss := range b.ScopeSpans {
// Deduplicate spans and update block timestamp bounds in one pass.
for _, rs := range tr.ResourceSpans {
for _, ss := range rs.ScopeSpans {
unique := ss.Spans[:0]
for _, s := range ss.Spans {
token := util.TokenForID(h, buffer, int32(s.Kind), s.SpanId)
if _, ok := seen[token]; ok {
i.dedupedSpans++
continue
}
seen[token] = struct{}{}
unique = append(unique, s)
if i.start == 0 || s.StartTimeUnixNano < i.start {
i.start = s.StartTimeUnixNano
}
if s.EndTimeUnixNano > i.end {
i.end = s.EndTimeUnixNano
}
}
ss.Spans = unique
}
}
clear(seen)

tempopb.ReuseByteSlices(entry.Batches)
delete(i.liveTraces.Traces, e.hash)
Expand All @@ -139,6 +159,8 @@ func (i *liveTracesIter) iter(ctx context.Context) {
return
}
}

i.exhausted = true
}

// MinMaxTimestamps returns the earliest start, and latest end span timestamps,
Expand All @@ -151,6 +173,18 @@ func (i *liveTracesIter) MinMaxTimestamps() (uint64, uint64) {
return i.start, i.end
}

// DedupedSpans reports how many duplicate spans were dropped while iterating
// over the live traces. The count is only meaningful once the iterator has
// been fully drained; calling it earlier returns an error.
func (i *liveTracesIter) DedupedSpans() (uint32, error) {
	i.mtx.Lock()
	defer i.mtx.Unlock()

	// The counter is written by the background iter goroutine; it is stable
	// only after that goroutine has marked itself exhausted.
	if i.exhausted {
		return i.dedupedSpans, nil
	}
	return 0, errors.New("iterator must be exhausted before calling DedupedSpans")
}

// Close stops the iterator by invoking its stored cancel func, which makes the
// background iter goroutine exit. Safe to call before the iterator is
// exhausted; in that case DedupedSpans will report an error.
func (i *liveTracesIter) Close() {
	i.cancel()
}
Expand Down
70 changes: 70 additions & 0 deletions modules/blockbuilder/live_traces_iter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package blockbuilder

import (
"context"
"testing"

"github.com/grafana/tempo/pkg/livetraces"
"github.com/grafana/tempo/pkg/util/test"
"github.com/stretchr/testify/require"
)

func TestLiveTracesIter_DedupSpans(t *testing.T) {
	const spanCount = 5

	traceID := generateTraceID(t)
	trace := test.MakeTraceWithSpanCount(1, spanCount, traceID)

	buf, err := trace.Marshal()
	require.NoError(t, err)

	// Pushing the identical trace bytes twice simulates replicated writes.
	lt := livetraces.New(func(b []byte) uint64 { return uint64(len(b)) }, 0, 0, "test-tenant")
	lt.Push(traceID, buf, 0)
	lt.Push(traceID, buf, 0)

	ctx := context.Background()
	iter := newLiveTracesIter(lt)

	id, got, err := iter.Next(ctx)
	require.NoError(t, err)
	require.NotNil(t, id)
	require.NotNil(t, got)

	// Drain the iterator so that DedupedSpans becomes available.
	_, _, err = iter.Next(ctx)
	require.NoError(t, err)

	// Every span from the duplicate push must have been removed,
	// leaving exactly the original spans.
	total := 0
	for _, rs := range got.ResourceSpans {
		for _, ss := range rs.ScopeSpans {
			total += len(ss.Spans)
		}
	}
	require.Equal(t, spanCount, total)

	deduped, err := iter.DedupedSpans()
	require.NoError(t, err)
	require.Equal(t, uint32(spanCount), deduped)
}

func TestLiveTracesIter_DedupedSpans_ErrorWhenNotExhausted(t *testing.T) {
	// iter() sends entries to the channel in chunks of 10. With 11 traces and
	// no reader, the send of the second chunk blocks. Calling Close() cancels
	// the context so iter() returns early without ever setting exhausted,
	// which must make DedupedSpans fail.
	lt := livetraces.New(func(b []byte) uint64 { return uint64(len(b)) }, 0, 0, "test-tenant")
	for j := 0; j < 11; j++ {
		id := generateTraceID(t)
		trace := test.MakeTraceWithSpanCount(1, 1, id)
		buf, err := trace.Marshal()
		require.NoError(t, err)
		lt.Push(id, buf, 0)
	}

	iter := newLiveTracesIter(lt)
	iter.Close() // cancel before exhausting

	_, err := iter.DedupedSpans() // blocks until iter() exits, then returns error
	require.Error(t, err)
}
6 changes: 6 additions & 0 deletions modules/blockbuilder/tenant_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,12 @@ func (s *tenantStore) Flush(ctx context.Context, r tempodb.Reader, w tempodb.Wri
return err
}

if n, err := iter.DedupedSpans(); err != nil {
level.Error(s.logger).Log("msg", "failed to get deduped spans count", "err", err)
} else if n > 0 {
metricDedupedSpans.WithLabelValues(s.tenantID).Add(float64(n))
}

// Update meta timestamps which couldn't be known until we unmarshaled
// all of the traces.
start, end := iter.MinMaxTimestamps()
Expand Down
27 changes: 4 additions & 23 deletions pkg/model/trace/combine.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
package trace

import (
"encoding/binary"
"fmt"
"hash"
"hash/fnv"
"sync"

"github.com/grafana/tempo/pkg/tempopb"
"github.com/grafana/tempo/pkg/util"
)

// token is uint64 to reduce hash collision rates. Experimentally, it was observed
Expand All @@ -16,23 +14,6 @@ import (
// results in a dropped span during combine.
type token uint64

func newHash() hash.Hash64 {
return fnv.New64()
}

// tokenForID returns a token for use in a hash map given a span id and span kind
// buffer must be a 4 byte slice and is reused for writing the span kind to the hashing function
// kind is used along with the actual id b/c in zipkin traces span id is not guaranteed to be unique
// as it is shared between client and server spans.
func tokenForID(h hash.Hash64, buffer []byte, kind int32, b []byte) token {
binary.LittleEndian.PutUint32(buffer, uint32(kind))

h.Reset()
_, _ = h.Write(b)
_, _ = h.Write(buffer)
return token(h.Sum64())
}

var ErrTraceTooLarge = fmt.Errorf("trace exceeds max size")

// Combiner combines multiple partial traces into one, deduping spans based on
Expand Down Expand Up @@ -77,7 +58,7 @@ func (c *Combiner) ConsumeWithFinal(tr *tempopb.Trace, final bool) (int, error)
return spanCount, nil
}

h := newHash()
h := util.NewTokenHasher()
buffer := make([]byte, 4)

// First call?
Expand All @@ -97,7 +78,7 @@ func (c *Combiner) ConsumeWithFinal(tr *tempopb.Trace, final bool) (int, error)
for _, b := range c.result.ResourceSpans {
for _, ils := range b.ScopeSpans {
for _, s := range ils.Spans {
c.spans[tokenForID(h, buffer, int32(s.Kind), s.SpanId)] = struct{}{}
c.spans[token(util.TokenForID(h, buffer, int32(s.Kind), s.SpanId))] = struct{}{}
}
}
}
Expand All @@ -117,7 +98,7 @@ func (c *Combiner) ConsumeWithFinal(tr *tempopb.Trace, final bool) (int, error)
notFoundSpans := ils.Spans[:0]
for _, s := range ils.Spans {
// if not already encountered, then keep
token := tokenForID(h, buffer, int32(s.Kind), s.SpanId)
token := token(util.TokenForID(h, buffer, int32(s.Kind), s.SpanId))
_, ok := c.spans[token]
if !ok {
notFoundSpans = append(notFoundSpans, s)
Expand Down
16 changes: 3 additions & 13 deletions pkg/model/trace/combine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"testing"

"github.com/grafana/tempo/pkg/tempopb"
"github.com/grafana/tempo/pkg/util"
"github.com/grafana/tempo/pkg/util/test"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -126,7 +127,7 @@ func TestTokenForIDCollision(t *testing.T) {
// Estimate the hash collision rate of tokenForID.

n := 1_000_000
h := newHash()
h := util.NewTokenHasher()
buf := make([]byte, 4)

tokens := map[token]struct{}{}
Expand All @@ -140,7 +141,7 @@ func TestTokenForIDCollision(t *testing.T) {
cpy := append([]byte(nil), spanID...)
IDs = append(IDs, cpy)

tokens[tokenForID(h, buf, 0, spanID)] = struct{}{}
tokens[token(util.TokenForID(h, buf, 0, spanID))] = struct{}{}
}

// Ensure no duplicate span IDs accidentally generated
Expand All @@ -162,17 +163,6 @@ func TestTokenForIDCollision(t *testing.T) {
require.Equal(t, n, len(tokens))
}

func BenchmarkTokenForID(b *testing.B) {
h := newHash()
id := []byte{0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08}
buffer := make([]byte, 4)

b.ResetTimer()
for i := 0; i < b.N; i++ {
_ = tokenForID(h, buffer, 0, id)
}
}

func BenchmarkCombine(b *testing.B) {
parts := []int{2, 3, 4, 8}
requests := 100 // 100K spans per part
Expand Down
20 changes: 20 additions & 0 deletions pkg/util/traceid.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,12 @@ package util

import (
"bytes"
"encoding/binary"
"encoding/hex"
"errors"
"fmt"
"hash"
"hash/fnv"
"strings"
"unsafe"
)
Expand Down Expand Up @@ -68,6 +71,23 @@ var spanKindFNVHashes = [...]uint64{
0x43869769eb4f75c8, // spare 2
}

// NewTokenHasher returns a new hash.Hash64 for use with TokenForID.
// The underlying hash is 64-bit FNV-1.
func NewTokenHasher() hash.Hash64 {
	return fnv.New64()
}

// TokenForID computes a hash-map key from a span ID and its span kind.
// The caller supplies h (created via NewTokenHasher) and buffer (at least
// 4 bytes) so both can be reused across calls without allocating.
// kind is folded in because zipkin traces share a span id between the
// client and server spans.
func TokenForID(h hash.Hash64, buffer []byte, kind int32, id []byte) uint64 {
	h.Reset()
	_, _ = h.Write(id)
	// Append the little-endian encoded kind after the raw span id bytes.
	binary.LittleEndian.PutUint32(buffer[:4], uint32(kind))
	_, _ = h.Write(buffer[:4])
	return h.Sum64()
}

// SpanIDAndKindToToken converts a span ID into a token for use as key in a hash map. The token is generated such
// that it has a low collision probability. In zipkin traces the span id is not guaranteed to be unique as it
// is shared between client and server spans. Therefore, it is sometimes required to take the span kind into account.
Expand Down
Loading
Loading