Skip to content
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion modules/ingester/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,8 @@ func defaultIngester(t *testing.T, tmpDir string) (*Ingester, []*tempopb.Trace,
},
Block: &encoding.BlockConfig{
IndexDownsampleBytes: 2,
BloomFP: .01,
BloomFP: 0.01,
BloomFilterShardSize: 100_000,
Encoding: backend.EncLZ4_1M,
IndexPageSizeBytes: 1000,
},
Expand Down
3 changes: 2 additions & 1 deletion modules/ingester/instance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -520,7 +520,8 @@ func defaultInstance(t require.TestingT, tmpDir string) *instance {
},
Block: &encoding.BlockConfig{
IndexDownsampleBytes: 2,
BloomFP: .01,
BloomFP: 0.01,
BloomFilterShardSize: 100_000,
Encoding: backend.EncLZ4_1M,
IndexPageSizeBytes: 1000,
},
Expand Down
3 changes: 2 additions & 1 deletion modules/querier/querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ func TestReturnAllHits(t *testing.T) {
Block: &encoding.BlockConfig{
Encoding: backend.EncNone,
IndexDownsampleBytes: 10,
BloomFP: .05,
BloomFP: 0.01,
BloomFilterShardSize: 100_000,
IndexPageSizeBytes: 1000,
},
WAL: &wal.Config{
Expand Down
4 changes: 3 additions & 1 deletion modules/storage/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"time"

cortex_cache "github.com/cortexproject/cortex/pkg/chunk/cache"

"github.com/grafana/tempo/pkg/util"
"github.com/grafana/tempo/tempodb"
"github.com/grafana/tempo/tempodb/backend"
Expand Down Expand Up @@ -37,7 +38,8 @@ func (cfg *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet)
cfg.Trace.WAL.Encoding = backend.EncNone

cfg.Trace.Block = &encoding.BlockConfig{}
f.Float64Var(&cfg.Trace.Block.BloomFP, util.PrefixConfig(prefix, "trace.block.bloom-filter-false-positive"), .05, "Bloom False Positive.")
f.Float64Var(&cfg.Trace.Block.BloomFP, util.PrefixConfig(prefix, "trace.block.bloom-filter-false-positive"), .05, "Bloom Filter False Positive.")
f.IntVar(&cfg.Trace.Block.BloomFilterShardSize, util.PrefixConfig(prefix, "trace.block.bloom-filter-shard-size"), 250*1024, "Bloom Filter Shard Size.")
Comment thread
annanay25 marked this conversation as resolved.
Outdated
f.IntVar(&cfg.Trace.Block.IndexDownsampleBytes, util.PrefixConfig(prefix, "trace.block.index-downsample-bytes"), 1024*1024, "Number of bytes (before compression) per index record.")
f.IntVar(&cfg.Trace.Block.IndexPageSizeBytes, util.PrefixConfig(prefix, "trace.block.index-page-size-bytes"), 250*1024, "Number of bytes per index page.")
cfg.Trace.Block.Encoding = backend.EncZstd
Expand Down
1 change: 1 addition & 0 deletions tempodb/backend/block_meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type BlockMeta struct {
IndexPageSize uint32 `json:"indexPageSize"` // Size of each index page in bytes
TotalRecords uint32 `json:"totalRecords"` // Total Records stored in the index file
DataEncoding string `json:"dataEncoding"` // DataEncoding is a string provided externally, but tracked by tempodb that indicates the way the bytes are encoded
BloomShardCount uint16 `json:"bloomShards"` // Number of bloom filter shards
}

func NewBlockMeta(tenantID string, blockID uuid.UUID, version string, encoding Encoding, dataEncoding string) *BlockMeta {
Expand Down
27 changes: 27 additions & 0 deletions tempodb/backend/block_meta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package backend

import (
"bytes"
"encoding/json"
"math/rand"
"testing"

Expand Down Expand Up @@ -41,3 +42,29 @@ func TestBlockMeta(t *testing.T) {
assert.Equal(t, 1, bytes.Compare(b.MaxID, b.MinID))
assert.Equal(t, 2, b.TotalObjects)
}

// TestBlockMetaParsing ensures a serialized BlockMeta — including the
// bloomShards field added for sharded bloom filters — unmarshals from JSON
// and that the shard count actually lands in the struct.
func TestBlockMetaParsing(t *testing.T) {
	inputJSON := `
{
    "format": "v0",
    "blockID": "00000000-0000-0000-0000-000000000000",
    "minID": "AAAAAAAAAAAAOO0z0LnnHg==",
    "maxID": "AAAAAAAAAAD/o61w2bYIDg==",
    "tenantID": "single-tenant",
    "startTime": "2021-01-01T00:00:00.0000000Z",
    "endTime": "2021-01-02T00:00:00.0000000Z",
    "totalObjects": 10,
    "size": 12345,
    "compactionLevel": 0,
    "encoding": "zstd",
    "indexPageSize": 250000,
    "totalRecords": 124356,
    "dataEncoding": "",
    "bloomShards": 244
}
`

	blockMeta := BlockMeta{}
	err := json.Unmarshal([]byte(inputJSON), &blockMeta)
	assert.NoError(t, err, "expected to be able to unmarshal from JSON")
	// The original test only checked for unmarshal errors; also verify the
	// newly added field is decoded, not silently dropped.
	assert.Equal(t, uint16(244), blockMeta.BloomShardCount)
}
5 changes: 3 additions & 2 deletions tempodb/backend/local/local_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,10 @@ import (
"testing"

"github.com/google/uuid"
"github.com/stretchr/testify/assert"

"github.com/grafana/tempo/tempodb/backend"
"github.com/grafana/tempo/tempodb/encoding/common"
"github.com/stretchr/testify/assert"
)

const objectName = "test"
Expand Down Expand Up @@ -107,7 +108,7 @@ func TestCompaction(t *testing.T) {
BlockID: blockID,
}

shardNum := common.GetShardNum()
shardNum := common.ValidateShardCount(int(fakeMeta.BloomShardCount))
fakeBloom := make([][]byte, shardNum)
fakeIndex := make([]byte, 20)
fakeTraces := make([]byte, 200)
Expand Down
1 change: 1 addition & 0 deletions tempodb/compactor_bookmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ func TestCurrentClear(t *testing.T) {
Block: &encoding.BlockConfig{
IndexDownsampleBytes: 17,
BloomFP: .01,
BloomFilterShardSize: 100_000,
Encoding: backend.EncGZIP,
IndexPageSizeBytes: 1000,
},
Expand Down
5 changes: 5 additions & 0 deletions tempodb/compactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ func TestCompaction(t *testing.T) {
Block: &encoding.BlockConfig{
IndexDownsampleBytes: 11,
BloomFP: .01,
BloomFilterShardSize: 100_000,
Encoding: backend.EncLZ4_4M,
IndexPageSizeBytes: 1000,
},
Expand Down Expand Up @@ -192,6 +193,7 @@ func TestSameIDCompaction(t *testing.T) {
Block: &encoding.BlockConfig{
IndexDownsampleBytes: 11,
BloomFP: .01,
BloomFilterShardSize: 100_000,
Encoding: backend.EncSnappy,
IndexPageSizeBytes: 1000,
},
Expand Down Expand Up @@ -279,6 +281,7 @@ func TestCompactionUpdatesBlocklist(t *testing.T) {
Block: &encoding.BlockConfig{
IndexDownsampleBytes: 11,
BloomFP: .01,
BloomFilterShardSize: 100_000,
Encoding: backend.EncNone,
IndexPageSizeBytes: 1000,
},
Expand Down Expand Up @@ -345,6 +348,7 @@ func TestCompactionMetrics(t *testing.T) {
Block: &encoding.BlockConfig{
IndexDownsampleBytes: 11,
BloomFP: .01,
BloomFilterShardSize: 100_000,
Encoding: backend.EncNone,
IndexPageSizeBytes: 1000,
},
Expand Down Expand Up @@ -415,6 +419,7 @@ func TestCompactionIteratesThroughTenants(t *testing.T) {
Block: &encoding.BlockConfig{
IndexDownsampleBytes: 11,
BloomFP: .01,
BloomFilterShardSize: 100_000,
Encoding: backend.EncLZ4_64k,
IndexPageSizeBytes: 1000,
},
Expand Down
2 changes: 1 addition & 1 deletion tempodb/encoding/backend_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func (b *BackendBlock) Find(ctx context.Context, id common.ID) ([]byte, error) {

span.SetTag("block", b.meta.BlockID.String())

shardKey := common.ShardKeyForTraceID(id)
shardKey := common.ShardKeyForTraceID(id, int(b.meta.BloomShardCount))
blockID := b.meta.BlockID
tenantID := b.meta.TenantID

Expand Down
4 changes: 2 additions & 2 deletions tempodb/encoding/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func bloomName(shard int) string {

// writeBlockMeta writes the bloom filter, meta and index to the passed in backend.Writer
func writeBlockMeta(ctx context.Context, w backend.Writer, meta *backend.BlockMeta, indexBytes []byte, b *common.ShardedBloomFilter) error {
blooms, err := b.WriteTo()
blooms, err := b.Write()
if err != nil {
return err
}
Expand Down Expand Up @@ -81,7 +81,7 @@ func CopyBlock(ctx context.Context, meta *backend.BlockMeta, src backend.Reader,
}

// Bloom
for i := 0; i < common.GetShardNum(); i++ {
for i := 0; i < common.ValidateShardCount(int(meta.BloomShardCount)); i++ {
Comment thread
joe-elliott marked this conversation as resolved.
err = copy(bloomName(i))
if err != nil {
return err
Expand Down
59 changes: 40 additions & 19 deletions tempodb/encoding/common/bloom.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,40 +3,53 @@ package common
import (
"bytes"

"github.com/grafana/tempo/pkg/util"
"github.com/willf/bloom"

"github.com/grafana/tempo/pkg/util"
)

// legacyShardCount is the shard count used by blocks written before the
// bloom shard count was recorded in the block meta (meta reports 0 shards).
const legacyShardCount = 10

// ShardedBloomFilter hashes trace IDs across a set of bloom filter shards so
// that each shard stays small enough to be fetched from the backend on its own.
type ShardedBloomFilter struct {
	blooms []*bloom.BloomFilter
}

func NewWithEstimates(n uint, fp float64) *ShardedBloomFilter {
b := &ShardedBloomFilter{
blooms: make([]*bloom.BloomFilter, shardNum),
// NewBloom creates a ShardedBloomFilter
func NewBloom(fp float64, shardSize, estimatedObjects uint) *ShardedBloomFilter {
Comment thread
annanay25 marked this conversation as resolved.
// estimate the number of shards needed. an approximate value is enough
var shardCount uint
var kPerBloom uint
for {
Comment thread
annanay25 marked this conversation as resolved.
Outdated
shardCount++
var m, k uint
// m: number of bits in the filter
// k: number of hash functions
if m, k = bloom.EstimateParameters(estimatedObjects/shardCount, fp); m < shardSize {
kPerBloom = k
break
}
}

itemsPerBloom := n / shardNum
if itemsPerBloom == 0 {
itemsPerBloom = 1
b := &ShardedBloomFilter{
blooms: make([]*bloom.BloomFilter, shardCount),
}
for i := 0; i < shardNum; i++ {
b.blooms[i] = bloom.NewWithEstimates(itemsPerBloom, fp)

for i := 0; i < int(shardCount); i++ {
// New(m uint, k uint) creates a new Bloom filter with _m_ bits and _k_ hashing functions
b.blooms[i] = bloom.New(shardSize*8, kPerBloom)
}

return b
}

// Add records the given trace ID in the shard responsible for it.
func (b *ShardedBloomFilter) Add(traceID []byte) {
	shardKey := ShardKeyForTraceID(traceID, len(b.blooms))
	b.blooms[shardKey].Add(traceID)
}

// WriteTo is a wrapper around bloom.WriteTo
func (b *ShardedBloomFilter) WriteTo() ([][]byte, error) {
bloomBytes := make([][]byte, shardNum)
// Write is a wrapper around bloom.WriteTo
func (b *ShardedBloomFilter) Write() ([][]byte, error) {
bloomBytes := make([][]byte, len(b.blooms))
for i, f := range b.blooms {
bloomBuffer := &bytes.Buffer{}
_, err := f.WriteTo(bloomBuffer)
Expand All @@ -48,16 +61,24 @@ func (b *ShardedBloomFilter) WriteTo() ([][]byte, error) {
return bloomBytes, nil
}

func ShardKeyForTraceID(traceID []byte) int {
return int(util.TokenForTraceID(traceID)) % shardNum
func (b *ShardedBloomFilter) GetShardCount() int {
return len(b.blooms)
}

// Test reports whether the trace ID may be present, by probing the shard
// the ID hashes to. Implements bloom.Test -> required only for testing.
func (b *ShardedBloomFilter) Test(traceID []byte) bool {
	shardKey := ShardKeyForTraceID(traceID, len(b.blooms))
	return b.blooms[shardKey].Test(traceID)
}

func GetShardNum() int {
return shardNum
// ShardKeyForTraceID maps a trace ID onto one of shardCount bloom shards.
// A shardCount of zero is treated as the legacy shard count.
func ShardKeyForTraceID(traceID []byte, shardCount int) int {
	token := util.TokenForTraceID(traceID)
	return int(token) % ValidateShardCount(shardCount)
}

// For backward compatibility
func ValidateShardCount(shardCount int) int {
Comment thread
joe-elliott marked this conversation as resolved.
if shardCount == 0 {
return legacyShardCount
}
return shardCount
}
Loading