Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
.DS_Store
.idea
.vscode
*.test
Comment thread
joe-elliott marked this conversation as resolved.
/bin
/cmd/tempo-cli/tempo-cli
/cmd/tempo-serverless/vendor/
Expand Down
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
* [FEATURE] New encoding vParquet3 with support for dedicated attribute columns (@mapno, @stoewer) [#2649](https://github.com/grafana/tempo/pull/2649)
* [FEATURE] Add filtering support to Generic Forwarding [#2742](https://github.com/grafana/tempo/pull/2742) (@Blinkuu)
* [CHANGE] Update Go to 1.21 [#2829](https://github.com/grafana/tempo/pull/2829) (@zalegrala)
* [ENHANCEMENT] Add block indexes to vParquet2 and vParquet3 to improve trace by ID lookup [#2697](https://github.com/grafana/tempo/pull/2697) (@mdisibio)
* [ENHANCEMENT] Assert ingestion rate limits as early as possible [#2703](https://github.com/grafana/tempo/pull/2703) (@mghildiy)
* [ENHANCEMENT] Add several metrics-generator fields to user-configurable overrides [#2711](https://github.com/grafana/tempo/pull/2711) (@kvrhdn)
* [ENHANCEMENT] Update /api/metrics/summary to correctly handle missing attributes and improve performance of TraceQL `select()` queries. [#2765](https://github.com/grafana/tempo/pull/2765) (@mdisibio)
Expand Down
183 changes: 118 additions & 65 deletions tempodb/encoding/vparquet2/block_findtracebyid.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"fmt"
"io"
"os"

"github.com/opentracing/opentracing-go"
"github.com/parquet-go/parquet-go"
Expand All @@ -25,6 +26,9 @@ const (
NotFound = -3

TraceIDColumnName = "TraceID"

EnvVarIndexName = "VPARQUET_INDEX"
EnvVarIndexDisabledValue = "0"
)

func (b *backendBlock) checkBloom(ctx context.Context, id common.ID) (found bool, err error) {
Expand Down Expand Up @@ -53,6 +57,41 @@ func (b *backendBlock) checkBloom(ctx context.Context, id common.ID) (found bool
return filter.Test(id), nil
}

// checkIndex consults the block's dedicated trace-ID index (if one exists)
// to decide whether the given ID can be present in this block and, when
// possible, which row group holds it.
//
// Returns:
//   - bool:  false when the index definitively rules out the ID
//   - int:   the row group to scan, or -1 when unknown (index absent/disabled)
//   - error: failures reading or parsing the index
func (b *backendBlock) checkIndex(ctx context.Context, id common.ID) (bool, int, error) {
	// Honor the env-var kill switch: behave exactly as if no index exists.
	if os.Getenv(EnvVarIndexName) == EnvVarIndexDisabledValue {
		return true, -1, nil
	}

	span, derivedCtx := opentracing.StartSpanFromContext(ctx, "parquet.backendBlock.checkIndex",
		opentracing.Tags{
			"blockID":  b.meta.BlockID,
			"tenantID": b.meta.TenantID,
		})
	defer span.Finish()

	raw, err := b.r.Read(derivedCtx, common.NameIndex, b.meta.BlockID, b.meta.TenantID, true)
	switch {
	case err == backend.ErrDoesNotExist:
		// Older blocks were written without an index; fall back to scanning.
		return true, -1, nil
	case err != nil:
		return false, -1, fmt.Errorf("error retrieving index (%s, %s): %w", b.meta.TenantID, b.meta.BlockID, err)
	}

	idx, err := unmarshalIndex(raw)
	if err != nil {
		return false, -1, fmt.Errorf("error parsing index (%s, %s): %w", b.meta.TenantID, b.meta.BlockID, err)
	}

	if rg := idx.Find(id); rg != -1 {
		return true, rg, nil
	}

	// The index is authoritative: the trace is not in this block.
	return false, -1, nil
}

func (b *backendBlock) FindTraceByID(ctx context.Context, traceID common.ID, opts common.SearchOptions) (_ *tempopb.Trace, err error) {
span, derivedCtx := opentracing.StartSpanFromContext(ctx, "parquet.backendBlock.FindTraceByID",
opentracing.Tags{
Expand All @@ -70,6 +109,14 @@ func (b *backendBlock) FindTraceByID(ctx context.Context, traceID common.ID, opt
return nil, nil
}

ok, rowGroup, err := b.checkIndex(ctx, traceID)
if err != nil {
return nil, err
}
if !ok {
return nil, nil
}

pf, rr, err := b.openForSearch(derivedCtx, opts)
if err != nil {
return nil, fmt.Errorf("unexpected error opening parquet file: %w", err)
Expand All @@ -78,85 +125,91 @@ func (b *backendBlock) FindTraceByID(ctx context.Context, traceID common.ID, opt
span.SetTag("inspectedBytes", rr.BytesRead())
}()

return findTraceByID(derivedCtx, traceID, b.meta, pf)
return findTraceByID(derivedCtx, traceID, b.meta, pf, rowGroup)
}

func findTraceByID(ctx context.Context, traceID common.ID, meta *backend.BlockMeta, pf *parquet.File) (*tempopb.Trace, error) {
func findTraceByID(ctx context.Context, traceID common.ID, meta *backend.BlockMeta, pf *parquet.File, rowGroup int) (*tempopb.Trace, error) {
// traceID column index
colIndex, _ := pq.GetColumnIndexByPath(pf, TraceIDColumnName)
if colIndex == -1 {
return nil, fmt.Errorf("unable to get index for column: %s", TraceIDColumnName)
}

numRowGroups := len(pf.RowGroups())
buf := make(parquet.Row, 1)

// Cache of row group bounds
rowGroupMins := make([]common.ID, numRowGroups+1)
// todo: restore using meta min/max id once it works
// https://github.com/grafana/tempo/issues/1903
rowGroupMins[0] = bytes.Repeat([]byte{0}, 16)
rowGroupMins[numRowGroups] = bytes.Repeat([]byte{255}, 16) // This is actually inclusive and the logic is special for the last row group below
// If no index then fallback to binary searching the rowgroups.
if rowGroup == -1 {
var (
numRowGroups = len(pf.RowGroups())
buf = make(parquet.Row, 1)
err error
)

// Gets the minimum trace ID within the row group. Since the column is sorted
// ascending we just read the first value from the first page.
getRowGroupMin := func(rgIdx int) (common.ID, error) {
min := rowGroupMins[rgIdx]
if len(min) > 0 {
// Already loaded
// Cache of row group bounds
rowGroupMins := make([]common.ID, numRowGroups+1)
// todo: restore using meta min/max id once it works
// https://github.com/grafana/tempo/issues/1903
rowGroupMins[0] = bytes.Repeat([]byte{0}, 16)
rowGroupMins[numRowGroups] = bytes.Repeat([]byte{255}, 16) // This is actually inclusive and the logic is special for the last row group below

// Gets the minimum trace ID within the row group. Since the column is sorted
// ascending we just read the first value from the first page.
getRowGroupMin := func(rgIdx int) (common.ID, error) {
min := rowGroupMins[rgIdx]
if len(min) > 0 {
// Already loaded
return min, nil
}

pages := pf.RowGroups()[rgIdx].ColumnChunks()[colIndex].Pages()
defer pages.Close()

page, err := pages.ReadPage()
if err != nil {
return nil, err
}

c, err := page.Values().ReadValues(buf)
if err != nil && err != io.EOF {
return nil, err
}
if c < 1 {
return nil, fmt.Errorf("failed to read value from page: traceID: %s blockID:%v rowGroupIdx:%d", util.TraceIDToHexString(traceID), meta.BlockID, rgIdx)
}

min = buf[0].ByteArray()
rowGroupMins[rgIdx] = min
return min, nil
}

pages := pf.RowGroups()[rgIdx].ColumnChunks()[colIndex].Pages()
defer pages.Close()

page, err := pages.ReadPage()
if err != nil {
return nil, err
}

c, err := page.Values().ReadValues(buf)
if err != nil && err != io.EOF {
return nil, err
}
if c < 1 {
return nil, fmt.Errorf("failed to read value from page: traceID: %s blockID:%v rowGroupIdx:%d", util.TraceIDToHexString(traceID), meta.BlockID, rgIdx)
}

min = buf[0].ByteArray()
rowGroupMins[rgIdx] = min
return min, nil
}

rowGroup, err := binarySearch(numRowGroups, func(rgIdx int) (int, error) {
min, err := getRowGroupMin(rgIdx)
if err != nil {
return 0, err
}

if check := bytes.Compare(traceID, min); check <= 0 {
// Trace is before or in this group
return check, nil
}

max, err := getRowGroupMin(rgIdx + 1)
rowGroup, err = binarySearch(numRowGroups, func(rgIdx int) (int, error) {
min, err := getRowGroupMin(rgIdx)
if err != nil {
return 0, err
}

if check := bytes.Compare(traceID, min); check <= 0 {
// Trace is before or in this group
return check, nil
}

max, err := getRowGroupMin(rgIdx + 1)
if err != nil {
return 0, err
}

// This is actually the min of the next group, so check is exclusive not inclusive like min
// Except for the last group, it is inclusive
check := bytes.Compare(traceID, max)
if check > 0 || (check == 0 && rgIdx < (numRowGroups-1)) {
// Trace is after this group
return 1, nil
}

// Must be in this group
return 0, nil
})
if err != nil {
return 0, err
}

// This is actually the min of the next group, so check is exclusive not inclusive like min
// Except for the last group, it is inclusive
check := bytes.Compare(traceID, max)
if check > 0 || (check == 0 && rgIdx < (numRowGroups-1)) {
// Trace is after this group
return 1, nil
return nil, errors.Wrap(err, "error binary searching row groups")
}

// Must be in this group
return 0, nil
})
if err != nil {
return nil, errors.Wrap(err, "error binary searching row groups")
}

if rowGroup == -1 {
Expand Down
34 changes: 30 additions & 4 deletions tempodb/encoding/vparquet2/block_findtracebyid_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,28 +143,54 @@ func TestBackendBlockFindTraceByID_TestData(t *testing.T) {
}
}

/*func genIndex(b *testing.B, block *backendBlock) *index {
pf, _, err := block.openForSearch(context.TODO(), common.DefaultSearchOptions())
require.NoError(b, err)

i := &index{}

for j := range pf.RowGroups() {
iter := parquetquery.NewSyncIterator(context.TODO(), pf.RowGroups()[j:j+1], 0, "", 1000, nil, "TraceID")
defer iter.Close()

for {
v, err := iter.Next()
require.NoError(b, err)
if v == nil {
break
}

i.Add(v.Entries[0].Value.ByteArray())
}
i.Flush()
}

return i
}*/
Comment thread
mdisibio marked this conversation as resolved.

func BenchmarkFindTraceByID(b *testing.B) {
ctx := context.TODO()
tenantID := "1"
blockID := uuid.MustParse("3685ee3d-cbbf-4f36-bf28-93447a19dea6")
// blockID := uuid.MustParse("1a2d50d7-f10e-41f0-850d-158b19ead23d")
blockID := uuid.MustParse("1f38c153-b798-4bba-b4f4-cc60633e5cab")

r, _, _, err := local.New(&local.Config{
Path: path.Join("/Users/marty/src/tmp/"),
})
require.NoError(b, err)

rr := backend.NewReader(r)
// ww := backend.NewWriter(w)

meta, err := rr.BlockMeta(ctx, blockID, tenantID)
require.NoError(b, err)

traceID := meta.MinID
// traceID, err := util.HexStringToTraceID("1a029f7ace79c7f2")
// require.NoError(b, err)

block := newBackendBlock(meta, rr)

// idx := genIndex(b, block)
// writeBlockMeta(ctx, ww, block.meta, &common.ShardedBloomFilter{}, idx)

b.ResetTimer()

for i := 0; i < b.N; i++ {
Expand Down
18 changes: 17 additions & 1 deletion tempodb/encoding/vparquet2/copy.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,18 @@ func CopyBlock(ctx context.Context, fromMeta, toMeta *backend.BlockMeta, from ba
}
}

// Index (may not exist)
err = cpy(common.NameIndex)
if err != nil && err != backend.ErrDoesNotExist {
return err
}

// Meta
err = to.WriteBlockMeta(ctx, toMeta)
return err
}

func writeBlockMeta(ctx context.Context, w backend.Writer, meta *backend.BlockMeta, bloom *common.ShardedBloomFilter) error {
func writeBlockMeta(ctx context.Context, w backend.Writer, meta *backend.BlockMeta, bloom *common.ShardedBloomFilter, index *index) error {
// bloom
blooms, err := bloom.Marshal()
if err != nil {
Expand All @@ -65,6 +71,16 @@ func writeBlockMeta(ctx context.Context, w backend.Writer, meta *backend.BlockMe
}
}

// Index
i, err := index.Marshal()
if err != nil {
return err
}
err = w.Write(ctx, common.NameIndex, meta.BlockID, meta.TenantID, i, true)
if err != nil {
return err
}

// meta
err = w.WriteBlockMeta(ctx, meta)
if err != nil {
Expand Down
Loading