Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,9 @@
* [ENHANCEMENT] Chore: delete spanlogger. [#4312](https://github.com/grafana/tempo/pull/4312) (@javiermolinar)
* [ENHANCEMENT] Add `invalid_utf8` to reasons spanmetrics will discard spans. [#4293](https://github.com/grafana/tempo/pull/4293) (@zalegrala)
* [ENHANCEMENT] Reduce frontend and querier allocations by dropping HTTP headers early in the pipeline. [#4298](https://github.com/grafana/tempo/pull/4298) (@joe-elliott)
* [ENHANCEMENT] Reduce ingester working set by improving prelloc behavior. [#4344](https://github.com/grafana/tempo/pull/4344) (@joe-elliott)
* [ENHANCEMENT] Use Promtheus fast regexp for TraceQL regular expression matchers. [#4329](https://github.com/grafana/tempo/pull/4329) (@joe-elliott)
* [ENHANCEMENT] Reduce ingester working set by improving prealloc behavior. [#4344](https://github.com/grafana/tempo/pull/4344),[#4369](https://github.com/grafana/tempo/pull/4369) (@joe-elliott)
Added tunable prealloc env vars PREALLOC_BKT_SIZE, PREALLOC_NUM_BUCKETS, PREALLOC_MIN_BUCKET and metric tempo_ingester_prealloc_miss_bytes_total to observe and tune prealloc behavior.
* [ENHANCEMENT] Use Prometheus fast regexp for TraceQL regular expression matchers. [#4329](https://github.com/grafana/tempo/pull/4329) (@joe-elliott)
**BREAKING CHANGE** All regular expression matchers will now be fully anchored. `span.foo =~ "bar"` will now be evaluated as `span.foo =~ "^bar$"`
* [BUGFIX] Replace hedged requests roundtrips total with a counter. [#4063](https://github.com/grafana/tempo/pull/4063) [#4078](https://github.com/grafana/tempo/pull/4078) (@galalen)
* [BUGFIX] Metrics generators: Correctly drop from the ring before stopping ingestion to reduce drops during a rollout. [#4101](https://github.com/grafana/tempo/pull/4101) (@joe-elliott)
Expand Down
91 changes: 56 additions & 35 deletions pkg/tempopb/pool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,79 +11,100 @@ import (
"github.com/prometheus/client_golang/prometheus/promauto"
)

var metricAllocOutPool = promauto.NewCounter(prometheus.CounterOpts{
Namespace: "tempo",
Name: "ingester_prealloc_miss_bytes_total",
Help: "The total number of alloc'ed bytes that missed the sync pools.",
})
var (
metricMissOver prometheus.Counter
metricMissUnder prometheus.Counter
)

// init registers the prealloc miss counter vec and caches its two per-label
// children so the hot Get/Put paths can increment them without a label lookup.
func init() {
	// One counter split by "direction": "over" accumulates bytes for requests
	// larger than the largest bucket, "under" for requests at or below the
	// pool's minBucket. Both directions bypass the sync.Pools entirely.
	metricAllocOutPool := promauto.NewCounterVec(prometheus.CounterOpts{
		Namespace: "tempo",
		Name:      "ingester_prealloc_miss_bytes_total",
		Help:      "The total number of alloc'ed bytes that missed the sync pools.",
	}, []string{"direction"})

	metricMissOver = metricAllocOutPool.WithLabelValues("over")
	metricMissUnder = metricAllocOutPool.WithLabelValues("under")
}

// Pool is a linearly bucketed pool for variably sized byte slices.
type Pool struct {
buckets []sync.Pool
bktSize int
// make is the function used to create an empty slice when none exist yet.
make func(int) []byte
buckets []sync.Pool
bktSize int
minBucket int
}

// New returns a new Pool with size buckets for minSize to maxSize
func New(maxSize, bktSize int, makeFunc func(int) []byte) *Pool {
if maxSize < 1 {
panic("invalid maximum pool size")
func New(minBucket, numBuckets, bktSize int) *Pool {
if minBucket < 0 {
panic("invalid min bucket size")
}
if bktSize < 1 {
panic("invalid factor")
}
if maxSize%bktSize != 0 {
panic("invalid bucket size")
}

bkts := maxSize / bktSize

p := &Pool{
buckets: make([]sync.Pool, bkts),
bktSize: bktSize,
make: makeFunc,
if numBuckets < 1 {
panic("invalid num buckets")
}

return p
return &Pool{
buckets: make([]sync.Pool, numBuckets),
bktSize: bktSize,
minBucket: minBucket,
}
}

// Get returns a new byte slices that fits the given size.
func (p *Pool) Get(sz int) []byte {
if sz < 0 {
sz = 0 // just panic?
panic("requested negative size")
}

// Find the right bucket.
bkt := sz / p.bktSize
bkt := p.bucketFor(sz)

if bkt < 0 {
metricMissUnder.Add(float64(sz))
return make([]byte, 0, sz)
}

if bkt >= len(p.buckets) {
metricAllocOutPool.Add(float64(sz)) // track the number of bytes alloc'ed outside the pool for future tuning
return p.make(sz)
metricMissOver.Add(float64(sz))
return make([]byte, 0, sz)
}

b := p.buckets[bkt].Get()
if b == nil {
sz := (bkt + 1) * p.bktSize
b = p.make(sz)
alignedSz := (bkt+1)*p.bktSize + p.minBucket
b = make([]byte, 0, alignedSz)
}
return b.([]byte)
}

// Put adds a slice to the right bucket in the pool.
func (p *Pool) Put(s []byte) {
func (p *Pool) Put(s []byte) int {
c := cap(s)

if c%p.bktSize != 0 {
return
// valid slice?
if (c-p.minBucket)%p.bktSize != 0 {
return -1
}
bkt := (c / p.bktSize) - 1
bkt := p.bucketFor(c) // -1 puts the slice in the pool below. it will be larger than all requested slices for this bucket
if bkt < 0 {
return
return -1
}
if bkt >= len(p.buckets) {
return
return -1
}

p.buckets[bkt].Put(s) // nolint: staticcheck

return bkt // for testing
}

// bucketFor maps a requested size to the index of the bucket that serves it.
// minBucket is exclusive: any size at or below it maps to -1 (a pool miss).
// Above that, each bucket covers a bktSize-wide range, so sizes in
// (minBucket, minBucket+bktSize] map to bucket 0, the next bktSize sizes to
// bucket 1, and so on. Callers treat indexes past the last bucket as misses.
func (p *Pool) bucketFor(sz int) int {
	adjusted := sz - p.minBucket
	if adjusted <= 0 {
		return -1
	}

	return (adjusted - 1) / p.bktSize
}
93 changes: 71 additions & 22 deletions pkg/tempopb/pool/pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,46 +9,54 @@ import (
"github.com/stretchr/testify/require"
)

func makeFunc(size int) []byte {
return make([]byte, 0, size)
}

func TestPool(t *testing.T) {
testPool := New(20, 4, makeFunc)
func TestPoolGet(t *testing.T) {
testPool := New(5, 2, 7)
cases := []struct {
size int
expectedCap int
tooLarge bool
}{
{
size: -5,
expectedCap: 4,
{ // under the smallest pool size, should return an unaligned slice
size: 3,
expectedCap: 3,
},
{
size: 0,
expectedCap: 4,
{ // minBucket is exclusive. 5 is technically an unaligned slice
size: 5,
expectedCap: 5,
},
{
size: 3,
expectedCap: 4,
size: 6,
expectedCap: 12,
},
{
size: 10,
size: 12,
expectedCap: 12,
},
{
size: 23,
expectedCap: 23,
size: 15,
expectedCap: 19,
},
{ // over the largest pool size, should return an unaligned slice
size: 20,
expectedCap: 20,
tooLarge: true,
},
}
for _, c := range cases {
ret := testPool.Get(c.size)
require.Equal(t, c.expectedCap, cap(ret))
testPool.Put(ret)
for i := 0; i < 10; i++ {
ret := testPool.Get(c.size)
require.Equal(t, c.expectedCap, cap(ret))
putBucket := testPool.Put(ret)

if !c.tooLarge {
require.Equal(t, testPool.bucketFor(cap(ret)), putBucket)
}
}
}
}

func TestPoolSlicesAreAlwaysLargeEnough(t *testing.T) {
testPool := New(1025, 5, makeFunc)
testPool := New(100, 200, 5)

for i := 0; i < 10000; i++ {
size := rand.Intn(1000)
Expand All @@ -58,8 +66,49 @@ func TestPoolSlicesAreAlwaysLargeEnough(t *testing.T) {
size = rand.Intn(1000)
ret := testPool.Get(size)

require.True(t, cap(ret) >= size)
require.True(t, cap(ret) >= size, "cap: %d, size: %d", cap(ret), size)

testPool.Put(ret)
}
}

// TestBucketFor verifies the size→bucket mapping, including the exclusive
// minBucket boundary and the upper/lower edges of each bktSize-wide bucket.
func TestBucketFor(t *testing.T) {
	p := New(5, 10, 5)

	for _, tc := range []struct {
		size     int
		expected int
	}{
		{size: 0, expected: -1},
		{size: 5, expected: -1}, // minBucket is exclusive
		{size: 6, expected: 0},
		{size: 10, expected: 0},
		{size: 11, expected: 1},
		{size: 15, expected: 1},
		{size: 16, expected: 2},
	} {
		require.Equal(t, tc.expected, p.bucketFor(tc.size), "size: %d", tc.size)
	}
}
35 changes: 32 additions & 3 deletions pkg/tempopb/prealloc.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,21 @@
package tempopb

import "github.com/grafana/tempo/pkg/tempopb/pool"
import (
"os"
"strconv"

var bytePool = pool.New(100_000, 400, func(size int) []byte { return make([]byte, 0, size) })
"github.com/grafana/tempo/pkg/tempopb/pool"
)

var bytePool *pool.Pool

func init() {
bktSize := intFromEnv("PREALLOC_BKT_SIZE", 400)
numBuckets := intFromEnv("PREALLOC_NUM_BUCKETS", 250)
minBucket := intFromEnv("PREALLOC_MIN_BUCKET", 0)

bytePool = pool.New(minBucket, numBuckets, bktSize)
}

// PreallocBytes is a (repeated bytes slices) which preallocs slices on Unmarshal.
type PreallocBytes struct {
Expand Down Expand Up @@ -35,6 +48,22 @@ func (r *PreallocBytes) Size() (n int) {
// ReuseByteSlices puts the byte slice back into bytePool for reuse.
func ReuseByteSlices(buffs [][]byte) {
for _, b := range buffs {
bytePool.Put(b[:0])
_ = bytePool.Put(b[:0])
}
}

// intFromEnv returns the integer value of the environment variable env, or
// defaultValue when the variable is unset. A variable that is set but not a
// valid integer is a configuration error, so it fails fast with a panic that
// names the variable, the offending value, and the underlying parse error.
func intFromEnv(env string, defaultValue int) int {
	// get the value from the environment
	val, ok := os.LookupEnv(env)
	if !ok {
		return defaultValue
	}

	// try to parse the value
	intVal, err := strconv.Atoi(val)
	if err != nil {
		// include the actual value and error so a misconfigured tuning var
		// is diagnosable from the panic message alone
		panic("failed to parse " + env + " as int: got " + strconv.Quote(val) + ": " + err.Error())
	}

	return intVal
}