From d6013841f4c4afbf9fb3b50b9051629727d74d97 Mon Sep 17 00:00:00 2001 From: Annanay Date: Mon, 12 Apr 2021 21:04:16 +0530 Subject: [PATCH 01/17] Add configurable bloom filters Signed-off-by: Annanay --- example/docker-compose/etc/tempo-azure.yaml | 1 - .../docker-compose/etc/tempo-gcs-fake.yaml | 1 - example/docker-compose/etc/tempo-local.yaml | 3 +- .../docker-compose/etc/tempo-s3-minio.yaml | 1 - modules/ingester/ingester_test.go | 9 +- modules/ingester/instance_test.go | 9 +- modules/querier/querier_test.go | 9 +- modules/storage/config.go | 6 +- tempodb/backend/block_meta.go | 1 + tempodb/backend/local/local_test.go | 2 +- tempodb/compactor_bookmark_test.go | 9 +- tempodb/compactor_test.go | 45 ++-- tempodb/encoding/backend_block.go | 2 +- tempodb/encoding/block.go | 4 +- tempodb/encoding/common/bloom.go | 77 +++++-- tempodb/encoding/common/bloom_test.go | 29 ++- tempodb/encoding/config.go | 13 +- tempodb/encoding/streaming_block.go | 3 +- tempodb/encoding/streaming_block_test.go | 7 +- tempodb/retention_test.go | 18 +- tempodb/tempodb_test.go | 194 +++--------------- 21 files changed, 184 insertions(+), 259 deletions(-) diff --git a/example/docker-compose/etc/tempo-azure.yaml b/example/docker-compose/etc/tempo-azure.yaml index f00fa7858a7..776952fdd87 100644 --- a/example/docker-compose/etc/tempo-azure.yaml +++ b/example/docker-compose/etc/tempo-azure.yaml @@ -34,7 +34,6 @@ storage: trace: backend: azure # backend configuration to use block: - bloom_filter_false_positive: .05 # bloom filter false positive rate. lower values create larger filters but fewer false positives index_downsample_bytes: 1000 # number of bytes per index record encoding: zstd # block encoding/compression. 
options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd wal: diff --git a/example/docker-compose/etc/tempo-gcs-fake.yaml b/example/docker-compose/etc/tempo-gcs-fake.yaml index ed81e74c9c2..b7827fdc08c 100644 --- a/example/docker-compose/etc/tempo-gcs-fake.yaml +++ b/example/docker-compose/etc/tempo-gcs-fake.yaml @@ -35,7 +35,6 @@ storage: trace: backend: gcs # backend configuration to use block: - bloom_filter_false_positive: .05 # bloom filter false positive rate. lower values create larger filters but fewer false positives index_downsample_bytes: 1000 # number of bytes per index record encoding: zstd # block encoding/compression. options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd wal: diff --git a/example/docker-compose/etc/tempo-local.yaml b/example/docker-compose/etc/tempo-local.yaml index 0843053e85e..b629d1ff0bb 100644 --- a/example/docker-compose/etc/tempo-local.yaml +++ b/example/docker-compose/etc/tempo-local.yaml @@ -34,7 +34,8 @@ storage: trace: backend: local # backend configuration to use block: - bloom_filter_false_positive: .05 # bloom filter false positive rate. lower values create larger filters but fewer false positives + bloom_filter_shard_size: 100_000 # size of a single bloom filter shard in bytes + bloom_filter_shard_count: 10 # number of bloom filter shards index_downsample_bytes: 1000 # number of bytes per index record encoding: zstd # block encoding/compression. options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd wal: diff --git a/example/docker-compose/etc/tempo-s3-minio.yaml b/example/docker-compose/etc/tempo-s3-minio.yaml index b1d7a1b914a..eb5b7bb4639 100644 --- a/example/docker-compose/etc/tempo-s3-minio.yaml +++ b/example/docker-compose/etc/tempo-s3-minio.yaml @@ -35,7 +35,6 @@ storage: trace: backend: s3 # backend configuration to use block: - bloom_filter_false_positive: .05 # bloom filter false positive rate. 
lower values create larger filters but fewer false positives index_downsample_bytes: 1000 # number of bytes per index record encoding: zstd # block encoding/compression. options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd wal: diff --git a/modules/ingester/ingester_test.go b/modules/ingester/ingester_test.go index 91f66aa2754..bedf584449e 100644 --- a/modules/ingester/ingester_test.go +++ b/modules/ingester/ingester_test.go @@ -193,10 +193,11 @@ func defaultIngester(t *testing.T, tmpDir string) (*Ingester, []*tempopb.Trace, Path: tmpDir, }, Block: &encoding.BlockConfig{ - IndexDownsampleBytes: 2, - BloomFP: .01, - Encoding: backend.EncLZ4_1M, - IndexPageSizeBytes: 1000, + IndexDownsampleBytes: 2, + BloomFilterShardSize: 100_000, + BloomFilterShardCount: 10, + Encoding: backend.EncLZ4_1M, + IndexPageSizeBytes: 1000, }, WAL: &wal.Config{ Filepath: tmpDir, diff --git a/modules/ingester/instance_test.go b/modules/ingester/instance_test.go index 59942e98f77..e5338cba8e2 100644 --- a/modules/ingester/instance_test.go +++ b/modules/ingester/instance_test.go @@ -476,10 +476,11 @@ func defaultInstance(t require.TestingT, tmpDir string) *instance { Path: tmpDir, }, Block: &encoding.BlockConfig{ - IndexDownsampleBytes: 2, - BloomFP: .01, - Encoding: backend.EncLZ4_1M, - IndexPageSizeBytes: 1000, + IndexDownsampleBytes: 2, + BloomFilterShardSize: 100_000, + BloomFilterShardCount: 10, + Encoding: backend.EncLZ4_1M, + IndexPageSizeBytes: 1000, }, WAL: &wal.Config{ Filepath: tmpDir, diff --git a/modules/querier/querier_test.go b/modules/querier/querier_test.go index 77f3fb7d871..408c25d44f4 100644 --- a/modules/querier/querier_test.go +++ b/modules/querier/querier_test.go @@ -54,10 +54,11 @@ func TestReturnAllHits(t *testing.T) { Path: path.Join(tempDir, "traces"), }, Block: &encoding.BlockConfig{ - Encoding: backend.EncNone, - IndexDownsampleBytes: 10, - BloomFP: .05, - IndexPageSizeBytes: 1000, + Encoding: backend.EncNone, + IndexDownsampleBytes: 10, + 
BloomFilterShardSize: 100_000, + BloomFilterShardCount: 10, + IndexPageSizeBytes: 1000, }, WAL: &wal.Config{ Filepath: path.Join(tempDir, "wal"), diff --git a/modules/storage/config.go b/modules/storage/config.go index 97703b5e316..af8959136d8 100644 --- a/modules/storage/config.go +++ b/modules/storage/config.go @@ -34,8 +34,10 @@ func (cfg *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet) cfg.Trace.WAL = &wal.Config{} f.StringVar(&cfg.Trace.WAL.Filepath, util.PrefixConfig(prefix, "trace.wal.path"), "/var/tempo/wal", "Path at which store WAL blocks.") - cfg.Trace.Block = &encoding.BlockConfig{} - f.Float64Var(&cfg.Trace.Block.BloomFP, util.PrefixConfig(prefix, "trace.block.bloom-filter-false-positive"), .05, "Bloom False Positive.") + cfg.Trace.Block = &encoding.BlockConfig{ + BloomFilterShardCount: 10, + BloomFilterShardSize: 100_000, // 100KiB + } f.IntVar(&cfg.Trace.Block.IndexDownsampleBytes, util.PrefixConfig(prefix, "trace.block.index-downsample-bytes"), 1024*1024, "Number of bytes (before compression) per index record.") f.IntVar(&cfg.Trace.Block.IndexPageSizeBytes, util.PrefixConfig(prefix, "trace.block.index-page-size-bytes"), 250*1024, "Number of bytes per index page.") cfg.Trace.Block.Encoding = backend.EncZstd diff --git a/tempodb/backend/block_meta.go b/tempodb/backend/block_meta.go index 36d7a36837a..370119e0bb1 100644 --- a/tempodb/backend/block_meta.go +++ b/tempodb/backend/block_meta.go @@ -27,6 +27,7 @@ type BlockMeta struct { Encoding Encoding `json:"encoding"` IndexPageSize uint32 `json:"indexPageSize"` TotalRecords uint32 `json:"totalRecords"` + BloomShardCount uint8 `json:"bloomShardCount"` } func NewBlockMeta(tenantID string, blockID uuid.UUID, version string, encoding Encoding) *BlockMeta { diff --git a/tempodb/backend/local/local_test.go b/tempodb/backend/local/local_test.go index 956331de5f8..fc4886902eb 100644 --- a/tempodb/backend/local/local_test.go +++ b/tempodb/backend/local/local_test.go @@ -107,7 +107,7 @@ func 
TestCompaction(t *testing.T) { BlockID: blockID, } - shardNum := common.GetShardNum() + shardNum := common.ValidateShardCount(int(fakeMeta.BloomShardCount)) fakeBloom := make([][]byte, shardNum) fakeIndex := make([]byte, 20) fakeTraces := make([]byte, 200) diff --git a/tempodb/compactor_bookmark_test.go b/tempodb/compactor_bookmark_test.go index 47ce6046ce7..8987c29c21e 100644 --- a/tempodb/compactor_bookmark_test.go +++ b/tempodb/compactor_bookmark_test.go @@ -33,10 +33,11 @@ func TestCurrentClear(t *testing.T) { Path: path.Join(tempDir, "traces"), }, Block: &encoding.BlockConfig{ - IndexDownsampleBytes: 17, - BloomFP: .01, - Encoding: backend.EncGZIP, - IndexPageSizeBytes: 1000, + IndexDownsampleBytes: 17, + BloomFilterShardSize: 100_000, + BloomFilterShardCount: 10, + Encoding: backend.EncGZIP, + IndexPageSizeBytes: 1000, }, WAL: &wal.Config{ Filepath: path.Join(tempDir, "wal"), diff --git a/tempodb/compactor_test.go b/tempodb/compactor_test.go index a5dda2cc18a..2ed18775003 100644 --- a/tempodb/compactor_test.go +++ b/tempodb/compactor_test.go @@ -62,10 +62,11 @@ func TestCompaction(t *testing.T) { Path: path.Join(tempDir, "traces"), }, Block: &encoding.BlockConfig{ - IndexDownsampleBytes: 11, - BloomFP: .01, - Encoding: backend.EncLZ4_4M, - IndexPageSizeBytes: 1000, + IndexDownsampleBytes: 11, + BloomFilterShardSize: 100_000, + BloomFilterShardCount: 10, + Encoding: backend.EncLZ4_4M, + IndexPageSizeBytes: 1000, }, WAL: &wal.Config{ Filepath: path.Join(tempDir, "wal"), @@ -190,10 +191,11 @@ func TestSameIDCompaction(t *testing.T) { Path: path.Join(tempDir, "traces"), }, Block: &encoding.BlockConfig{ - IndexDownsampleBytes: 11, - BloomFP: .01, - Encoding: backend.EncSnappy, - IndexPageSizeBytes: 1000, + IndexDownsampleBytes: 11, + BloomFilterShardSize: 100_000, + BloomFilterShardCount: 10, + Encoding: backend.EncSnappy, + IndexPageSizeBytes: 1000, }, WAL: &wal.Config{ Filepath: path.Join(tempDir, "wal"), @@ -277,10 +279,11 @@ func 
TestCompactionUpdatesBlocklist(t *testing.T) { Path: path.Join(tempDir, "traces"), }, Block: &encoding.BlockConfig{ - IndexDownsampleBytes: 11, - BloomFP: .01, - Encoding: backend.EncNone, - IndexPageSizeBytes: 1000, + IndexDownsampleBytes: 11, + BloomFilterShardSize: 100_000, + BloomFilterShardCount: 10, + Encoding: backend.EncNone, + IndexPageSizeBytes: 1000, }, WAL: &wal.Config{ Filepath: path.Join(tempDir, "wal"), @@ -343,10 +346,11 @@ func TestCompactionMetrics(t *testing.T) { Path: path.Join(tempDir, "traces"), }, Block: &encoding.BlockConfig{ - IndexDownsampleBytes: 11, - BloomFP: .01, - Encoding: backend.EncNone, - IndexPageSizeBytes: 1000, + IndexDownsampleBytes: 11, + BloomFilterShardSize: 100_000, + BloomFilterShardCount: 10, + Encoding: backend.EncNone, + IndexPageSizeBytes: 1000, }, WAL: &wal.Config{ Filepath: path.Join(tempDir, "wal"), @@ -413,10 +417,11 @@ func TestCompactionIteratesThroughTenants(t *testing.T) { Path: path.Join(tempDir, "traces"), }, Block: &encoding.BlockConfig{ - IndexDownsampleBytes: 11, - BloomFP: .01, - Encoding: backend.EncLZ4_64k, - IndexPageSizeBytes: 1000, + IndexDownsampleBytes: 11, + BloomFilterShardSize: 100_000, + BloomFilterShardCount: 10, + Encoding: backend.EncLZ4_64k, + IndexPageSizeBytes: 1000, }, WAL: &wal.Config{ Filepath: path.Join(tempDir, "wal"), diff --git a/tempodb/encoding/backend_block.go b/tempodb/encoding/backend_block.go index d61aa3cd27a..7bc03af0c53 100644 --- a/tempodb/encoding/backend_block.go +++ b/tempodb/encoding/backend_block.go @@ -56,7 +56,7 @@ func (b *BackendBlock) Find(ctx context.Context, id common.ID) ([]byte, error) { span.SetTag("block", b.meta.BlockID.String()) - shardKey := common.ShardKeyForTraceID(id) + shardKey := common.ShardKeyForTraceID(id, int(b.meta.BloomShardCount)) blockID := b.meta.BlockID tenantID := b.meta.TenantID diff --git a/tempodb/encoding/block.go b/tempodb/encoding/block.go index e56a4a1acb9..b0d2d5daba0 100644 --- a/tempodb/encoding/block.go +++ 
b/tempodb/encoding/block.go @@ -26,7 +26,7 @@ func bloomName(shard int) string { // writeBlockMeta writes the bloom filter, meta and index to the passed in backend.Writer func writeBlockMeta(ctx context.Context, w backend.Writer, meta *backend.BlockMeta, indexBytes []byte, b *common.ShardedBloomFilter) error { - blooms, err := b.WriteTo() + blooms, err := b.Write() if err != nil { return err } @@ -81,7 +81,7 @@ func CopyBlock(ctx context.Context, meta *backend.BlockMeta, src backend.Reader, } // Bloom - for i := 0; i < common.GetShardNum(); i++ { + for i := 0; i < common.ValidateShardCount(int(meta.BloomShardCount)); i++ { err = copy(bloomName(i)) if err != nil { return err diff --git a/tempodb/encoding/common/bloom.go b/tempodb/encoding/common/bloom.go index d7bea5c0570..c556a7ff0bf 100644 --- a/tempodb/encoding/common/bloom.go +++ b/tempodb/encoding/common/bloom.go @@ -2,41 +2,74 @@ package common import ( "bytes" + "math" - "github.com/grafana/tempo/pkg/util" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" "github.com/willf/bloom" + + "github.com/grafana/tempo/pkg/util" ) -const shardNum = 10 +const legacyShardCount = 10 + +var ( + metricBloomFP = promauto.NewHistogram(prometheus.HistogramOpts{ + Namespace: "tempo", + Name: "bloom_filter_false_positive", + Help: "False positive values for bloom filters created", + // 0.005, 0.020, 0.080, 0.32 + Buckets: prometheus.ExponentialBuckets(0.005, 4, 4), + }) +) type ShardedBloomFilter struct { blooms []*bloom.BloomFilter } -func NewWithEstimates(n uint, fp float64) *ShardedBloomFilter { - b := &ShardedBloomFilter{ - blooms: make([]*bloom.BloomFilter, shardNum), - } +func evaluateK(shardSizeInBits, itemsPerBloom int) (k int) { + // Per https://llimllib.github.io/bloomfilter-tutorial/ under "How many hash functions should I use?" 
+ // the optimal value of k: (m/n)ln(2) + // m: number of bits in the filter + // n: estimated number of objects + // k: number of hash functions + k = int(math.Ceil((float64(shardSizeInBits) / float64(itemsPerBloom)) * (math.Ln2))) - itemsPerBloom := n / shardNum + return +} + +// NewBloom creates a ShardedBloomFilter +func NewBloom(shardSize, shardCount, estimatedObjects int) *ShardedBloomFilter { + itemsPerBloom := estimatedObjects / shardCount if itemsPerBloom == 0 { itemsPerBloom = 1 } - for i := 0; i < shardNum; i++ { - b.blooms[i] = bloom.NewWithEstimates(itemsPerBloom, fp) + + shardSizeInBits := 8 * shardSize + k := evaluateK(shardSizeInBits, itemsPerBloom) + + b := &ShardedBloomFilter{ + blooms: make([]*bloom.BloomFilter, shardCount), + } + for i := 0; i < shardCount; i++ { + // New(m uint, k uint) creates a new Bloom filter with _m_ bits and _k_ hashing functions + b.blooms[i] = bloom.New(uint(shardSizeInBits), uint(k)) } + // metric the false positive rate so we can track if we're making bad blooms + metricBloomFP.Observe(b.blooms[0].EstimateFalsePositiveRate(uint(itemsPerBloom))) + return b } func (b *ShardedBloomFilter) Add(traceID []byte) { - shardKey := ShardKeyForTraceID(traceID) + shardKey := ShardKeyForTraceID(traceID, len(b.blooms)) b.blooms[shardKey].Add(traceID) } -// WriteTo is a wrapper around bloom.WriteTo -func (b *ShardedBloomFilter) WriteTo() ([][]byte, error) { - bloomBytes := make([][]byte, shardNum) +// Write is a wrapper around bloom.WriteTo +func (b *ShardedBloomFilter) Write() ([][]byte, error) { + bloomBytes := make([][]byte, len(b.blooms)) for i, f := range b.blooms { bloomBuffer := &bytes.Buffer{} _, err := f.WriteTo(bloomBuffer) @@ -48,16 +81,20 @@ func (b *ShardedBloomFilter) WriteTo() ([][]byte, error) { return bloomBytes, nil } -func ShardKeyForTraceID(traceID []byte) int { - return int(util.TokenForTraceID(traceID)) % shardNum -} - // Test implements bloom.Test -> required only for testing func (b *ShardedBloomFilter) 
Test(traceID []byte) bool { - shardKey := ShardKeyForTraceID(traceID) + shardKey := ShardKeyForTraceID(traceID, len(b.blooms)) return b.blooms[shardKey].Test(traceID) } -func GetShardNum() int { - return shardNum +func ShardKeyForTraceID(traceID []byte, shardCount int) int { + return int(util.TokenForTraceID(traceID)) % ValidateShardCount(shardCount) +} + +// For backward compatibility +func ValidateShardCount(shardCount int) int { + if shardCount == 0 { + return legacyShardCount + } + return shardCount } diff --git a/tempodb/encoding/common/bloom_test.go b/tempodb/encoding/common/bloom_test.go index de692237f84..209dde423f3 100644 --- a/tempodb/encoding/common/bloom_test.go +++ b/tempodb/encoding/common/bloom_test.go @@ -2,6 +2,7 @@ package common import ( "bytes" + "math" "math/rand" "testing" @@ -22,8 +23,10 @@ func TestShardedBloom(t *testing.T) { } // create sharded bloom filter - const bloomFP = .01 - b := NewWithEstimates(uint(numTraces), bloomFP) + shardSize := 1000 + shardCount := 5 + estimatedObjects := 1000 + b := NewBloom(shardSize, shardCount, estimatedObjects) // add traceIDs to sharded bloom filter for _, traceID := range traceIDs { @@ -31,18 +34,22 @@ func TestShardedBloom(t *testing.T) { } // get byte representation - bloomBytes, err := b.WriteTo() + bloomBytes, err := b.Write() assert.NoError(t, err) - assert.Len(t, bloomBytes, shardNum) + assert.Len(t, bloomBytes, shardCount) // parse byte representation into willf_bloom.Bloomfilter var filters []*willf_bloom.BloomFilter - for i := 0; i < shardNum; i++ { + for i := 0; i < shardCount; i++ { filters = append(filters, &willf_bloom.BloomFilter{}) } for i, singleBloom := range bloomBytes { _, err = filters[i].ReadFrom(bytes.NewReader(singleBloom)) assert.NoError(t, err) + + // assert that parsed form has the expected _m_ and _k_ + assert.Equal(t, filters[i].Cap(), uint(shardSize*8)) // * 8 because need bits from bytes + assert.Equal(t, filters[i].K(), uint(evaluateK(shardSize*8, 
estimatedObjects/shardCount))) // * 8 because need bits from bytes } // confirm that the sharded bloom and parsed form give the same result @@ -52,9 +59,15 @@ func TestShardedBloom(t *testing.T) { if !found { missingCount++ } - assert.Equal(t, found, filters[ShardKeyForTraceID(traceID)].Test(traceID)) + assert.Equal(t, found, filters[ShardKeyForTraceID(traceID, shardCount)].Test(traceID)) } - // check that missingCount is less than bloomFP - assert.LessOrEqual(t, float64(missingCount), bloomFP*numTraces) + // get estimated bloom filter false positive + estimatedBloomFP := filters[0].EstimateFalsePositiveRate(uint(numTraces / shardCount)) + // check that missingCount is less than estimatedBloomFP + assert.LessOrEqual(t, float64(missingCount), estimatedBloomFP*numTraces) +} + +func TestEvaluateK(t *testing.T) { + assert.Equal(t, int(math.Ceil(math.Ln2*float64(100))), evaluateK(1000, 10)) } diff --git a/tempodb/encoding/config.go b/tempodb/encoding/config.go index 13dea780ff0..9d28611b71c 100644 --- a/tempodb/encoding/config.go +++ b/tempodb/encoding/config.go @@ -8,10 +8,11 @@ import ( // BlockConfig holds configuration options for newly created blocks type BlockConfig struct { - IndexDownsampleBytes int `yaml:"index_downsample_bytes"` - IndexPageSizeBytes int `yaml:"index_page_size_bytes"` - BloomFP float64 `yaml:"bloom_filter_false_positive"` - Encoding backend.Encoding `yaml:"encoding"` + IndexDownsampleBytes int `yaml:"index_downsample_bytes"` + IndexPageSizeBytes int `yaml:"index_page_size_bytes"` + BloomFilterShardSize int `yaml:"bloom_filter_shard_size"` + BloomFilterShardCount uint8 `yaml:"bloom_filter_shard_count"` + Encoding backend.Encoding `yaml:"encoding"` } // ValidateConfig returns true if the config is valid @@ -24,8 +25,8 @@ func ValidateConfig(b *BlockConfig) error { return fmt.Errorf("Positive index page size required") } - if b.BloomFP <= 0.0 { - return fmt.Errorf("invalid bloom filter fp rate %v", b.BloomFP) + if b.BloomFilterShardSize <= 0 || 
b.BloomFilterShardCount <= 0 { + return fmt.Errorf("Positive value required for bloom-filter shard size and shard count") } return nil diff --git a/tempodb/encoding/streaming_block.go b/tempodb/encoding/streaming_block.go index 10e91c28b40..4dd41c022ac 100644 --- a/tempodb/encoding/streaming_block.go +++ b/tempodb/encoding/streaming_block.go @@ -34,7 +34,7 @@ func NewStreamingBlock(cfg *BlockConfig, id uuid.UUID, tenantID string, metas [] c := &StreamingBlock{ encoding: latestEncoding(), compactedMeta: backend.NewBlockMeta(tenantID, id, currentVersion, cfg.Encoding), - bloom: common.NewWithEstimates(uint(estimatedObjects), cfg.BloomFP), + bloom: common.NewBloom(cfg.BloomFilterShardSize, int(cfg.BloomFilterShardCount), estimatedObjects), inMetas: metas, cfg: cfg, } @@ -125,6 +125,7 @@ func (c *StreamingBlock) Complete(ctx context.Context, tracker backend.AppendTra meta.TotalRecords = uint32(len(records)) // casting meta.IndexPageSize = uint32(c.cfg.IndexPageSizeBytes) + meta.BloomShardCount = c.cfg.BloomFilterShardCount return bytesFlushed, writeBlockMeta(ctx, w, meta, indexBytes, c.bloom) } diff --git a/tempodb/encoding/streaming_block_test.go b/tempodb/encoding/streaming_block_test.go index 174753a55e1..28370bebe21 100644 --- a/tempodb/encoding/streaming_block_test.go +++ b/tempodb/encoding/streaming_block_test.go @@ -38,9 +38,10 @@ func TestCompactorBlockAddObject(t *testing.T) { numObjects := (rand.Int() % 20) + 1 cb, err := NewStreamingBlock(&BlockConfig{ - BloomFP: .01, - IndexDownsampleBytes: indexDownsample, - Encoding: backend.EncGZIP, + BloomFilterShardSize: 100_000, + BloomFilterShardCount: 10, + IndexDownsampleBytes: indexDownsample, + Encoding: backend.EncGZIP, }, uuid.New(), testTenantID, metas, numObjects) assert.NoError(t, err) diff --git a/tempodb/retention_test.go b/tempodb/retention_test.go index e3fb1c60117..c9b1656f6a5 100644 --- a/tempodb/retention_test.go +++ b/tempodb/retention_test.go @@ -28,10 +28,11 @@ func TestRetention(t *testing.T) { 
Path: path.Join(tempDir, "traces"), }, Block: &encoding.BlockConfig{ - IndexDownsampleBytes: 17, - BloomFP: .01, - Encoding: backend.EncLZ4_256k, - IndexPageSizeBytes: 1000, + IndexDownsampleBytes: 17, + BloomFilterShardSize: 100_000, + BloomFilterShardCount: 10, + Encoding: backend.EncLZ4_256k, + IndexPageSizeBytes: 1000, }, WAL: &wal.Config{ Filepath: path.Join(tempDir, "wal"), @@ -83,10 +84,11 @@ func TestBlockRetentionOverride(t *testing.T) { Path: path.Join(tempDir, "traces"), }, Block: &encoding.BlockConfig{ - IndexDownsampleBytes: 17, - BloomFP: .01, - Encoding: backend.EncLZ4_256k, - IndexPageSizeBytes: 1000, + IndexDownsampleBytes: 17, + BloomFilterShardSize: 100_000, + BloomFilterShardCount: 10, + Encoding: backend.EncLZ4_256k, + IndexPageSizeBytes: 1000, }, WAL: &wal.Config{ Filepath: path.Join(tempDir, "wal"), diff --git a/tempodb/tempodb_test.go b/tempodb/tempodb_test.go index 2c6bb52f602..636d9b52240 100644 --- a/tempodb/tempodb_test.go +++ b/tempodb/tempodb_test.go @@ -27,29 +27,38 @@ import ( const ( testTenantID = "fake" testTenantID2 = "fake2" + tmpdir = "/tmp" ) -func TestDB(t *testing.T) { - tempDir, err := ioutil.TempDir("/tmp", "") +func testConfig(enc backend.Encoding, blocklistPoll time.Duration) (Reader, Writer, Compactor, error) { + tempDir, err := ioutil.TempDir(tmpdir, "") defer os.RemoveAll(tempDir) - assert.NoError(t, err, "unexpected error creating temp dir") - r, w, c, err := New(&Config{ + if err != nil { + return nil, nil, nil, err + } + + return New(&Config{ Backend: "local", Local: &local.Config{ Path: path.Join(tempDir, "traces"), }, Block: &encoding.BlockConfig{ - IndexDownsampleBytes: 17, - BloomFP: .01, - Encoding: backend.EncGZIP, - IndexPageSizeBytes: 1000, + IndexDownsampleBytes: 17, + BloomFilterShardSize: 100_000, + BloomFilterShardCount: 10, + Encoding: enc, + IndexPageSizeBytes: 1000, }, WAL: &wal.Config{ Filepath: path.Join(tempDir, "wal"), }, - BlocklistPoll: 0, + BlocklistPoll: blocklistPoll, }, log.NewNopLogger()) 
+} + +func TestDB(t *testing.T) { + r, w, c, err := testConfig(backend.EncGZIP, 0) assert.NoError(t, err) c.EnableCompaction(&CompactorConfig{ @@ -106,27 +115,7 @@ func TestBlockSharding(t *testing.T) { // push a req with some traceID // cut headblock & write to backend // search with different shards and check if its respecting the params - - tempDir, err := ioutil.TempDir("/tmp", "") - defer os.RemoveAll(tempDir) - assert.NoError(t, err, "unexpected error creating temp dir") - - r, w, _, err := New(&Config{ - Backend: "local", - Local: &local.Config{ - Path: path.Join(tempDir, "traces"), - }, - Block: &encoding.BlockConfig{ - IndexDownsampleBytes: 17, - BloomFP: .01, - Encoding: backend.EncLZ4_256k, - IndexPageSizeBytes: 1000, - }, - WAL: &wal.Config{ - Filepath: path.Join(tempDir, "wal"), - }, - BlocklistPoll: 0, - }, log.NewNopLogger()) + r, w, _, err := testConfig(backend.EncLZ4_256k, 0) assert.NoError(t, err) // create block with known ID @@ -180,26 +169,7 @@ func TestBlockSharding(t *testing.T) { } func TestNilOnUnknownTenantID(t *testing.T) { - tempDir, err := ioutil.TempDir("/tmp", "") - defer os.RemoveAll(tempDir) - assert.NoError(t, err, "unexpected error creating temp dir") - - r, _, _, err := New(&Config{ - Backend: "local", - Local: &local.Config{ - Path: path.Join(tempDir, "traces"), - }, - Block: &encoding.BlockConfig{ - IndexDownsampleBytes: 17, - BloomFP: .01, - Encoding: backend.EncLZ4_256k, - IndexPageSizeBytes: 1000, - }, - WAL: &wal.Config{ - Filepath: path.Join(tempDir, "wal"), - }, - BlocklistPoll: 0, - }, log.NewNopLogger()) + r, _, _, err := testConfig(backend.EncLZ4_256k, 0) assert.NoError(t, err) buff, err := r.Find(context.Background(), "unknown", []byte{0x01}, BlockIDMin, BlockIDMax) @@ -208,26 +178,7 @@ func TestNilOnUnknownTenantID(t *testing.T) { } func TestBlockCleanup(t *testing.T) { - tempDir, err := ioutil.TempDir("/tmp", "") - defer os.RemoveAll(tempDir) - assert.NoError(t, err, "unexpected error creating temp dir") - - r, w, 
c, err := New(&Config{ - Backend: "local", - Local: &local.Config{ - Path: path.Join(tempDir, "traces"), - }, - Block: &encoding.BlockConfig{ - IndexDownsampleBytes: 17, - BloomFP: .01, - Encoding: backend.EncLZ4_256k, - IndexPageSizeBytes: 1000, - }, - WAL: &wal.Config{ - Filepath: path.Join(tempDir, "wal"), - }, - BlocklistPoll: 0, - }, log.NewNopLogger()) + r, w, c, err := testConfig(backend.EncLZ4_256k, 0) assert.NoError(t, err) c.EnableCompaction(&CompactorConfig{ @@ -255,7 +206,7 @@ func TestBlockCleanup(t *testing.T) { assert.Len(t, rw.blockLists[testTenantID], 1) - os.RemoveAll(tempDir + "/traces/" + testTenantID) + os.RemoveAll(tmpdir + "/traces/" + testTenantID) // poll rw.pollBlocklist() @@ -292,22 +243,7 @@ func TestCleanMissingTenants(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - r, _, _, err := New(&Config{ - Backend: "local", - Local: &local.Config{ - Path: path.Join("/tmp", "traces"), - }, - Block: &encoding.BlockConfig{ - IndexDownsampleBytes: 17, - BloomFP: .01, - Encoding: backend.EncLZ4_256k, - IndexPageSizeBytes: 1000, - }, - WAL: &wal.Config{ - Filepath: path.Join("/tmp", "wal"), - }, - BlocklistPoll: 0, - }, log.NewNopLogger()) + r, _, _, err := testConfig(backend.EncLZ4_256k, 0) assert.NoError(t, err) rw := r.(*readerWriter) @@ -349,26 +285,7 @@ func checkBlocklists(t *testing.T, expectedID uuid.UUID, expectedB int, expected } func TestUpdateBlocklist(t *testing.T) { - tempDir, err := ioutil.TempDir("/tmp", "") - defer os.RemoveAll(tempDir) - assert.NoError(t, err, "unexpected error creating temp dir") - - r, _, _, err := New(&Config{ - Backend: "local", - Local: &local.Config{ - Path: path.Join(tempDir, "traces"), - }, - Block: &encoding.BlockConfig{ - IndexDownsampleBytes: 17, - BloomFP: .01, - Encoding: backend.EncLZ4_256k, - IndexPageSizeBytes: 1000, - }, - WAL: &wal.Config{ - Filepath: path.Join(tempDir, "wal"), - }, - BlocklistPoll: 0, - }, log.NewNopLogger()) + r, _, _, err := 
testConfig(backend.EncLZ4_256k, 0) assert.NoError(t, err) rw := r.(*readerWriter) @@ -537,26 +454,7 @@ func TestUpdateBlocklist(t *testing.T) { } func TestUpdateBlocklistCompacted(t *testing.T) { - tempDir, err := ioutil.TempDir("/tmp", "") - defer os.RemoveAll(tempDir) - assert.NoError(t, err, "unexpected error creating temp dir") - - r, _, _, err := New(&Config{ - Backend: "local", - Local: &local.Config{ - Path: path.Join(tempDir, "traces"), - }, - Block: &encoding.BlockConfig{ - IndexDownsampleBytes: 17, - BloomFP: .01, - Encoding: backend.EncLZ4_256k, - IndexPageSizeBytes: 1000, - }, - WAL: &wal.Config{ - Filepath: path.Join(tempDir, "wal"), - }, - BlocklistPoll: 0, - }, log.NewNopLogger()) + r, _, _, err := testConfig(backend.EncLZ4_256k, 0) assert.NoError(t, err) rw := r.(*readerWriter) @@ -859,26 +757,7 @@ func TestIncludeCompactedBlock(t *testing.T) { } func TestSearchCompactedBlocks(t *testing.T) { - tempDir, err := ioutil.TempDir("/tmp", "") - defer os.RemoveAll(tempDir) - assert.NoError(t, err, "unexpected error creating temp dir") - - r, w, c, err := New(&Config{ - Backend: "local", - Local: &local.Config{ - Path: path.Join(tempDir, "traces"), - }, - Block: &encoding.BlockConfig{ - IndexDownsampleBytes: 17, - BloomFP: .01, - Encoding: backend.EncLZ4_256k, - IndexPageSizeBytes: 1000, - }, - WAL: &wal.Config{ - Filepath: path.Join(tempDir, "wal"), - }, - BlocklistPoll: time.Minute, - }, log.NewNopLogger()) + r, w, c, err := testConfig(backend.EncLZ4_256k, time.Minute) assert.NoError(t, err) c.EnableCompaction(&CompactorConfig{ @@ -965,26 +844,7 @@ func TestSearchCompactedBlocks(t *testing.T) { } func TestCompleteBlock(t *testing.T) { - tempDir, err := ioutil.TempDir("/tmp", "") - defer os.RemoveAll(tempDir) - assert.NoError(t, err, "unexpected error creating temp dir") - - _, w, _, err := New(&Config{ - Backend: "local", - Local: &local.Config{ - Path: path.Join(tempDir, "traces"), - }, - Block: &encoding.BlockConfig{ - IndexDownsampleBytes: 17, - 
BloomFP: .01, - Encoding: backend.EncLZ4_256k, - IndexPageSizeBytes: 1000, - }, - WAL: &wal.Config{ - Filepath: path.Join(tempDir, "wal"), - }, - BlocklistPoll: time.Minute, - }, log.NewNopLogger()) + _, w, _, err := testConfig(backend.EncLZ4_256k, time.Minute) assert.NoError(t, err) wal := w.WAL() From bfd728c8dc276c333bf1634479478aac76227f9a Mon Sep 17 00:00:00 2001 From: Annanay Date: Fri, 14 May 2021 21:05:38 +0530 Subject: [PATCH 02/17] Create filter based on size and fp Signed-off-by: Annanay --- example/docker-compose/etc/tempo-azure.yaml | 1 + .../docker-compose/etc/tempo-gcs-fake.yaml | 1 + example/docker-compose/etc/tempo-local.yaml | 3 +- .../docker-compose/etc/tempo-s3-minio.yaml | 1 + modules/storage/config.go | 8 +- tempodb/backend/block_meta.go | 2 +- tempodb/compactor_bookmark_test.go | 10 +- tempodb/compactor_test.go | 140 +----------------- tempodb/encoding/common/bloom.go | 42 +++--- tempodb/encoding/common/bloom_test.go | 78 +++++++--- tempodb/encoding/config.go | 18 ++- tempodb/encoding/streaming_block.go | 10 +- tempodb/tempodb_test.go | 16 +- 13 files changed, 119 insertions(+), 211 deletions(-) diff --git a/example/docker-compose/etc/tempo-azure.yaml b/example/docker-compose/etc/tempo-azure.yaml index 82b8224bb09..c936203b0e1 100644 --- a/example/docker-compose/etc/tempo-azure.yaml +++ b/example/docker-compose/etc/tempo-azure.yaml @@ -32,6 +32,7 @@ storage: trace: backend: azure # backend configuration to use block: + bloom_filter_false_positive: .05 # bloom filter false positive rate. lower values create larger filters but fewer false positives index_downsample_bytes: 1000 # number of bytes per index record encoding: zstd # block encoding/compression. 
options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd wal: diff --git a/example/docker-compose/etc/tempo-gcs-fake.yaml b/example/docker-compose/etc/tempo-gcs-fake.yaml index 01f20672913..c58eb9340c0 100644 --- a/example/docker-compose/etc/tempo-gcs-fake.yaml +++ b/example/docker-compose/etc/tempo-gcs-fake.yaml @@ -33,6 +33,7 @@ storage: trace: backend: gcs # backend configuration to use block: + bloom_filter_false_positive: .05 # bloom filter false positive rate. lower values create larger filters but fewer false positives index_downsample_bytes: 1000 # number of bytes per index record encoding: zstd # block encoding/compression. options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd wal: diff --git a/example/docker-compose/etc/tempo-local.yaml b/example/docker-compose/etc/tempo-local.yaml index 95f1d701025..da35442412e 100644 --- a/example/docker-compose/etc/tempo-local.yaml +++ b/example/docker-compose/etc/tempo-local.yaml @@ -32,8 +32,7 @@ storage: trace: backend: local # backend configuration to use block: - bloom_filter_shard_size: 100_000 # size of a single bloom filter shard in bytes - bloom_filter_shard_count: 10 # number of bloom filter shards + bloom_filter_false_positive: .05 # bloom filter false positive rate. lower values create larger filters but fewer false positives index_downsample_bytes: 1000 # number of bytes per index record encoding: zstd # block encoding/compression. options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd wal: diff --git a/example/docker-compose/etc/tempo-s3-minio.yaml b/example/docker-compose/etc/tempo-s3-minio.yaml index f64b0aa1b8a..ee2b8563a27 100644 --- a/example/docker-compose/etc/tempo-s3-minio.yaml +++ b/example/docker-compose/etc/tempo-s3-minio.yaml @@ -33,6 +33,7 @@ storage: trace: backend: s3 # backend configuration to use block: + bloom_filter_false_positive: .05 # bloom filter false positive rate. 
lower values create larger filters but fewer false positives index_downsample_bytes: 1000 # number of bytes per index record encoding: zstd # block encoding/compression. options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd wal: diff --git a/modules/storage/config.go b/modules/storage/config.go index 7ef0fda1d82..4f5fdc70015 100644 --- a/modules/storage/config.go +++ b/modules/storage/config.go @@ -5,6 +5,7 @@ import ( "time" cortex_cache "github.com/cortexproject/cortex/pkg/chunk/cache" + "github.com/grafana/tempo/pkg/util" "github.com/grafana/tempo/tempodb" "github.com/grafana/tempo/tempodb/backend" @@ -36,10 +37,9 @@ func (cfg *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet) f.StringVar(&cfg.Trace.WAL.Filepath, util.PrefixConfig(prefix, "trace.wal.path"), "/var/tempo/wal", "Path at which store WAL blocks.") cfg.Trace.WAL.Encoding = backend.EncNone - cfg.Trace.Block = &encoding.BlockConfig{ - BloomFilterShardCount: 10, - BloomFilterShardSize: 100_000, // 100KiB - } + cfg.Trace.Block = &encoding.BlockConfig{} + f.Float64Var(&cfg.Trace.Block.BloomFP, util.PrefixConfig(prefix, "trace.block.bloom-filter-false-positive"), .05, "Bloom Filter False Positive.") + f.IntVar(&cfg.Trace.Block.BloomFilterShardSize, util.PrefixConfig(prefix, "trace.block.bloom-filter-shard-size"), 250*1024, "Bloom Filter Shard Size.") f.IntVar(&cfg.Trace.Block.IndexDownsampleBytes, util.PrefixConfig(prefix, "trace.block.index-downsample-bytes"), 1024*1024, "Number of bytes (before compression) per index record.") f.IntVar(&cfg.Trace.Block.IndexPageSizeBytes, util.PrefixConfig(prefix, "trace.block.index-page-size-bytes"), 250*1024, "Number of bytes per index page.") cfg.Trace.Block.Encoding = backend.EncZstd diff --git a/tempodb/backend/block_meta.go b/tempodb/backend/block_meta.go index 5ee3fd1d231..08076660305 100644 --- a/tempodb/backend/block_meta.go +++ b/tempodb/backend/block_meta.go @@ -28,7 +28,7 @@ type BlockMeta struct { IndexPageSize uint32 
`json:"indexPageSize"` // Size of each index page in bytes TotalRecords uint32 `json:"totalRecords"` // Total Records stored in the index file DataEncoding string `json:"dataEncoding"` // DataEncoding is a string provided externally, but tracked by tempodb that indicates the way the bytes are encoded - BloomShardCount uint8 `json:"bloomShardCount"` // + BloomShardCount uint8 `json:"bloomShardCount"` // Number of bloom filter shards } func NewBlockMeta(tenantID string, blockID uuid.UUID, version string, encoding Encoding, dataEncoding string) *BlockMeta { diff --git a/tempodb/compactor_bookmark_test.go b/tempodb/compactor_bookmark_test.go index 1f09f2405c3..0ded50230ef 100644 --- a/tempodb/compactor_bookmark_test.go +++ b/tempodb/compactor_bookmark_test.go @@ -33,11 +33,11 @@ func TestCurrentClear(t *testing.T) { Path: path.Join(tempDir, "traces"), }, Block: &encoding.BlockConfig{ - IndexDownsampleBytes: 17, - BloomFilterShardSize: 100_000, - BloomFilterShardCount: 10, - Encoding: backend.EncGZIP, - IndexPageSizeBytes: 1000, + IndexDownsampleBytes: 17, + BloomFP: .01, + BloomFilterShardSize: 100_000, + Encoding: backend.EncGZIP, + IndexPageSizeBytes: 1000, }, WAL: &wal.Config{ Filepath: path.Join(tempDir, "wal"), diff --git a/tempodb/compactor_test.go b/tempodb/compactor_test.go index b1047486fc4..edf34f2f85d 100644 --- a/tempodb/compactor_test.go +++ b/tempodb/compactor_test.go @@ -2,14 +2,10 @@ package tempodb import ( "context" - "io/ioutil" "math/rand" - "os" - "path" "testing" "time" - "github.com/go-kit/kit/log" "github.com/golang/protobuf/proto" "github.com/google/uuid" "github.com/stretchr/testify/assert" @@ -18,10 +14,6 @@ import ( "github.com/grafana/tempo/pkg/tempopb" "github.com/grafana/tempo/pkg/util/test" "github.com/grafana/tempo/tempodb/backend" - "github.com/grafana/tempo/tempodb/backend/local" - "github.com/grafana/tempo/tempodb/encoding" - "github.com/grafana/tempo/tempodb/pool" - "github.com/grafana/tempo/tempodb/wal" ) type mockSharder struct { 
@@ -48,32 +40,8 @@ func (m *mockOverrides) BlockRetentionForTenant(_ string) time.Duration { } func TestCompaction(t *testing.T) { - tempDir, err := ioutil.TempDir("/tmp", "") - defer os.RemoveAll(tempDir) - assert.NoError(t, err, "unexpected error creating temp dir") - - r, w, c, err := New(&Config{ - Backend: "local", - Pool: &pool.Config{ - MaxWorkers: 10, - QueueDepth: 100, - }, - Local: &local.Config{ - Path: path.Join(tempDir, "traces"), - }, - Block: &encoding.BlockConfig{ - IndexDownsampleBytes: 11, - BloomFilterShardSize: 100_000, - BloomFilterShardCount: 10, - Encoding: backend.EncLZ4_4M, - IndexPageSizeBytes: 1000, - }, - WAL: &wal.Config{ - Filepath: path.Join(tempDir, "wal"), - }, - BlocklistPoll: 0, - }, log.NewNopLogger()) - require.NoError(t, err) + r, w, c, err := testConfig(backend.EncLZ4_4M, 0) + assert.NoError(t, err) c.EnableCompaction(&CompactorConfig{ ChunkSizeBytes: 10, @@ -177,31 +145,7 @@ func TestCompaction(t *testing.T) { } func TestSameIDCompaction(t *testing.T) { - tempDir, err := ioutil.TempDir("/tmp", "") - defer os.RemoveAll(tempDir) - assert.NoError(t, err, "unexpected error creating temp dir") - - r, w, c, err := New(&Config{ - Backend: "local", - Pool: &pool.Config{ - MaxWorkers: 10, - QueueDepth: 100, - }, - Local: &local.Config{ - Path: path.Join(tempDir, "traces"), - }, - Block: &encoding.BlockConfig{ - IndexDownsampleBytes: 11, - BloomFilterShardSize: 100_000, - BloomFilterShardCount: 10, - Encoding: backend.EncSnappy, - IndexPageSizeBytes: 1000, - }, - WAL: &wal.Config{ - Filepath: path.Join(tempDir, "wal"), - }, - BlocklistPoll: 0, - }, log.NewNopLogger()) + r, w, c, err := testConfig(backend.EncSnappy, 0) assert.NoError(t, err) c.EnableCompaction(&CompactorConfig{ @@ -265,31 +209,7 @@ func TestSameIDCompaction(t *testing.T) { } func TestCompactionUpdatesBlocklist(t *testing.T) { - tempDir, err := ioutil.TempDir("/tmp", "") - defer os.RemoveAll(tempDir) - assert.NoError(t, err, "unexpected error creating temp dir") - - r, 
w, c, err := New(&Config{ - Backend: "local", - Pool: &pool.Config{ - MaxWorkers: 10, - QueueDepth: 100, - }, - Local: &local.Config{ - Path: path.Join(tempDir, "traces"), - }, - Block: &encoding.BlockConfig{ - IndexDownsampleBytes: 11, - BloomFilterShardSize: 100_000, - BloomFilterShardCount: 10, - Encoding: backend.EncNone, - IndexPageSizeBytes: 1000, - }, - WAL: &wal.Config{ - Filepath: path.Join(tempDir, "wal"), - }, - BlocklistPoll: 0, - }, log.NewNopLogger()) + r, w, c, err := testConfig(backend.EncNone, 0) assert.NoError(t, err) c.EnableCompaction(&CompactorConfig{ @@ -332,31 +252,7 @@ func TestCompactionUpdatesBlocklist(t *testing.T) { } func TestCompactionMetrics(t *testing.T) { - tempDir, err := ioutil.TempDir("/tmp", "") - defer os.RemoveAll(tempDir) - assert.NoError(t, err, "unexpected error creating temp dir") - - r, w, c, err := New(&Config{ - Backend: "local", - Pool: &pool.Config{ - MaxWorkers: 10, - QueueDepth: 100, - }, - Local: &local.Config{ - Path: path.Join(tempDir, "traces"), - }, - Block: &encoding.BlockConfig{ - IndexDownsampleBytes: 11, - BloomFilterShardSize: 100_000, - BloomFilterShardCount: 10, - Encoding: backend.EncNone, - IndexPageSizeBytes: 1000, - }, - WAL: &wal.Config{ - Filepath: path.Join(tempDir, "wal"), - }, - BlocklistPoll: 0, - }, log.NewNopLogger()) + r, w, c, err := testConfig(backend.EncNone, 0) assert.NoError(t, err) c.EnableCompaction(&CompactorConfig{ @@ -403,31 +299,7 @@ func TestCompactionMetrics(t *testing.T) { } func TestCompactionIteratesThroughTenants(t *testing.T) { - tempDir, err := ioutil.TempDir("/tmp", "") - defer os.RemoveAll(tempDir) - assert.NoError(t, err, "unexpected error creating temp dir") - - r, w, c, err := New(&Config{ - Backend: "local", - Pool: &pool.Config{ - MaxWorkers: 10, - QueueDepth: 100, - }, - Local: &local.Config{ - Path: path.Join(tempDir, "traces"), - }, - Block: &encoding.BlockConfig{ - IndexDownsampleBytes: 11, - BloomFilterShardSize: 100_000, - BloomFilterShardCount: 10, - 
Encoding: backend.EncLZ4_64k, - IndexPageSizeBytes: 1000, - }, - WAL: &wal.Config{ - Filepath: path.Join(tempDir, "wal"), - }, - BlocklistPoll: 0, - }, log.NewNopLogger()) + r, w, c, err := testConfig(backend.EncLZ4_64k, 0) assert.NoError(t, err) c.EnableCompaction(&CompactorConfig{ diff --git a/tempodb/encoding/common/bloom.go b/tempodb/encoding/common/bloom.go index c556a7ff0bf..607dafa8b89 100644 --- a/tempodb/encoding/common/bloom.go +++ b/tempodb/encoding/common/bloom.go @@ -4,8 +4,6 @@ import ( "bytes" "math" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" "github.com/willf/bloom" "github.com/grafana/tempo/pkg/util" @@ -13,16 +11,6 @@ import ( const legacyShardCount = 10 -var ( - metricBloomFP = promauto.NewHistogram(prometheus.HistogramOpts{ - Namespace: "tempo", - Name: "bloom_filter_false_positive", - Help: "False positive values for bloom filters created", - // 0.005, 0.020, 0.080, 0.32 - Buckets: prometheus.ExponentialBuckets(0.005, 4, 4), - }) -) - type ShardedBloomFilter struct { blooms []*bloom.BloomFilter } @@ -39,26 +27,28 @@ func evaluateK(shardSizeInBits, itemsPerBloom int) (k int) { } // NewBloom creates a ShardedBloomFilter -func NewBloom(shardSize, shardCount, estimatedObjects int) *ShardedBloomFilter { - itemsPerBloom := estimatedObjects / shardCount - if itemsPerBloom == 0 { - itemsPerBloom = 1 +func NewBloom(fp float64, shardSize, estimatedObjects uint) *ShardedBloomFilter { + // estimate the number of shards needed. 
an approximate value is enough + var shardCount uint + var kPerBloom uint + for { + shardCount++ + b := bloom.New(shardSize*8, uint(evaluateK(int(shardSize*8), int(estimatedObjects/shardCount)))) + if b.EstimateFalsePositiveRate(estimatedObjects/shardCount) < fp { + kPerBloom = b.K() + break + } } - shardSizeInBits := 8 * shardSize - k := evaluateK(shardSizeInBits, itemsPerBloom) - b := &ShardedBloomFilter{ blooms: make([]*bloom.BloomFilter, shardCount), } - for i := 0; i < shardCount; i++ { + + for i := 0; i < int(shardCount); i++ { // New(m uint, k uint) creates a new Bloom filter with _m_ bits and _k_ hashing functions - b.blooms[i] = bloom.New(uint(shardSizeInBits), uint(k)) + b.blooms[i] = bloom.New(shardSize*8, kPerBloom) } - // metric the false positive rate so we can track if we're making bad blooms - metricBloomFP.Observe(b.blooms[0].EstimateFalsePositiveRate(uint(itemsPerBloom))) - return b } @@ -81,6 +71,10 @@ func (b *ShardedBloomFilter) Write() ([][]byte, error) { return bloomBytes, nil } +func (b *ShardedBloomFilter) GetShardCount() int { + return len(b.blooms) +} + // Test implements bloom.Test -> required only for testing func (b *ShardedBloomFilter) Test(traceID []byte) bool { shardKey := ShardKeyForTraceID(traceID, len(b.blooms)) diff --git a/tempodb/encoding/common/bloom_test.go b/tempodb/encoding/common/bloom_test.go index 209dde423f3..6af339880e3 100644 --- a/tempodb/encoding/common/bloom_test.go +++ b/tempodb/encoding/common/bloom_test.go @@ -2,7 +2,6 @@ package common import ( "bytes" - "math" "math/rand" "testing" @@ -13,7 +12,7 @@ import ( func TestShardedBloom(t *testing.T) { // create a bunch of traceIDs var err error - const numTraces = 1000 + const numTraces = 10000 traceIDs := make([][]byte, 0) for i := 0; i < numTraces; i++ { id := make([]byte, 16) @@ -23,10 +22,10 @@ func TestShardedBloom(t *testing.T) { } // create sharded bloom filter - shardSize := 1000 - shardCount := 5 - estimatedObjects := 1000 - b := NewBloom(shardSize, 
shardCount, estimatedObjects) + const bloomFP = .01 + shardSize := uint(100) + estimatedObjects := uint(numTraces) + b := NewBloom(bloomFP, shardSize, estimatedObjects) // add traceIDs to sharded bloom filter for _, traceID := range traceIDs { @@ -36,20 +35,18 @@ func TestShardedBloom(t *testing.T) { // get byte representation bloomBytes, err := b.Write() assert.NoError(t, err) - assert.Len(t, bloomBytes, shardCount) // parse byte representation into willf_bloom.Bloomfilter var filters []*willf_bloom.BloomFilter - for i := 0; i < shardCount; i++ { + for i := 0; i < b.GetShardCount(); i++ { filters = append(filters, &willf_bloom.BloomFilter{}) } for i, singleBloom := range bloomBytes { _, err = filters[i].ReadFrom(bytes.NewReader(singleBloom)) assert.NoError(t, err) - // assert that parsed form has the expected _m_ and _k_ - assert.Equal(t, filters[i].Cap(), uint(shardSize*8)) // * 8 because need bits from bytes - assert.Equal(t, filters[i].K(), uint(evaluateK(shardSize*8, estimatedObjects/shardCount))) // * 8 because need bits from bytes + // assert that parsed form has the expected size and atleast the fp specified + assert.Equal(t, shardSize*8, filters[i].Cap()) // * 8 because need bits from bytes } // confirm that the sharded bloom and parsed form give the same result @@ -59,15 +56,60 @@ func TestShardedBloom(t *testing.T) { if !found { missingCount++ } - assert.Equal(t, found, filters[ShardKeyForTraceID(traceID, shardCount)].Test(traceID)) + assert.Equal(t, found, filters[ShardKeyForTraceID(traceID, b.GetShardCount())].Test(traceID)) } - // get estimated bloom filter false positive - estimatedBloomFP := filters[0].EstimateFalsePositiveRate(uint(numTraces / shardCount)) - // check that missingCount is less than estimatedBloomFP - assert.LessOrEqual(t, float64(missingCount), estimatedBloomFP*numTraces) + // check that missingCount is less than bloomFP + assert.LessOrEqual(t, float64(missingCount), bloomFP*numTraces) } -func TestEvaluateK(t *testing.T) { - 
assert.Equal(t, int(math.Ceil(math.Ln2*float64(100))), evaluateK(1000, 10)) +func TestShardedBloomFalsePositive(t *testing.T) { + tests := []struct { + name string + bloomFP float64 + shardSize uint + estimatedObjects uint + }{ + { + name: "regular", + bloomFP: 0.01, + shardSize: 100, + estimatedObjects: 1000, + }, + { + name: "large estimated objects", + bloomFP: 0.01, + shardSize: 100, + estimatedObjects: 10000, + }, + { + name: "large shard size", + bloomFP: 0.01, + shardSize: 100000, + estimatedObjects: 10, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + + b := NewBloom(tt.bloomFP, tt.shardSize, tt.estimatedObjects) + + // get byte representation + bloomBytes, err := b.Write() + assert.NoError(t, err) + + // parse byte representation into willf_bloom.Bloomfilter + var filters []*willf_bloom.BloomFilter + for i := 0; i < b.GetShardCount(); i++ { + filters = append(filters, &willf_bloom.BloomFilter{}) + } + + for i, singleBloom := range bloomBytes { + _, err = filters[i].ReadFrom(bytes.NewReader(singleBloom)) + assert.NoError(t, err) + assert.LessOrEqual(t, filters[i].EstimateFalsePositiveRate(tt.estimatedObjects/uint(b.GetShardCount())), tt.bloomFP) + } + }) + } } diff --git a/tempodb/encoding/config.go b/tempodb/encoding/config.go index 9d28611b71c..0eebc318576 100644 --- a/tempodb/encoding/config.go +++ b/tempodb/encoding/config.go @@ -8,11 +8,11 @@ import ( // BlockConfig holds configuration options for newly created blocks type BlockConfig struct { - IndexDownsampleBytes int `yaml:"index_downsample_bytes"` - IndexPageSizeBytes int `yaml:"index_page_size_bytes"` - BloomFilterShardSize int `yaml:"bloom_filter_shard_size"` - BloomFilterShardCount uint8 `yaml:"bloom_filter_shard_count"` - Encoding backend.Encoding `yaml:"encoding"` + IndexDownsampleBytes int `yaml:"index_downsample_bytes"` + IndexPageSizeBytes int `yaml:"index_page_size_bytes"` + BloomFP float64 `yaml:"bloom_filter_false_positive"` + BloomFilterShardSize int 
`yaml:"bloom_filter_shard_size"` + Encoding backend.Encoding `yaml:"encoding"` } // ValidateConfig returns true if the config is valid @@ -25,8 +25,12 @@ func ValidateConfig(b *BlockConfig) error { return fmt.Errorf("Positive index page size required") } - if b.BloomFilterShardSize <= 0 || b.BloomFilterShardCount <= 0 { - return fmt.Errorf("Positive value required for bloom-filter shard size and shard count") + if b.BloomFP <= 0.0 { + return fmt.Errorf("invalid bloom filter fp rate %v", b.BloomFP) + } + + if b.BloomFilterShardSize <= 0 { + return fmt.Errorf("Positive value required for bloom-filter shard size") } return nil diff --git a/tempodb/encoding/streaming_block.go b/tempodb/encoding/streaming_block.go index 731ea340612..137b0efb8e6 100644 --- a/tempodb/encoding/streaming_block.go +++ b/tempodb/encoding/streaming_block.go @@ -39,15 +39,9 @@ func NewStreamingBlock(cfg *BlockConfig, id uuid.UUID, tenantID string, metas [] } c := &StreamingBlock{ -<<<<<<< HEAD - encoding: latestEncoding(), - compactedMeta: backend.NewBlockMeta(tenantID, id, currentVersion, cfg.Encoding), - bloom: common.NewBloom(cfg.BloomFilterShardSize, int(cfg.BloomFilterShardCount), estimatedObjects), -======= encoding: LatestEncoding(), compactedMeta: backend.NewBlockMeta(tenantID, id, currentVersion, cfg.Encoding, dataEncoding), - bloom: common.NewWithEstimates(uint(estimatedObjects), cfg.BloomFP), ->>>>>>> main + bloom: common.NewBloom(cfg.BloomFP, uint(cfg.BloomFilterShardSize), uint(estimatedObjects)), inMetas: metas, cfg: cfg, } @@ -138,7 +132,7 @@ func (c *StreamingBlock) Complete(ctx context.Context, tracker backend.AppendTra meta.TotalRecords = uint32(len(records)) // casting meta.IndexPageSize = uint32(c.cfg.IndexPageSizeBytes) - meta.BloomShardCount = c.cfg.BloomFilterShardCount + meta.BloomShardCount = uint8(c.bloom.GetShardCount()) return bytesFlushed, writeBlockMeta(ctx, w, meta, indexBytes, c.bloom) } diff --git a/tempodb/tempodb_test.go b/tempodb/tempodb_test.go index 
8ae261bf927..c43952ba55a 100644 --- a/tempodb/tempodb_test.go +++ b/tempodb/tempodb_test.go @@ -25,9 +25,9 @@ import ( ) const ( - testTenantID = "fake" - testTenantID2 = "fake2" - tmpdir = "/tmp" + testTenantID = "fake" + testTenantID2 = "fake2" + tmpdir = "/tmp" testDataEncoding = "blerg" ) @@ -45,11 +45,11 @@ func testConfig(enc backend.Encoding, blocklistPoll time.Duration) (Reader, Writ Path: path.Join(tempDir, "traces"), }, Block: &encoding.BlockConfig{ - IndexDownsampleBytes: 17, - BloomFilterShardSize: 100_000, - BloomFilterShardCount: 10, - Encoding: enc, - IndexPageSizeBytes: 1000, + IndexDownsampleBytes: 17, + BloomFP: .01, + BloomFilterShardSize: 100_000, + Encoding: enc, + IndexPageSizeBytes: 1000, }, WAL: &wal.Config{ Filepath: path.Join(tempDir, "wal"), From 235528be5ad3eaf9d9d3b953b247b5cc25a78ea4 Mon Sep 17 00:00:00 2001 From: Annanay Date: Mon, 17 May 2021 12:30:22 +0530 Subject: [PATCH 03/17] Use bloom.EstimateParameters and check for bloom size rather than fp Signed-off-by: Annanay --- tempodb/encoding/common/bloom.go | 6 +++--- tempodb/encoding/common/bloom_test.go | 5 ++++- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/tempodb/encoding/common/bloom.go b/tempodb/encoding/common/bloom.go index 607dafa8b89..827657a655e 100644 --- a/tempodb/encoding/common/bloom.go +++ b/tempodb/encoding/common/bloom.go @@ -33,9 +33,9 @@ func NewBloom(fp float64, shardSize, estimatedObjects uint) *ShardedBloomFilter var kPerBloom uint for { shardCount++ - b := bloom.New(shardSize*8, uint(evaluateK(int(shardSize*8), int(estimatedObjects/shardCount)))) - if b.EstimateFalsePositiveRate(estimatedObjects/shardCount) < fp { - kPerBloom = b.K() + var m, k uint + if m, k = bloom.EstimateParameters(estimatedObjects/shardCount, fp); m < shardSize { + kPerBloom = k break } } diff --git a/tempodb/encoding/common/bloom_test.go b/tempodb/encoding/common/bloom_test.go index 6af339880e3..248b9d70fbb 100644 --- a/tempodb/encoding/common/bloom_test.go +++ 
b/tempodb/encoding/common/bloom_test.go @@ -35,6 +35,7 @@ func TestShardedBloom(t *testing.T) { // get byte representation bloomBytes, err := b.Write() assert.NoError(t, err) + assert.Len(t, bloomBytes, b.GetShardCount()) // parse byte representation into willf_bloom.Bloomfilter var filters []*willf_bloom.BloomFilter @@ -45,7 +46,7 @@ func TestShardedBloom(t *testing.T) { _, err = filters[i].ReadFrom(bytes.NewReader(singleBloom)) assert.NoError(t, err) - // assert that parsed form has the expected size and atleast the fp specified + // assert that parsed form has the expected size assert.Equal(t, shardSize*8, filters[i].Cap()) // * 8 because need bits from bytes } @@ -91,7 +92,9 @@ func TestShardedBloomFalsePositive(t *testing.T) { } for _, tt := range tests { + tt := tt // capture range variable, needed for running test cases in parallel t.Run(tt.name, func(t *testing.T) { + t.Parallel() b := NewBloom(tt.bloomFP, tt.shardSize, tt.estimatedObjects) From c92f696185d6d601d121304b5c7d595591763dd9 Mon Sep 17 00:00:00 2001 From: Annanay Date: Mon, 17 May 2021 12:34:21 +0530 Subject: [PATCH 04/17] Lint Signed-off-by: Annanay --- modules/ingester/ingester_test.go | 10 +++++----- modules/ingester/instance_test.go | 10 +++++----- modules/querier/querier_test.go | 10 +++++----- tempodb/encoding/common/bloom.go | 14 ++------------ tempodb/encoding/common/bloom_test.go | 2 +- tempodb/encoding/streaming_block_test.go | 8 ++++---- tempodb/retention_test.go | 20 ++++++++++---------- 7 files changed, 32 insertions(+), 42 deletions(-) diff --git a/modules/ingester/ingester_test.go b/modules/ingester/ingester_test.go index b3528b3d226..c5bedc05d85 100644 --- a/modules/ingester/ingester_test.go +++ b/modules/ingester/ingester_test.go @@ -258,11 +258,11 @@ func defaultIngester(t *testing.T, tmpDir string) (*Ingester, []*tempopb.Trace, Path: tmpDir, }, Block: &encoding.BlockConfig{ - IndexDownsampleBytes: 2, - BloomFilterShardSize: 100_000, - BloomFilterShardCount: 10, - Encoding: 
backend.EncLZ4_1M, - IndexPageSizeBytes: 1000, + IndexDownsampleBytes: 2, + BloomFP: 0.01, + BloomFilterShardSize: 100_000, + Encoding: backend.EncLZ4_1M, + IndexPageSizeBytes: 1000, }, WAL: &wal.Config{ Filepath: tmpDir, diff --git a/modules/ingester/instance_test.go b/modules/ingester/instance_test.go index a2e380167a9..a48d658ca3c 100644 --- a/modules/ingester/instance_test.go +++ b/modules/ingester/instance_test.go @@ -519,11 +519,11 @@ func defaultInstance(t require.TestingT, tmpDir string) *instance { Path: tmpDir, }, Block: &encoding.BlockConfig{ - IndexDownsampleBytes: 2, - BloomFilterShardSize: 100_000, - BloomFilterShardCount: 10, - Encoding: backend.EncLZ4_1M, - IndexPageSizeBytes: 1000, + IndexDownsampleBytes: 2, + BloomFP: 0.01, + BloomFilterShardSize: 100_000, + Encoding: backend.EncLZ4_1M, + IndexPageSizeBytes: 1000, }, WAL: &wal.Config{ Filepath: tmpDir, diff --git a/modules/querier/querier_test.go b/modules/querier/querier_test.go index 8add04486ef..f48a5acb002 100644 --- a/modules/querier/querier_test.go +++ b/modules/querier/querier_test.go @@ -55,11 +55,11 @@ func TestReturnAllHits(t *testing.T) { Path: path.Join(tempDir, "traces"), }, Block: &encoding.BlockConfig{ - Encoding: backend.EncNone, - IndexDownsampleBytes: 10, - BloomFilterShardSize: 100_000, - BloomFilterShardCount: 10, - IndexPageSizeBytes: 1000, + Encoding: backend.EncNone, + IndexDownsampleBytes: 10, + BloomFP: 0.01, + BloomFilterShardSize: 100_000, + IndexPageSizeBytes: 1000, }, WAL: &wal.Config{ Filepath: path.Join(tempDir, "wal"), diff --git a/tempodb/encoding/common/bloom.go b/tempodb/encoding/common/bloom.go index 827657a655e..10f92595664 100644 --- a/tempodb/encoding/common/bloom.go +++ b/tempodb/encoding/common/bloom.go @@ -2,7 +2,6 @@ package common import ( "bytes" - "math" "github.com/willf/bloom" @@ -15,17 +14,6 @@ type ShardedBloomFilter struct { blooms []*bloom.BloomFilter } -func evaluateK(shardSizeInBits, itemsPerBloom int) (k int) { - // Per 
https://llimllib.github.io/bloomfilter-tutorial/ under "How many hash functions should I use?" - // the optimal value of k: (m/n)ln(2) - // m: number of bits in the filter - // n: estimated number of objects - // k: number of hash functions - k = int(math.Ceil((float64(shardSizeInBits) / float64(itemsPerBloom)) * (math.Ln2))) - - return -} - // NewBloom creates a ShardedBloomFilter func NewBloom(fp float64, shardSize, estimatedObjects uint) *ShardedBloomFilter { // estimate the number of shards needed. an approximate value is enough @@ -34,6 +22,8 @@ func NewBloom(fp float64, shardSize, estimatedObjects uint) *ShardedBloomFilter for { shardCount++ var m, k uint + // m: number of bits in the filter + // k: number of hash functions if m, k = bloom.EstimateParameters(estimatedObjects/shardCount, fp); m < shardSize { kPerBloom = k break diff --git a/tempodb/encoding/common/bloom_test.go b/tempodb/encoding/common/bloom_test.go index 248b9d70fbb..dd66dfd5d74 100644 --- a/tempodb/encoding/common/bloom_test.go +++ b/tempodb/encoding/common/bloom_test.go @@ -92,7 +92,7 @@ func TestShardedBloomFalsePositive(t *testing.T) { } for _, tt := range tests { - tt := tt // capture range variable, needed for running test cases in parallel + tt := tt // capture range variable, needed for running test cases in parallel t.Run(tt.name, func(t *testing.T) { t.Parallel() diff --git a/tempodb/encoding/streaming_block_test.go b/tempodb/encoding/streaming_block_test.go index 309dddfb316..c4d1bf874f5 100644 --- a/tempodb/encoding/streaming_block_test.go +++ b/tempodb/encoding/streaming_block_test.go @@ -59,10 +59,10 @@ func TestStreamingBlockAddObject(t *testing.T) { numObjects := (rand.Int() % 20) + 1 cb, err := NewStreamingBlock(&BlockConfig{ - BloomFilterShardSize: 100_000, - BloomFilterShardCount: 10, - IndexDownsampleBytes: indexDownsample, - Encoding: backend.EncGZIP, + BloomFP: 0.01, + BloomFilterShardSize: 100_000, + IndexDownsampleBytes: indexDownsample, + Encoding: backend.EncGZIP, 
}, uuid.New(), testTenantID, metas, numObjects) assert.NoError(t, err) diff --git a/tempodb/retention_test.go b/tempodb/retention_test.go index 71b50673113..04d943fb1ca 100644 --- a/tempodb/retention_test.go +++ b/tempodb/retention_test.go @@ -28,11 +28,11 @@ func TestRetention(t *testing.T) { Path: path.Join(tempDir, "traces"), }, Block: &encoding.BlockConfig{ - IndexDownsampleBytes: 17, - BloomFilterShardSize: 100_000, - BloomFilterShardCount: 10, - Encoding: backend.EncLZ4_256k, - IndexPageSizeBytes: 1000, + IndexDownsampleBytes: 17, + BloomFP: 0.01, + BloomFilterShardSize: 100_000, + Encoding: backend.EncLZ4_256k, + IndexPageSizeBytes: 1000, }, WAL: &wal.Config{ Filepath: path.Join(tempDir, "wal"), @@ -84,11 +84,11 @@ func TestBlockRetentionOverride(t *testing.T) { Path: path.Join(tempDir, "traces"), }, Block: &encoding.BlockConfig{ - IndexDownsampleBytes: 17, - BloomFilterShardSize: 100_000, - BloomFilterShardCount: 10, - Encoding: backend.EncLZ4_256k, - IndexPageSizeBytes: 1000, + IndexDownsampleBytes: 17, + BloomFP: 0.01, + BloomFilterShardSize: 100_000, + Encoding: backend.EncLZ4_256k, + IndexPageSizeBytes: 1000, }, WAL: &wal.Config{ Filepath: path.Join(tempDir, "wal"), From 61640b66644f9f802d6d61741c922350cf24cbe9 Mon Sep 17 00:00:00 2001 From: Annanay Date: Mon, 17 May 2021 12:45:31 +0530 Subject: [PATCH 05/17] Use uint16 in block meta Signed-off-by: Annanay --- tempodb/backend/block_meta.go | 2 +- tempodb/encoding/common/bloom_test.go | 6 ++++++ tempodb/encoding/streaming_block.go | 2 +- 3 files changed, 8 insertions(+), 2 deletions(-) diff --git a/tempodb/backend/block_meta.go b/tempodb/backend/block_meta.go index 08076660305..4ed52612a5d 100644 --- a/tempodb/backend/block_meta.go +++ b/tempodb/backend/block_meta.go @@ -28,7 +28,7 @@ type BlockMeta struct { IndexPageSize uint32 `json:"indexPageSize"` // Size of each index page in bytes TotalRecords uint32 `json:"totalRecords"` // Total Records stored in the index file DataEncoding string 
`json:"dataEncoding"` // DataEncoding is a string provided externally, but tracked by tempodb that indicates the way the bytes are encoded - BloomShardCount uint8 `json:"bloomShardCount"` // Number of bloom filter shards + BloomShardCount uint16 `json:"bloomShardCount"` // Number of bloom filter shards } func NewBlockMeta(tenantID string, blockID uuid.UUID, version string, encoding Encoding, dataEncoding string) *BlockMeta { diff --git a/tempodb/encoding/common/bloom_test.go b/tempodb/encoding/common/bloom_test.go index dd66dfd5d74..dea2644d8c2 100644 --- a/tempodb/encoding/common/bloom_test.go +++ b/tempodb/encoding/common/bloom_test.go @@ -89,6 +89,12 @@ func TestShardedBloomFalsePositive(t *testing.T) { shardSize: 100000, estimatedObjects: 10, }, + { + name: "current scale", + bloomFP: 0.05, + shardSize: 250 * 1024, + estimatedObjects: 10_000_000, + }, } for _, tt := range tests { diff --git a/tempodb/encoding/streaming_block.go b/tempodb/encoding/streaming_block.go index 137b0efb8e6..3f6abb5487a 100644 --- a/tempodb/encoding/streaming_block.go +++ b/tempodb/encoding/streaming_block.go @@ -132,7 +132,7 @@ func (c *StreamingBlock) Complete(ctx context.Context, tracker backend.AppendTra meta.TotalRecords = uint32(len(records)) // casting meta.IndexPageSize = uint32(c.cfg.IndexPageSizeBytes) - meta.BloomShardCount = uint8(c.bloom.GetShardCount()) + meta.BloomShardCount = uint16(c.bloom.GetShardCount()) return bytesFlushed, writeBlockMeta(ctx, w, meta, indexBytes, c.bloom) } From 1b165c4d51cbc147a63323e74dd8512bb3081359 Mon Sep 17 00:00:00 2001 From: Annanay Date: Mon, 17 May 2021 16:00:42 +0530 Subject: [PATCH 06/17] Increase test timeout to 20m Signed-off-by: Annanay --- Makefile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Makefile b/Makefile index 8f237c6f869..a0fa712a28a 100644 --- a/Makefile +++ b/Makefile @@ -24,7 +24,7 @@ ALL_DOC := $(shell find . 
\( -name "*.md" -o -name "*.yaml" \) \ ALL_PKGS := $(shell go list $(sort $(dir $(ALL_SRC)))) GO_OPT= -mod vendor -ldflags "-X main.Branch=$(GIT_BRANCH) -X main.Revision=$(GIT_REVISION) -X main.Version=$(VERSION)" -GOTEST_OPT?= -race -timeout 10m -count=1 +GOTEST_OPT?= -race -timeout 20m -count=1 GOTEST_OPT_WITH_COVERAGE = $(GOTEST_OPT) -cover GOTEST=go test LINT=golangci-lint From 3b77daf56714c6fa027d92ea880cd3ed4a5d2fdd Mon Sep 17 00:00:00 2001 From: Annanay Date: Tue, 18 May 2021 19:01:04 +0530 Subject: [PATCH 07/17] Add more testing Signed-off-by: Annanay --- Makefile | 2 +- tempodb/backend/block_meta.go | 2 +- tempodb/backend/block_meta_test.go | 27 +++++ tempodb/backend/local/local_test.go | 3 +- tempodb/compactor_test.go | 140 ++++++++++++++++++++++- tempodb/encoding/streaming_block_test.go | 7 +- tempodb/tempodb_test.go | 11 +- 7 files changed, 179 insertions(+), 13 deletions(-) diff --git a/Makefile b/Makefile index a0fa712a28a..8f237c6f869 100644 --- a/Makefile +++ b/Makefile @@ -24,7 +24,7 @@ ALL_DOC := $(shell find . 
\( -name "*.md" -o -name "*.yaml" \) \ ALL_PKGS := $(shell go list $(sort $(dir $(ALL_SRC)))) GO_OPT= -mod vendor -ldflags "-X main.Branch=$(GIT_BRANCH) -X main.Revision=$(GIT_REVISION) -X main.Version=$(VERSION)" -GOTEST_OPT?= -race -timeout 20m -count=1 +GOTEST_OPT?= -race -timeout 10m -count=1 GOTEST_OPT_WITH_COVERAGE = $(GOTEST_OPT) -cover GOTEST=go test LINT=golangci-lint diff --git a/tempodb/backend/block_meta.go b/tempodb/backend/block_meta.go index 4ed52612a5d..a1f2597b1eb 100644 --- a/tempodb/backend/block_meta.go +++ b/tempodb/backend/block_meta.go @@ -28,7 +28,7 @@ type BlockMeta struct { IndexPageSize uint32 `json:"indexPageSize"` // Size of each index page in bytes TotalRecords uint32 `json:"totalRecords"` // Total Records stored in the index file DataEncoding string `json:"dataEncoding"` // DataEncoding is a string provided externally, but tracked by tempodb that indicates the way the bytes are encoded - BloomShardCount uint16 `json:"bloomShardCount"` // Number of bloom filter shards + BloomShardCount uint16 `json:"bloomShards"` // Number of bloom filter shards } func NewBlockMeta(tenantID string, blockID uuid.UUID, version string, encoding Encoding, dataEncoding string) *BlockMeta { diff --git a/tempodb/backend/block_meta_test.go b/tempodb/backend/block_meta_test.go index 68f980eb5ad..e37fadf72d1 100644 --- a/tempodb/backend/block_meta_test.go +++ b/tempodb/backend/block_meta_test.go @@ -2,6 +2,7 @@ package backend import ( "bytes" + "encoding/json" "math/rand" "testing" @@ -41,3 +42,29 @@ func TestBlockMeta(t *testing.T) { assert.Equal(t, 1, bytes.Compare(b.MaxID, b.MinID)) assert.Equal(t, 2, b.TotalObjects) } + +func TestBlockMetaParsing(t *testing.T) { + inputJSON := ` +{ + "format": "v0", + "blockID": "00000000-0000-0000-0000-000000000000", + "minID": "AAAAAAAAAAAAOO0z0LnnHg==", + "maxID": "AAAAAAAAAAD/o61w2bYIDg==", + "tenantID": "single-tenant", + "startTime": "2021-01-01T00:00:00.0000000Z", + "endTime": "2021-01-02T00:00:00.0000000Z", + 
"totalObjects": 10, + "size": 12345, + "compactionLevel": 0, + "encoding": "zstd", + "indexPageSize": 250000, + "totalRecords": 124356, + "dataEncoding": "", + "bloomShards": 244 +} +` + + blockMeta := BlockMeta{} + err := json.Unmarshal([]byte(inputJSON), &blockMeta) + assert.NoError(t, err, "expected to be able to unmarshal from JSON") +} diff --git a/tempodb/backend/local/local_test.go b/tempodb/backend/local/local_test.go index fc4886902eb..b69194bd6ce 100644 --- a/tempodb/backend/local/local_test.go +++ b/tempodb/backend/local/local_test.go @@ -10,9 +10,10 @@ import ( "testing" "github.com/google/uuid" + "github.com/stretchr/testify/assert" + "github.com/grafana/tempo/tempodb/backend" "github.com/grafana/tempo/tempodb/encoding/common" - "github.com/stretchr/testify/assert" ) const objectName = "test" diff --git a/tempodb/compactor_test.go b/tempodb/compactor_test.go index edf34f2f85d..52eedffda53 100644 --- a/tempodb/compactor_test.go +++ b/tempodb/compactor_test.go @@ -2,10 +2,14 @@ package tempodb import ( "context" + "io/ioutil" "math/rand" + "os" + "path" "testing" "time" + "github.com/go-kit/kit/log" "github.com/golang/protobuf/proto" "github.com/google/uuid" "github.com/stretchr/testify/assert" @@ -14,6 +18,10 @@ import ( "github.com/grafana/tempo/pkg/tempopb" "github.com/grafana/tempo/pkg/util/test" "github.com/grafana/tempo/tempodb/backend" + "github.com/grafana/tempo/tempodb/backend/local" + "github.com/grafana/tempo/tempodb/encoding" + "github.com/grafana/tempo/tempodb/pool" + "github.com/grafana/tempo/tempodb/wal" ) type mockSharder struct { @@ -40,8 +48,32 @@ func (m *mockOverrides) BlockRetentionForTenant(_ string) time.Duration { } func TestCompaction(t *testing.T) { - r, w, c, err := testConfig(backend.EncLZ4_4M, 0) - assert.NoError(t, err) + tempDir, err := ioutil.TempDir("/tmp", "") + defer os.RemoveAll(tempDir) + assert.NoError(t, err, "unexpected error creating temp dir") + + r, w, c, err := New(&Config{ + Backend: "local", + Pool: 
&pool.Config{ + MaxWorkers: 10, + QueueDepth: 100, + }, + Local: &local.Config{ + Path: path.Join(tempDir, "traces"), + }, + Block: &encoding.BlockConfig{ + IndexDownsampleBytes: 11, + BloomFP: .01, + BloomFilterShardSize: 100_000, + Encoding: backend.EncLZ4_4M, + IndexPageSizeBytes: 1000, + }, + WAL: &wal.Config{ + Filepath: path.Join(tempDir, "wal"), + }, + BlocklistPoll: 0, + }, log.NewNopLogger()) + require.NoError(t, err) c.EnableCompaction(&CompactorConfig{ ChunkSizeBytes: 10, @@ -145,7 +177,31 @@ func TestCompaction(t *testing.T) { } func TestSameIDCompaction(t *testing.T) { - r, w, c, err := testConfig(backend.EncSnappy, 0) + tempDir, err := ioutil.TempDir("/tmp", "") + defer os.RemoveAll(tempDir) + assert.NoError(t, err, "unexpected error creating temp dir") + + r, w, c, err := New(&Config{ + Backend: "local", + Pool: &pool.Config{ + MaxWorkers: 10, + QueueDepth: 100, + }, + Local: &local.Config{ + Path: path.Join(tempDir, "traces"), + }, + Block: &encoding.BlockConfig{ + IndexDownsampleBytes: 11, + BloomFP: .01, + BloomFilterShardSize: 100_000, + Encoding: backend.EncSnappy, + IndexPageSizeBytes: 1000, + }, + WAL: &wal.Config{ + Filepath: path.Join(tempDir, "wal"), + }, + BlocklistPoll: 0, + }, log.NewNopLogger()) assert.NoError(t, err) c.EnableCompaction(&CompactorConfig{ @@ -209,7 +265,31 @@ func TestSameIDCompaction(t *testing.T) { } func TestCompactionUpdatesBlocklist(t *testing.T) { - r, w, c, err := testConfig(backend.EncNone, 0) + tempDir, err := ioutil.TempDir("/tmp", "") + defer os.RemoveAll(tempDir) + assert.NoError(t, err, "unexpected error creating temp dir") + + r, w, c, err := New(&Config{ + Backend: "local", + Pool: &pool.Config{ + MaxWorkers: 10, + QueueDepth: 100, + }, + Local: &local.Config{ + Path: path.Join(tempDir, "traces"), + }, + Block: &encoding.BlockConfig{ + IndexDownsampleBytes: 11, + BloomFP: .01, + BloomFilterShardSize: 100_000, + Encoding: backend.EncNone, + IndexPageSizeBytes: 1000, + }, + WAL: &wal.Config{ + Filepath: 
path.Join(tempDir, "wal"), + }, + BlocklistPoll: 0, + }, log.NewNopLogger()) assert.NoError(t, err) c.EnableCompaction(&CompactorConfig{ @@ -252,7 +332,31 @@ func TestCompactionUpdatesBlocklist(t *testing.T) { } func TestCompactionMetrics(t *testing.T) { - r, w, c, err := testConfig(backend.EncNone, 0) + tempDir, err := ioutil.TempDir("/tmp", "") + defer os.RemoveAll(tempDir) + assert.NoError(t, err, "unexpected error creating temp dir") + + r, w, c, err := New(&Config{ + Backend: "local", + Pool: &pool.Config{ + MaxWorkers: 10, + QueueDepth: 100, + }, + Local: &local.Config{ + Path: path.Join(tempDir, "traces"), + }, + Block: &encoding.BlockConfig{ + IndexDownsampleBytes: 11, + BloomFP: .01, + BloomFilterShardSize: 100_000, + Encoding: backend.EncNone, + IndexPageSizeBytes: 1000, + }, + WAL: &wal.Config{ + Filepath: path.Join(tempDir, "wal"), + }, + BlocklistPoll: 0, + }, log.NewNopLogger()) assert.NoError(t, err) c.EnableCompaction(&CompactorConfig{ @@ -299,7 +403,31 @@ func TestCompactionMetrics(t *testing.T) { } func TestCompactionIteratesThroughTenants(t *testing.T) { - r, w, c, err := testConfig(backend.EncLZ4_64k, 0) + tempDir, err := ioutil.TempDir("/tmp", "") + defer os.RemoveAll(tempDir) + assert.NoError(t, err, "unexpected error creating temp dir") + + r, w, c, err := New(&Config{ + Backend: "local", + Pool: &pool.Config{ + MaxWorkers: 10, + QueueDepth: 100, + }, + Local: &local.Config{ + Path: path.Join(tempDir, "traces"), + }, + Block: &encoding.BlockConfig{ + IndexDownsampleBytes: 11, + BloomFP: .01, + BloomFilterShardSize: 100_000, + Encoding: backend.EncLZ4_64k, + IndexPageSizeBytes: 1000, + }, + WAL: &wal.Config{ + Filepath: path.Join(tempDir, "wal"), + }, + BlocklistPoll: 0, + }, log.NewNopLogger()) assert.NoError(t, err) c.EnableCompaction(&CompactorConfig{ diff --git a/tempodb/encoding/streaming_block_test.go b/tempodb/encoding/streaming_block_test.go index c4d1bf874f5..1587df840cc 100644 --- a/tempodb/encoding/streaming_block_test.go +++ 
b/tempodb/encoding/streaming_block_test.go @@ -60,7 +60,7 @@ func TestStreamingBlockAddObject(t *testing.T) { numObjects := (rand.Int() % 20) + 1 cb, err := NewStreamingBlock(&BlockConfig{ BloomFP: 0.01, - BloomFilterShardSize: 100_000, + BloomFilterShardSize: 100, IndexDownsampleBytes: indexDownsample, Encoding: backend.EncGZIP, }, uuid.New(), testTenantID, metas, numObjects) @@ -118,6 +118,7 @@ func TestStreamingBlockAddObject(t *testing.T) { assert.Equal(t, testTenantID, meta.TenantID) assert.Equal(t, numObjects, meta.TotalObjects) assert.Greater(t, meta.Size, uint64(0)) + assert.Greater(t, cb.bloom.GetShardCount(), 0) // bloom for _, id := range ids { @@ -238,6 +239,7 @@ func streamingBlock(t *testing.T, cfg *BlockConfig, w backend.Writer) (*Streamin originatingMeta.StartTime = time.Now().Add(-5 * time.Minute) originatingMeta.EndTime = time.Now().Add(5 * time.Minute) originatingMeta.DataEncoding = "foo" + originatingMeta.TotalObjects = numMsgs // calc expected records byteCounter := 0 @@ -260,6 +262,8 @@ func streamingBlock(t *testing.T, cfg *BlockConfig, w backend.Writer) (*Streamin block, err := NewStreamingBlock(cfg, originatingMeta.BlockID, originatingMeta.TenantID, []*backend.BlockMeta{originatingMeta}, originatingMeta.TotalObjects) require.NoError(t, err, "unexpected error completing block") + expectedBloomShards := block.bloom.GetShardCount() + ctx := context.Background() for { id, data, err := iter.Next(ctx) @@ -288,6 +292,7 @@ func streamingBlock(t *testing.T, cfg *BlockConfig, w backend.Writer) (*Streamin require.Equal(t, originatingMeta.EndTime, block.BlockMeta().EndTime) require.Equal(t, originatingMeta.TenantID, block.BlockMeta().TenantID) require.Equal(t, originatingMeta.DataEncoding, block.BlockMeta().DataEncoding) + require.Equal(t, expectedBloomShards, int(block.BlockMeta().BloomShardCount)) // Verify block size was written require.Greater(t, block.BlockMeta().Size, uint64(0)) diff --git a/tempodb/tempodb_test.go b/tempodb/tempodb_test.go index 
c43952ba55a..7458494e760 100644 --- a/tempodb/tempodb_test.go +++ b/tempodb/tempodb_test.go @@ -10,9 +10,12 @@ import ( "time" "github.com/go-kit/kit/log" - "github.com/golang/protobuf/proto" "github.com/google/uuid" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/grafana/tempo/tempodb/pool" "github.com/grafana/tempo/pkg/tempopb" "github.com/grafana/tempo/pkg/util/test" "github.com/grafana/tempo/tempodb/backend" @@ -20,8 +23,6 @@ import ( "github.com/grafana/tempo/tempodb/encoding" "github.com/grafana/tempo/tempodb/encoding/common" "github.com/grafana/tempo/tempodb/wal" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" ) const ( @@ -41,6 +42,10 @@ func testConfig(enc backend.Encoding, blocklistPoll time.Duration) (Reader, Writ return New(&Config{ Backend: "local", + Pool: &pool.Config{ + MaxWorkers: 10, + QueueDepth: 100, + }, Local: &local.Config{ Path: path.Join(tempDir, "traces"), }, From da69f284e692fe77f44d90e21ff01f61fea3cf4f Mon Sep 17 00:00:00 2001 From: Annanay Date: Wed, 19 May 2021 13:53:55 +0530 Subject: [PATCH 08/17] Address comments, rename config key, Write() => Marshal() Signed-off-by: Annanay --- modules/ingester/ingester_test.go | 2 +- modules/ingester/instance_test.go | 2 +- modules/querier/querier_test.go | 2 +- modules/storage/config.go | 2 +- tempodb/compactor_bookmark_test.go | 2 +- tempodb/compactor_test.go | 10 +++++----- tempodb/encoding/block.go | 2 +- tempodb/encoding/common/bloom.go | 25 +++++++++++------------- tempodb/encoding/common/bloom_test.go | 4 ++-- tempodb/encoding/config.go | 4 ++-- tempodb/encoding/streaming_block.go | 2 +- tempodb/encoding/streaming_block_test.go | 2 +- tempodb/retention_test.go | 4 ++-- tempodb/tempodb_test.go | 4 ++-- 14 files changed, 32 insertions(+), 35 deletions(-) diff --git a/modules/ingester/ingester_test.go b/modules/ingester/ingester_test.go index c5bedc05d85..7e0587c62a4 100644 --- a/modules/ingester/ingester_test.go +++ 
b/modules/ingester/ingester_test.go @@ -260,7 +260,7 @@ func defaultIngester(t *testing.T, tmpDir string) (*Ingester, []*tempopb.Trace, Block: &encoding.BlockConfig{ IndexDownsampleBytes: 2, BloomFP: 0.01, - BloomFilterShardSize: 100_000, + BloomShardSizeBytes: 100_000, Encoding: backend.EncLZ4_1M, IndexPageSizeBytes: 1000, }, diff --git a/modules/ingester/instance_test.go b/modules/ingester/instance_test.go index a48d658ca3c..c4ba9906e9f 100644 --- a/modules/ingester/instance_test.go +++ b/modules/ingester/instance_test.go @@ -521,7 +521,7 @@ func defaultInstance(t require.TestingT, tmpDir string) *instance { Block: &encoding.BlockConfig{ IndexDownsampleBytes: 2, BloomFP: 0.01, - BloomFilterShardSize: 100_000, + BloomShardSizeBytes: 100_000, Encoding: backend.EncLZ4_1M, IndexPageSizeBytes: 1000, }, diff --git a/modules/querier/querier_test.go b/modules/querier/querier_test.go index f48a5acb002..fbbd6fef99b 100644 --- a/modules/querier/querier_test.go +++ b/modules/querier/querier_test.go @@ -58,7 +58,7 @@ func TestReturnAllHits(t *testing.T) { Encoding: backend.EncNone, IndexDownsampleBytes: 10, BloomFP: 0.01, - BloomFilterShardSize: 100_000, + BloomShardSizeBytes: 100_000, IndexPageSizeBytes: 1000, }, WAL: &wal.Config{ diff --git a/modules/storage/config.go b/modules/storage/config.go index 4f5fdc70015..456d30c9232 100644 --- a/modules/storage/config.go +++ b/modules/storage/config.go @@ -39,7 +39,7 @@ func (cfg *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet) cfg.Trace.Block = &encoding.BlockConfig{} f.Float64Var(&cfg.Trace.Block.BloomFP, util.PrefixConfig(prefix, "trace.block.bloom-filter-false-positive"), .05, "Bloom Filter False Positive.") - f.IntVar(&cfg.Trace.Block.BloomFilterShardSize, util.PrefixConfig(prefix, "trace.block.bloom-filter-shard-size"), 250*1024, "Bloom Filter Shard Size.") + f.IntVar(&cfg.Trace.Block.BloomShardSizeBytes, util.PrefixConfig(prefix, "trace.block.bloom-filter-shard-size-bytes"), 250*1024, "Bloom Filter 
Shard Size in bytes.") f.IntVar(&cfg.Trace.Block.IndexDownsampleBytes, util.PrefixConfig(prefix, "trace.block.index-downsample-bytes"), 1024*1024, "Number of bytes (before compression) per index record.") f.IntVar(&cfg.Trace.Block.IndexPageSizeBytes, util.PrefixConfig(prefix, "trace.block.index-page-size-bytes"), 250*1024, "Number of bytes per index page.") cfg.Trace.Block.Encoding = backend.EncZstd diff --git a/tempodb/compactor_bookmark_test.go b/tempodb/compactor_bookmark_test.go index 0ded50230ef..225e27b527a 100644 --- a/tempodb/compactor_bookmark_test.go +++ b/tempodb/compactor_bookmark_test.go @@ -35,7 +35,7 @@ func TestCurrentClear(t *testing.T) { Block: &encoding.BlockConfig{ IndexDownsampleBytes: 17, BloomFP: .01, - BloomFilterShardSize: 100_000, + BloomShardSizeBytes: 100_000, Encoding: backend.EncGZIP, IndexPageSizeBytes: 1000, }, diff --git a/tempodb/compactor_test.go b/tempodb/compactor_test.go index 52eedffda53..735fb7a5cd0 100644 --- a/tempodb/compactor_test.go +++ b/tempodb/compactor_test.go @@ -64,7 +64,7 @@ func TestCompaction(t *testing.T) { Block: &encoding.BlockConfig{ IndexDownsampleBytes: 11, BloomFP: .01, - BloomFilterShardSize: 100_000, + BloomShardSizeBytes: 100_000, Encoding: backend.EncLZ4_4M, IndexPageSizeBytes: 1000, }, @@ -193,7 +193,7 @@ func TestSameIDCompaction(t *testing.T) { Block: &encoding.BlockConfig{ IndexDownsampleBytes: 11, BloomFP: .01, - BloomFilterShardSize: 100_000, + BloomShardSizeBytes: 100_000, Encoding: backend.EncSnappy, IndexPageSizeBytes: 1000, }, @@ -281,7 +281,7 @@ func TestCompactionUpdatesBlocklist(t *testing.T) { Block: &encoding.BlockConfig{ IndexDownsampleBytes: 11, BloomFP: .01, - BloomFilterShardSize: 100_000, + BloomShardSizeBytes: 100_000, Encoding: backend.EncNone, IndexPageSizeBytes: 1000, }, @@ -348,7 +348,7 @@ func TestCompactionMetrics(t *testing.T) { Block: &encoding.BlockConfig{ IndexDownsampleBytes: 11, BloomFP: .01, - BloomFilterShardSize: 100_000, + BloomShardSizeBytes: 100_000, Encoding: 
backend.EncNone, IndexPageSizeBytes: 1000, }, @@ -419,7 +419,7 @@ func TestCompactionIteratesThroughTenants(t *testing.T) { Block: &encoding.BlockConfig{ IndexDownsampleBytes: 11, BloomFP: .01, - BloomFilterShardSize: 100_000, + BloomShardSizeBytes: 100_000, Encoding: backend.EncLZ4_64k, IndexPageSizeBytes: 1000, }, diff --git a/tempodb/encoding/block.go b/tempodb/encoding/block.go index b0d2d5daba0..08f24c7fb1f 100644 --- a/tempodb/encoding/block.go +++ b/tempodb/encoding/block.go @@ -26,7 +26,7 @@ func bloomName(shard int) string { // writeBlockMeta writes the bloom filter, meta and index to the passed in backend.Writer func writeBlockMeta(ctx context.Context, w backend.Writer, meta *backend.BlockMeta, indexBytes []byte, b *common.ShardedBloomFilter) error { - blooms, err := b.Write() + blooms, err := b.Marshal() if err != nil { return err } diff --git a/tempodb/encoding/common/bloom.go b/tempodb/encoding/common/bloom.go index 10f92595664..ea93807873a 100644 --- a/tempodb/encoding/common/bloom.go +++ b/tempodb/encoding/common/bloom.go @@ -16,18 +16,15 @@ type ShardedBloomFilter struct { // NewBloom creates a ShardedBloomFilter func NewBloom(fp float64, shardSize, estimatedObjects uint) *ShardedBloomFilter { - // estimate the number of shards needed. 
an approximate value is enough + // estimate the number of shards needed + // m: number of bits in the filter + // k: number of hash functions var shardCount uint - var kPerBloom uint - for { - shardCount++ - var m, k uint - // m: number of bits in the filter - // k: number of hash functions - if m, k = bloom.EstimateParameters(estimatedObjects/shardCount, fp); m < shardSize { - kPerBloom = k - break - } + m, k := bloom.EstimateParameters(estimatedObjects, fp) + if m%(shardSize*8) == 0 { + shardCount = m / (shardSize * 8) + } else { + shardCount = (m / (shardSize * 8)) + 1 } b := &ShardedBloomFilter{ @@ -36,7 +33,7 @@ func NewBloom(fp float64, shardSize, estimatedObjects uint) *ShardedBloomFilter for i := 0; i < int(shardCount); i++ { // New(m uint, k uint) creates a new Bloom filter with _m_ bits and _k_ hashing functions - b.blooms[i] = bloom.New(shardSize*8, kPerBloom) + b.blooms[i] = bloom.New(shardSize*8, k) } return b @@ -47,8 +44,8 @@ func (b *ShardedBloomFilter) Add(traceID []byte) { b.blooms[shardKey].Add(traceID) } -// Write is a wrapper around bloom.WriteTo -func (b *ShardedBloomFilter) Write() ([][]byte, error) { +// Marshal is a wrapper around bloom.WriteTo +func (b *ShardedBloomFilter) Marshal() ([][]byte, error) { bloomBytes := make([][]byte, len(b.blooms)) for i, f := range b.blooms { bloomBuffer := &bytes.Buffer{} diff --git a/tempodb/encoding/common/bloom_test.go b/tempodb/encoding/common/bloom_test.go index dea2644d8c2..24dda40cdcc 100644 --- a/tempodb/encoding/common/bloom_test.go +++ b/tempodb/encoding/common/bloom_test.go @@ -33,7 +33,7 @@ func TestShardedBloom(t *testing.T) { } // get byte representation - bloomBytes, err := b.Write() + bloomBytes, err := b.Marshal() assert.NoError(t, err) assert.Len(t, bloomBytes, b.GetShardCount()) @@ -105,7 +105,7 @@ func TestShardedBloomFalsePositive(t *testing.T) { b := NewBloom(tt.bloomFP, tt.shardSize, tt.estimatedObjects) // get byte representation - bloomBytes, err := b.Write() + bloomBytes, err := 
b.Marshal() assert.NoError(t, err) // parse byte representation into willf_bloom.Bloomfilter diff --git a/tempodb/encoding/config.go b/tempodb/encoding/config.go index 0eebc318576..69fe00bc25e 100644 --- a/tempodb/encoding/config.go +++ b/tempodb/encoding/config.go @@ -11,7 +11,7 @@ type BlockConfig struct { IndexDownsampleBytes int `yaml:"index_downsample_bytes"` IndexPageSizeBytes int `yaml:"index_page_size_bytes"` BloomFP float64 `yaml:"bloom_filter_false_positive"` - BloomFilterShardSize int `yaml:"bloom_filter_shard_size"` + BloomShardSizeBytes int `yaml:"bloom_filter_shard_size_bytes"` Encoding backend.Encoding `yaml:"encoding"` } @@ -29,7 +29,7 @@ func ValidateConfig(b *BlockConfig) error { return fmt.Errorf("invalid bloom filter fp rate %v", b.BloomFP) } - if b.BloomFilterShardSize <= 0 { + if b.BloomShardSizeBytes <= 0 { return fmt.Errorf("Positive value required for bloom-filter shard size") } diff --git a/tempodb/encoding/streaming_block.go b/tempodb/encoding/streaming_block.go index 3f6abb5487a..0bedfab1712 100644 --- a/tempodb/encoding/streaming_block.go +++ b/tempodb/encoding/streaming_block.go @@ -41,7 +41,7 @@ func NewStreamingBlock(cfg *BlockConfig, id uuid.UUID, tenantID string, metas [] c := &StreamingBlock{ encoding: LatestEncoding(), compactedMeta: backend.NewBlockMeta(tenantID, id, currentVersion, cfg.Encoding, dataEncoding), - bloom: common.NewBloom(cfg.BloomFP, uint(cfg.BloomFilterShardSize), uint(estimatedObjects)), + bloom: common.NewBloom(cfg.BloomFP, uint(cfg.BloomShardSizeBytes), uint(estimatedObjects)), inMetas: metas, cfg: cfg, } diff --git a/tempodb/encoding/streaming_block_test.go b/tempodb/encoding/streaming_block_test.go index 1587df840cc..15453554b7a 100644 --- a/tempodb/encoding/streaming_block_test.go +++ b/tempodb/encoding/streaming_block_test.go @@ -60,7 +60,7 @@ func TestStreamingBlockAddObject(t *testing.T) { numObjects := (rand.Int() % 20) + 1 cb, err := NewStreamingBlock(&BlockConfig{ BloomFP: 0.01, - 
BloomFilterShardSize: 100, + BloomShardSizeBytes: 100, IndexDownsampleBytes: indexDownsample, Encoding: backend.EncGZIP, }, uuid.New(), testTenantID, metas, numObjects) diff --git a/tempodb/retention_test.go b/tempodb/retention_test.go index 04d943fb1ca..e9a2cecf5ff 100644 --- a/tempodb/retention_test.go +++ b/tempodb/retention_test.go @@ -30,7 +30,7 @@ func TestRetention(t *testing.T) { Block: &encoding.BlockConfig{ IndexDownsampleBytes: 17, BloomFP: 0.01, - BloomFilterShardSize: 100_000, + BloomShardSizeBytes: 100_000, Encoding: backend.EncLZ4_256k, IndexPageSizeBytes: 1000, }, @@ -86,7 +86,7 @@ func TestBlockRetentionOverride(t *testing.T) { Block: &encoding.BlockConfig{ IndexDownsampleBytes: 17, BloomFP: 0.01, - BloomFilterShardSize: 100_000, + BloomShardSizeBytes: 100_000, Encoding: backend.EncLZ4_256k, IndexPageSizeBytes: 1000, }, diff --git a/tempodb/tempodb_test.go b/tempodb/tempodb_test.go index 7458494e760..297fcc1e58b 100644 --- a/tempodb/tempodb_test.go +++ b/tempodb/tempodb_test.go @@ -15,13 +15,13 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "github.com/grafana/tempo/tempodb/pool" "github.com/grafana/tempo/pkg/tempopb" "github.com/grafana/tempo/pkg/util/test" "github.com/grafana/tempo/tempodb/backend" "github.com/grafana/tempo/tempodb/backend/local" "github.com/grafana/tempo/tempodb/encoding" "github.com/grafana/tempo/tempodb/encoding/common" + "github.com/grafana/tempo/tempodb/pool" "github.com/grafana/tempo/tempodb/wal" ) @@ -52,7 +52,7 @@ func testConfig(enc backend.Encoding, blocklistPoll time.Duration) (Reader, Writ Block: &encoding.BlockConfig{ IndexDownsampleBytes: 17, BloomFP: .01, - BloomFilterShardSize: 100_000, + BloomShardSizeBytes: 100_000, Encoding: enc, IndexPageSizeBytes: 1000, }, From 9e8e807f553ef4dbcbc50a12bf929353993a4599 Mon Sep 17 00:00:00 2001 From: Annanay Date: Wed, 19 May 2021 15:46:25 +0530 Subject: [PATCH 09/17] Fix tests Signed-off-by: Annanay --- tempodb/encoding/common/bloom.go 
| 4 ++++ tempodb/encoding/streaming_block_test.go | 1 + 2 files changed, 5 insertions(+) diff --git a/tempodb/encoding/common/bloom.go b/tempodb/encoding/common/bloom.go index ea93807873a..ceeeb278556 100644 --- a/tempodb/encoding/common/bloom.go +++ b/tempodb/encoding/common/bloom.go @@ -27,6 +27,10 @@ func NewBloom(fp float64, shardSize, estimatedObjects uint) *ShardedBloomFilter shardCount = (m / (shardSize * 8)) + 1 } + if shardCount == 0 { + shardCount = 1 + } + b := &ShardedBloomFilter{ blooms: make([]*bloom.BloomFilter, shardCount), } diff --git a/tempodb/encoding/streaming_block_test.go b/tempodb/encoding/streaming_block_test.go index 15453554b7a..b2a348d9646 100644 --- a/tempodb/encoding/streaming_block_test.go +++ b/tempodb/encoding/streaming_block_test.go @@ -138,6 +138,7 @@ func TestStreamingBlockAll(t *testing.T) { &BlockConfig{ IndexDownsampleBytes: 1000, BloomFP: .01, + BloomShardSizeBytes: 10_000, Encoding: enc, IndexPageSizeBytes: 1000, }, From 3bb6a8b3092d21bf1437ea5eb8b8df8904a9545f Mon Sep 17 00:00:00 2001 From: Annanay Date: Wed, 19 May 2021 17:12:56 +0530 Subject: [PATCH 10/17] Sanity checks on shardCount Signed-off-by: Annanay --- tempodb/encoding/common/bloom.go | 10 +++++++++- tempodb/tempodb_test.go | 5 ----- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/tempodb/encoding/common/bloom.go b/tempodb/encoding/common/bloom.go index ceeeb278556..0560a86808e 100644 --- a/tempodb/encoding/common/bloom.go +++ b/tempodb/encoding/common/bloom.go @@ -2,13 +2,17 @@ package common import ( "bytes" + "math" "github.com/willf/bloom" "github.com/grafana/tempo/pkg/util" ) -const legacyShardCount = 10 +const ( + legacyShardCount = 10 + maxShardCount = math.MaxUint16 +) type ShardedBloomFilter struct { blooms []*bloom.BloomFilter @@ -31,6 +35,10 @@ func NewBloom(fp float64, shardSize, estimatedObjects uint) *ShardedBloomFilter shardCount = 1 } + if shardCount > maxShardCount { + shardCount = maxShardCount + } + b := &ShardedBloomFilter{ 
blooms: make([]*bloom.BloomFilter, shardCount), } diff --git a/tempodb/tempodb_test.go b/tempodb/tempodb_test.go index 297fcc1e58b..eee604cfeac 100644 --- a/tempodb/tempodb_test.go +++ b/tempodb/tempodb_test.go @@ -21,7 +21,6 @@ import ( "github.com/grafana/tempo/tempodb/backend/local" "github.com/grafana/tempo/tempodb/encoding" "github.com/grafana/tempo/tempodb/encoding/common" - "github.com/grafana/tempo/tempodb/pool" "github.com/grafana/tempo/tempodb/wal" ) @@ -42,10 +41,6 @@ func testConfig(enc backend.Encoding, blocklistPoll time.Duration) (Reader, Writ return New(&Config{ Backend: "local", - Pool: &pool.Config{ - MaxWorkers: 10, - QueueDepth: 100, - }, Local: &local.Config{ Path: path.Join(tempDir, "traces"), }, From 75342793c4d76f5f9b8f9e89188f64bc87693d9a Mon Sep 17 00:00:00 2001 From: Annanay Date: Wed, 19 May 2021 17:20:46 +0530 Subject: [PATCH 11/17] Fix test Signed-off-by: Annanay --- tempodb/tempodb_test.go | 50 +++++++++++++++++++---------------------- 1 file changed, 23 insertions(+), 27 deletions(-) diff --git a/tempodb/tempodb_test.go b/tempodb/tempodb_test.go index eee604cfeac..409bbb76809 100644 --- a/tempodb/tempodb_test.go +++ b/tempodb/tempodb_test.go @@ -31,15 +31,11 @@ const ( testDataEncoding = "blerg" ) -func testConfig(enc backend.Encoding, blocklistPoll time.Duration) (Reader, Writer, Compactor, error) { +func testConfig(t *testing.T, enc backend.Encoding, blocklistPoll time.Duration) (Reader, Writer, Compactor, string) { tempDir, err := ioutil.TempDir(tmpdir, "") - defer os.RemoveAll(tempDir) - - if err != nil { - return nil, nil, nil, err - } + require.NoError(t, err) - return New(&Config{ + r, w, c, err := New(&Config{ Backend: "local", Local: &local.Config{ Path: path.Join(tempDir, "traces"), @@ -56,11 +52,13 @@ func testConfig(enc backend.Encoding, blocklistPoll time.Duration) (Reader, Writ }, BlocklistPoll: blocklistPoll, }, log.NewNopLogger()) + require.NoError(t, err) + return r, w, c, tempDir } func TestDB(t *testing.T) { - r, 
w, c, err := testConfig(backend.EncGZIP, 0) - assert.NoError(t, err) + r, w, c, tempDir := testConfig(t, backend.EncGZIP, 0) + defer os.RemoveAll(tempDir) c.EnableCompaction(&CompactorConfig{ ChunkSizeBytes: 10, @@ -117,8 +115,8 @@ func TestBlockSharding(t *testing.T) { // push a req with some traceID // cut headblock & write to backend // search with different shards and check if its respecting the params - r, w, _, err := testConfig(backend.EncLZ4_256k, 0) - assert.NoError(t, err) + r, w, _, tempDir := testConfig(t, backend.EncLZ4_256k, 0) + defer os.RemoveAll(tempDir) // create block with known ID blockID := uuid.New() @@ -171,8 +169,8 @@ func TestBlockSharding(t *testing.T) { } func TestNilOnUnknownTenantID(t *testing.T) { - r, _, _, err := testConfig(backend.EncLZ4_256k, 0) - assert.NoError(t, err) + r, _, _, tempDir := testConfig(t, backend.EncLZ4_256k, 0) + defer os.RemoveAll(tempDir) buff, _, err := r.Find(context.Background(), "unknown", []byte{0x01}, BlockIDMin, BlockIDMax) assert.Nil(t, buff) @@ -180,8 +178,8 @@ func TestNilOnUnknownTenantID(t *testing.T) { } func TestBlockCleanup(t *testing.T) { - r, w, c, err := testConfig(backend.EncLZ4_256k, 0) - assert.NoError(t, err) + r, w, c, tempDir := testConfig(t, backend.EncLZ4_256k, 0) + defer os.RemoveAll(tempDir) c.EnableCompaction(&CompactorConfig{ ChunkSizeBytes: 10, @@ -193,7 +191,6 @@ func TestBlockCleanup(t *testing.T) { blockID := uuid.New() wal := w.WAL() - assert.NoError(t, err) head, err := wal.NewBlock(blockID, testTenantID, "") assert.NoError(t, err) @@ -245,8 +242,8 @@ func TestCleanMissingTenants(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - r, _, _, err := testConfig(backend.EncLZ4_256k, 0) - assert.NoError(t, err) + r, _, _, tempDir := testConfig(t, backend.EncLZ4_256k, 0) + defer os.RemoveAll(tempDir) rw := r.(*readerWriter) @@ -287,8 +284,8 @@ func checkBlocklists(t *testing.T, expectedID uuid.UUID, expectedB int, expected } func TestUpdateBlocklist(t 
*testing.T) { - r, _, _, err := testConfig(backend.EncLZ4_256k, 0) - assert.NoError(t, err) + r, _, _, tempDir := testConfig(t, backend.EncLZ4_256k, 0) + defer os.RemoveAll(tempDir) rw := r.(*readerWriter) @@ -456,8 +453,8 @@ func TestUpdateBlocklist(t *testing.T) { } func TestUpdateBlocklistCompacted(t *testing.T) { - r, _, _, err := testConfig(backend.EncLZ4_256k, 0) - assert.NoError(t, err) + r, _, _, tempDir := testConfig(t, backend.EncLZ4_256k, 0) + defer os.RemoveAll(tempDir) rw := r.(*readerWriter) @@ -759,8 +756,8 @@ func TestIncludeCompactedBlock(t *testing.T) { } func TestSearchCompactedBlocks(t *testing.T) { - r, w, c, err := testConfig(backend.EncLZ4_256k, time.Minute) - assert.NoError(t, err) + r, w, c, tempDir := testConfig(t, backend.EncLZ4_256k, time.Minute) + defer os.RemoveAll(tempDir) c.EnableCompaction(&CompactorConfig{ ChunkSizeBytes: 10, @@ -770,7 +767,6 @@ func TestSearchCompactedBlocks(t *testing.T) { }, &mockSharder{}, &mockOverrides{}) wal := w.WAL() - assert.NoError(t, err) head, err := wal.NewBlock(uuid.New(), testTenantID, "") assert.NoError(t, err) @@ -846,8 +842,8 @@ func TestSearchCompactedBlocks(t *testing.T) { } func TestCompleteBlock(t *testing.T) { - _, w, _, err := testConfig(backend.EncLZ4_256k, time.Minute) - assert.NoError(t, err) + _, w, _, tempDir := testConfig(t, backend.EncLZ4_256k, time.Minute) + defer os.RemoveAll(tempDir) wal := w.WAL() From 1d4c25357b558c587834b0b088ba670bddadff82 Mon Sep 17 00:00:00 2001 From: Annanay Date: Wed, 19 May 2021 17:40:49 +0530 Subject: [PATCH 12/17] More testing around bloom shard counts Signed-off-by: Annanay --- tempodb/encoding/common/bloom.go | 7 +++--- tempodb/encoding/common/bloom_test.go | 36 +++++++++++++++++++++++++++ 2 files changed, 40 insertions(+), 3 deletions(-) diff --git a/tempodb/encoding/common/bloom.go b/tempodb/encoding/common/bloom.go index 0560a86808e..ab812a82ed8 100644 --- a/tempodb/encoding/common/bloom.go +++ b/tempodb/encoding/common/bloom.go @@ -11,7 +11,8 @@ 
import ( const ( legacyShardCount = 10 - maxShardCount = math.MaxUint16 + minShardCount = 1 + maxShardCount = math.MaxUint16 ) type ShardedBloomFilter struct { @@ -31,8 +32,8 @@ func NewBloom(fp float64, shardSize, estimatedObjects uint) *ShardedBloomFilter shardCount = (m / (shardSize * 8)) + 1 } - if shardCount == 0 { - shardCount = 1 + if shardCount < minShardCount { + shardCount = minShardCount } if shardCount > maxShardCount { diff --git a/tempodb/encoding/common/bloom_test.go b/tempodb/encoding/common/bloom_test.go index 24dda40cdcc..8913d99e530 100644 --- a/tempodb/encoding/common/bloom_test.go +++ b/tempodb/encoding/common/bloom_test.go @@ -122,3 +122,39 @@ func TestShardedBloomFalsePositive(t *testing.T) { }) } } + +func TestBloomShardCount(t *testing.T) { + tests := []struct { + name string + bloomFP float64 + shardSize uint + estimatedObjects uint + expectedShards uint + }{ + { + name: "too many shards", + bloomFP: 0.01, + shardSize: 1, + estimatedObjects: 100000, + expectedShards: maxShardCount, + }, + { + name: "too few shards", + bloomFP: 0.01, + shardSize: 10, + estimatedObjects: 1, + expectedShards: minShardCount, + }, + } + + for _, tt := range tests { + tt := tt // capture range variable, needed for running test cases in parallel + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + b := NewBloom(tt.bloomFP, tt.shardSize, tt.estimatedObjects) + assert.Equal(t, int(tt.expectedShards), b.GetShardCount()) + }) + } + +} From 1d8ebe203515c06cafdb20ceb2e56dae013a6a1d Mon Sep 17 00:00:00 2001 From: Annanay Date: Wed, 19 May 2021 18:11:51 +0530 Subject: [PATCH 13/17] fix TestBlockCleanup Signed-off-by: Annanay --- tempodb/tempodb_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tempodb/tempodb_test.go b/tempodb/tempodb_test.go index 409bbb76809..ebfa8a773a8 100644 --- a/tempodb/tempodb_test.go +++ b/tempodb/tempodb_test.go @@ -205,7 +205,7 @@ func TestBlockCleanup(t *testing.T) { assert.Len(t, rw.blockLists[testTenantID], 1) - 
os.RemoveAll(tmpdir + "/traces/" + testTenantID) + os.RemoveAll(tempDir + "/traces/" + testTenantID) // poll rw.pollBlocklist() From a04d5799b3eacae72350badd93be60e46df1cd2e Mon Sep 17 00:00:00 2001 From: Annanay Date: Wed, 19 May 2021 18:40:04 +0530 Subject: [PATCH 14/17] CHANGELOG Signed-off-by: Annanay --- CHANGELOG.md | 1 + tempodb/encoding/common/bloom.go | 3 +++ 2 files changed, 4 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5815d680820..92fc12e2701 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ * [ENHANCEMENT] Zipkin support - Dedupe span IDs based on span.Kind (client/server) in Query Frontend. [#687](https://github.com/grafana/tempo/pull/687) * [ENHANCEMENT] Reduce marshalling in the ingesters to improve performance. [#694](https://github.com/grafana/tempo/pull/694) This is kind of a **breaking change**. Rollout all ingesters before any other component to prevent dropped spans. +* [ENHANCEMENT] Allow setting the bloom filter shard size with support for dynamic shard count. [#644](https://github.com/grafana/tempo/pull/644) * [CHANGE] Fix Query Frontend grpc settings to avoid noisy error log. [#690](https://github.com/grafana/tempo/pull/690) * [CHANGE] GCS SDK update v1.12.0 => v.15.0, ReadAllWithEstimate used in GCS/S3 backends. [#693](https://github.com/grafana/tempo/pull/693) diff --git a/tempodb/encoding/common/bloom.go b/tempodb/encoding/common/bloom.go index ab812a82ed8..c54a2029ad5 100644 --- a/tempodb/encoding/common/bloom.go +++ b/tempodb/encoding/common/bloom.go @@ -4,6 +4,8 @@ import ( "bytes" "math" + cortex_util "github.com/cortexproject/cortex/pkg/util/log" + "github.com/go-kit/kit/log/level" "github.com/willf/bloom" "github.com/grafana/tempo/pkg/util" @@ -38,6 +40,7 @@ func NewBloom(fp float64, shardSize, estimatedObjects uint) *ShardedBloomFilter if shardCount > maxShardCount { shardCount = maxShardCount + level.Warn(cortex_util.Logger).Log("msg", "required bloom filter shard count exceeded max.
consider increasing bloom_filter_shard_size_bytes") } b := &ShardedBloomFilter{ From 7d59c14c4033daffb846029429b847869143a400 Mon Sep 17 00:00:00 2001 From: Annanay Date: Wed, 19 May 2021 20:16:58 +0530 Subject: [PATCH 15/17] Correctly set meta.TotalObjects in newAppendBlockFromFile Signed-off-by: Annanay --- tempodb/encoding/common/bloom.go | 6 +----- tempodb/wal/append_block.go | 1 + 2 files changed, 2 insertions(+), 5 deletions(-) diff --git a/tempodb/encoding/common/bloom.go b/tempodb/encoding/common/bloom.go index c54a2029ad5..4f9406dda6f 100644 --- a/tempodb/encoding/common/bloom.go +++ b/tempodb/encoding/common/bloom.go @@ -28,11 +28,7 @@ func NewBloom(fp float64, shardSize, estimatedObjects uint) *ShardedBloomFilter // k: number of hash functions var shardCount uint m, k := bloom.EstimateParameters(estimatedObjects, fp) - if m%(shardSize*8) == 0 { - shardCount = m / (shardSize * 8) - } else { - shardCount = (m / (shardSize * 8)) + 1 - } + shardCount = uint(math.Ceil(float64(m)/(float64(shardSize)*8.0))) if shardCount < minShardCount { shardCount = minShardCount diff --git a/tempodb/wal/append_block.go b/tempodb/wal/append_block.go index 6b5bb3eae06..541b4783f26 100644 --- a/tempodb/wal/append_block.go +++ b/tempodb/wal/append_block.go @@ -138,6 +138,7 @@ func newAppendBlockFromFile(filename string, path string) (*AppendBlock, error, common.SortRecords(records) + b.meta.TotalObjects = len(records) b.appender = encoding.NewRecordAppender(records) return b, warning, nil From 072460b33ba3fa4aa531dac62f0388e3e118d8ed Mon Sep 17 00:00:00 2001 From: Annanay Date: Wed, 19 May 2021 20:21:23 +0530 Subject: [PATCH 16/17] make fmt Signed-off-by: Annanay --- tempodb/encoding/common/bloom.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tempodb/encoding/common/bloom.go b/tempodb/encoding/common/bloom.go index 4f9406dda6f..d0ec490e23d 100644 --- a/tempodb/encoding/common/bloom.go +++ b/tempodb/encoding/common/bloom.go @@ -28,7 +28,7 @@ func NewBloom(fp 
float64, shardSize, estimatedObjects uint) *ShardedBloomFilter // k: number of hash functions var shardCount uint m, k := bloom.EstimateParameters(estimatedObjects, fp) - shardCount = uint(math.Ceil(float64(m)/(float64(shardSize)*8.0))) + shardCount = uint(math.Ceil(float64(m) / (float64(shardSize) * 8.0))) if shardCount < minShardCount { shardCount = minShardCount From b467328594371f8f30c4f2dad5b717745cf2c146 Mon Sep 17 00:00:00 2001 From: Annanay Date: Thu, 20 May 2021 13:20:48 +0530 Subject: [PATCH 17/17] reduce maxShardCount, use appender.length() over len(records) Signed-off-by: Annanay --- tempodb/encoding/common/bloom.go | 2 +- tempodb/encoding/common/bloom_test.go | 12 +++--------- tempodb/wal/append_block.go | 2 +- 3 files changed, 5 insertions(+), 11 deletions(-) diff --git a/tempodb/encoding/common/bloom.go b/tempodb/encoding/common/bloom.go index d0ec490e23d..ce0490978bb 100644 --- a/tempodb/encoding/common/bloom.go +++ b/tempodb/encoding/common/bloom.go @@ -14,7 +14,7 @@ import ( const ( legacyShardCount = 10 minShardCount = 1 - maxShardCount = math.MaxUint16 + maxShardCount = 1000 ) type ShardedBloomFilter struct { diff --git a/tempodb/encoding/common/bloom_test.go b/tempodb/encoding/common/bloom_test.go index 8913d99e530..754f5d53a1c 100644 --- a/tempodb/encoding/common/bloom_test.go +++ b/tempodb/encoding/common/bloom_test.go @@ -73,9 +73,9 @@ func TestShardedBloomFalsePositive(t *testing.T) { }{ { name: "regular", - bloomFP: 0.01, - shardSize: 100, - estimatedObjects: 1000, + bloomFP: 0.05, + shardSize: 250 * 1024, + estimatedObjects: 10_000_000, }, { name: "large estimated objects", @@ -89,12 +89,6 @@ func TestShardedBloomFalsePositive(t *testing.T) { shardSize: 100000, estimatedObjects: 10, }, - { - name: "current scale", - bloomFP: 0.05, - shardSize: 250 * 1024, - estimatedObjects: 10_000_000, - }, } for _, tt := range tests { diff --git a/tempodb/wal/append_block.go b/tempodb/wal/append_block.go index 541b4783f26..5e485728800 100644 --- 
a/tempodb/wal/append_block.go +++ b/tempodb/wal/append_block.go @@ -138,8 +138,8 @@ func newAppendBlockFromFile(filename string, path string) (*AppendBlock, error, common.SortRecords(records) - b.meta.TotalObjects = len(records) b.appender = encoding.NewRecordAppender(records) + b.meta.TotalObjects = b.appender.Length() return b, warning, nil }