Skip to content
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
## main / unreleased

* [CHANGE] Capture and update search metrics for TraceQL [#2087](https://github.com/grafana/tempo/pull/2087) (@electron0zero)
* [CHANGE] tempo-mixin: disable auto refresh every 10 seconds [#2290](https://github.com/grafana/tempo/pull/2290) (@electron0zero)
* [CHANGE] Update tempo-mixin to show request in Resources dashboard [#2281](https://github.com/grafana/tempo/pull/2281) (@electron0zero)
* [FEATURE] New parquet based block format vParquet2 [#2244](https://github.com/grafana/tempo/pull/2244) (@stoewer)
Expand Down
10 changes: 8 additions & 2 deletions pkg/traceql/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,7 @@ func (e *Engine) Execute(ctx context.Context, searchReq *tempopb.SearchRequest,
defer iterator.Close()

res := &tempopb.SearchResponse{
Traces: nil,
// TODO capture and update metrics
Traces: nil,
Metrics: &tempopb.SearchMetrics{},
}
for {
Expand All @@ -96,6 +95,13 @@ func (e *Engine) Execute(ctx context.Context, searchReq *tempopb.SearchRequest,
span.SetTag("spansets_evaluated", spansetsEvaluated)
span.SetTag("spansets_found", len(res.Traces))

// Bytes can be nil when the callback is not set
if fetchSpansResponse.Bytes != nil {
// InspectedBytes is used to compute query throughput and SLO metrics
res.Metrics.InspectedBytes = fetchSpansResponse.Bytes()
span.SetTag("inspectedBytes", res.Metrics.InspectedBytes)
}

return res, nil
}

Expand Down
5 changes: 5 additions & 0 deletions pkg/traceql/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,8 @@ func TestEngine_Execute(t *testing.T) {
sort(response.Traces)

assert.Equal(t, expectedTraceSearchMetadata, response.Traces)

assert.Equal(t, uint64(100_00), response.Metrics.InspectedBytes)
}

func TestEngine_asTraceSearchMetadata(t *testing.T) {
Expand Down Expand Up @@ -274,6 +276,9 @@ func (m *MockSpanSetFetcher) Fetch(ctx context.Context, request FetchSpansReques
m.iterator.(*MockSpanSetIterator).filter = request.Filter
return FetchSpansResponse{
Results: m.iterator,
Bytes: func() uint64 {
return 100_00 // hardcoded in tests
},
}, nil
}

Expand Down
3 changes: 2 additions & 1 deletion pkg/traceql/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,8 @@ type SpansetIterator interface {

type FetchSpansResponse struct {
Results SpansetIterator
Bytes func() uint64
// callback to get the size of data read during Fetch
Bytes func() uint64
}

type SpansetFetcher interface {
Expand Down
2 changes: 1 addition & 1 deletion tempodb/encoding/vparquet/block_findtracebyid.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (b *backendBlock) FindTraceByID(ctx context.Context, traceID common.ID, opt
return nil, fmt.Errorf("unexpected error opening parquet file: %w", err)
}
defer func() {
span.SetTag("inspectedBytes", rr.TotalBytesRead.Load())
span.SetTag("inspectedBytes", rr.BytesRead())
}()

return findTraceByID(derivedCtx, traceID, b.meta, pf)
Expand Down
5 changes: 3 additions & 2 deletions tempodb/encoding/vparquet/block_search.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ func (b *backendBlock) openForSearch(ctx context.Context, opts common.SearchOpti
b.openMtx.Lock()
defer b.openMtx.Unlock()

// TODO: ctx is also cached when we cache backendReaderAt, not ideal but leaving it as is for now
backendReaderAt := NewBackendReaderAt(ctx, b.r, DataFileName, b.meta.BlockID, b.meta.TenantID)

// no searches currently require bloom filters or the page index. so just add them statically
Expand Down Expand Up @@ -109,7 +110,7 @@ func (b *backendBlock) Search(ctx context.Context, req *tempopb.SearchRequest, o
if err != nil {
return nil, fmt.Errorf("unexpected error opening parquet file: %w", err)
}
defer func() { span.SetTag("inspectedBytes", rr.TotalBytesRead.Load()) }()
defer func() { span.SetTag("inspectedBytes", rr.BytesRead()) }()

// Get list of row groups to inspect. Ideally we use predicate pushdown
// here to keep only row groups that can potentially satisfy the request
Expand All @@ -120,7 +121,7 @@ func (b *backendBlock) Search(ctx context.Context, req *tempopb.SearchRequest, o
return nil, err
}
results.Metrics.InspectedBlocks++
results.Metrics.InspectedBytes += rr.TotalBytesRead.Load()
results.Metrics.InspectedBytes += rr.BytesRead()
results.Metrics.InspectedTraces += uint32(b.meta.TotalObjects)

return results, nil
Expand Down
4 changes: 2 additions & 2 deletions tempodb/encoding/vparquet/block_search_tags.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func (b *backendBlock) SearchTags(ctx context.Context, cb common.TagCallback, op
if err != nil {
return fmt.Errorf("unexpected error opening parquet file: %w", err)
}
defer func() { span.SetTag("inspectedBytes", rr.TotalBytesRead.Load()) }()
defer func() { span.SetTag("inspectedBytes", rr.BytesRead()) }()

return searchTags(derivedCtx, cb, pf)
}
Expand Down Expand Up @@ -197,7 +197,7 @@ func (b *backendBlock) SearchTagValuesV2(ctx context.Context, tag traceql.Attrib
if err != nil {
return fmt.Errorf("unexpected error opening parquet file: %w", err)
}
defer func() { span.SetTag("inspectedBytes", rr.TotalBytesRead.Load()) }()
defer func() { span.SetTag("inspectedBytes", rr.BytesRead()) }()

return searchTagValues(derivedCtx, tag, cb, pf)
}
Expand Down
6 changes: 3 additions & 3 deletions tempodb/encoding/vparquet/block_traceql.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ const (
columnPathSpanStartTime = "rs.ils.Spans.StartUnixNanos"
columnPathSpanEndTime = "rs.ils.Spans.EndUnixNanos"
columnPathSpanKind = "rs.ils.Spans.Kind"
//columnPathSpanDuration = "rs.ils.Spans.DurationNanos"
// columnPathSpanDuration = "rs.ils.Spans.DurationNanos"
columnPathSpanStatusCode = "rs.ils.Spans.StatusCode"
columnPathSpanAttrKey = "rs.ils.Spans.Attrs.Key"
columnPathSpanAttrString = "rs.ils.Spans.Attrs.Value"
Expand Down Expand Up @@ -175,7 +175,7 @@ func (b *backendBlock) Fetch(ctx context.Context, req traceql.FetchSpansRequest,

return traceql.FetchSpansResponse{
Results: iter,
Bytes: func() uint64 { return rr.TotalBytesRead.Load() },
Bytes: func() uint64 { return rr.BytesRead() },
}, nil
}

Expand Down Expand Up @@ -1076,7 +1076,7 @@ func (c *spanCollector) KeepGroup(res *parquetquery.IteratorResult) bool {
span.endtimeUnixNanos = endTimeUnixNanos
case columnPathSpanName:
span.attributes[traceql.NewIntrinsic(traceql.IntrinsicName)] = traceql.NewStaticString(kv.Value.String())
//case columnPathSpanDuration:
// case columnPathSpanDuration:
// span.Attributes[traceql.NewIntrinsic(traceql.IntrinsicDuration)] = traceql.NewStaticDuration(time.Duration(kv.Value.Uint64()))
case columnPathSpanStatusCode:
// Map OTLP status code back to TraceQL enum.
Expand Down
33 changes: 31 additions & 2 deletions tempodb/encoding/vparquet/readers.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type BackendReaderAt struct {
blockID uuid.UUID
tenantID string

TotalBytesRead atomic.Uint64
bytesRead atomic.Uint64
}

var _ io.ReaderAt = (*BackendReaderAt)(nil)
Expand All @@ -37,7 +37,7 @@ func NewBackendReaderAt(ctx context.Context, r backend.Reader, name string, bloc
}

func (b *BackendReaderAt) ReadAt(p []byte, off int64) (int, error) {
b.TotalBytesRead.Add(uint64(len(p)))
b.bytesRead.Add(uint64(len(p)))
err := b.r.ReadRange(b.ctx, b.name, b.blockID, b.tenantID, uint64(off), p, false)
if err != nil {
return 0, err
Expand All @@ -53,6 +53,10 @@ func (b *BackendReaderAt) ReadAtWithCache(p []byte, off int64) (int, error) {
return len(p), err
}

// BytesRead returns the total number of bytes requested through this reader
// so far (accumulated by ReadAt via the atomic bytesRead counter).
func (b *BackendReaderAt) BytesRead() uint64 {
	return b.bytesRead.Load()
}

// parquetOptimizedReaderAt is used to cheat a few parquet calls. By default when opening a
// file parquet always requests the magic number and then the footer length. We can save
// both of these calls from going to the backend.
Expand Down Expand Up @@ -128,3 +132,28 @@ func (r *cachedReaderAt) ReadAt(p []byte, off int64) (int, error) {

return r.r.ReadAt(p, off)
}

// walReaderAt is a wrapper over io.ReaderAt, used to measure the total bytes
// read when searching a walBlock.
type walReaderAt struct {
	// r is the underlying reader the parquet data is served from.
	r io.ReaderAt

	// bytesRead accumulates bytes actually read; atomic so the count stays
	// correct if ReadAt is called concurrently.
	bytesRead atomic.Uint64
}

var _ io.ReaderAt = (*walReaderAt)(nil)

// newWalReaderAt wraps r so that the total number of bytes read through it
// can later be reported via BytesRead.
func newWalReaderAt(r io.ReaderAt) *walReaderAt {
	// Use a field-named literal: the zero value of atomic.Uint64 is ready to
	// use, and naming the field keeps this safe if fields are added or
	// reordered (a positional literal would silently misassign).
	return &walReaderAt{r: r}
}

// ReadAt delegates to the wrapped reader and records how many bytes were
// actually read.
func (wr *walReaderAt) ReadAt(p []byte, off int64) (int, error) {
	// parquet-go will call ReadAt when reading data from a parquet file.
	n, err := wr.r.ReadAt(p, off)
	// ReadAt can read less than len(p) bytes in some cases, so count n (what
	// was actually read), not the requested length.
	wr.bytesRead.Add(uint64(n))
	return n, err
}

// BytesRead returns the total number of bytes read through this reader so
// far (accumulated by ReadAt via the atomic bytesRead counter).
func (wr *walReaderAt) BytesRead() uint64 {
	return wr.bytesRead.Load()
}
52 changes: 32 additions & 20 deletions tempodb/encoding/vparquet/wal_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,6 @@ import (
"time"

"github.com/google/uuid"
"github.com/pkg/errors"
"github.com/segmentio/parquet-go"

"github.com/grafana/dskit/multierror"
"github.com/grafana/tempo/pkg/model"
"github.com/grafana/tempo/pkg/model/trace"
Expand All @@ -25,6 +22,8 @@ import (
"github.com/grafana/tempo/pkg/warnings"
"github.com/grafana/tempo/tempodb/backend"
"github.com/grafana/tempo/tempodb/encoding/common"
"github.com/pkg/errors"
"github.com/segmentio/parquet-go"
)

var _ common.WALBlock = (*walBlock)(nil)
Expand Down Expand Up @@ -219,16 +218,17 @@ func (w *walBlockFlush) file() (*pageFile, error) {
if err != nil {
return nil, fmt.Errorf("error getting file info: %w", err)
}
sz := info.Size()
pf, err := parquet.OpenFile(file, sz, parquet.SkipBloomFilters(true), parquet.SkipPageIndex(true), parquet.FileSchema(walSchema))
size := info.Size()

wr := newWalReaderAt(file)
pf, err := parquet.OpenFile(wr, size, parquet.SkipBloomFilters(true), parquet.SkipPageIndex(true), parquet.FileSchema(walSchema))
if err != nil {
return nil, fmt.Errorf("error opening parquet file: %w", err)
}

f := &pageFile{parquetFile: pf, osFile: file}
f := &pageFile{parquetFile: pf, osFile: file, r: wr}

return f, nil

}

func (w *walBlockFlush) rowIterator() (*rowIterator, error) {
Expand All @@ -246,6 +246,7 @@ func (w *walBlockFlush) rowIterator() (*rowIterator, error) {

type pageFile struct {
parquetFile *parquet.File
r *walReaderAt
osFile *os.File
}

Expand Down Expand Up @@ -540,10 +541,10 @@ func (b *walBlock) Search(ctx context.Context, req *tempopb.SearchRequest, opts
},
}

for i, page := range b.readFlushes() {
file, err := page.file()
for i, blockFlush := range b.readFlushes() {
file, err := blockFlush.file()
if err != nil {
return nil, fmt.Errorf("error opening file %s: %w", page.path, err)
return nil, fmt.Errorf("error opening file %s: %w", blockFlush.path, err)
}

defer file.Close()
Expand All @@ -555,7 +556,7 @@ func (b *walBlock) Search(ctx context.Context, req *tempopb.SearchRequest, opts
}

results.Traces = append(results.Traces, r.Traces...)
results.Metrics.InspectedBytes += uint64(pf.Size())
results.Metrics.InspectedBytes += file.r.BytesRead()
results.Metrics.InspectedTraces += uint32(pf.NumRows())
if len(results.Traces) >= int(req.Limit) {
break
Expand All @@ -566,10 +567,10 @@ func (b *walBlock) Search(ctx context.Context, req *tempopb.SearchRequest, opts
}

func (b *walBlock) SearchTags(ctx context.Context, cb common.TagCallback, opts common.SearchOptions) error {
for i, page := range b.readFlushes() {
file, err := page.file()
for i, blockFlush := range b.readFlushes() {
file, err := blockFlush.file()
if err != nil {
return fmt.Errorf("error opening file %s: %w", page.path, err)
return fmt.Errorf("error opening file %s: %w", blockFlush.path, err)
}

defer file.Close()
Expand Down Expand Up @@ -600,10 +601,10 @@ func (b *walBlock) SearchTagValues(ctx context.Context, tag string, cb common.Ta
}

func (b *walBlock) SearchTagValuesV2(ctx context.Context, tag traceql.Attribute, cb common.TagCallbackV2, opts common.SearchOptions) error {
for i, page := range b.readFlushes() {
file, err := page.file()
for i, blockFlush := range b.readFlushes() {
file, err := blockFlush.file()
if err != nil {
return fmt.Errorf("error opening file %s: %w", page.path, err)
return fmt.Errorf("error opening file %s: %w", blockFlush.path, err)
}

defer file.Close()
Expand All @@ -625,9 +626,11 @@ func (b *walBlock) Fetch(ctx context.Context, req traceql.FetchSpansRequest, opt
return traceql.FetchSpansResponse{}, errors.Wrap(err, "conditions invalid")
}

pages := b.readFlushes()
iters := make([]traceql.SpansetIterator, 0, len(pages))
for _, page := range pages {
blockFlushes := b.readFlushes()
// collect page readers to compute totalBytesRead
readers := make([]*walReaderAt, 0, len(blockFlushes))
iters := make([]traceql.SpansetIterator, 0, len(blockFlushes))
for _, page := range blockFlushes {
file, err := page.file()
if err != nil {
return traceql.FetchSpansResponse{}, fmt.Errorf("error opening file %s: %w", page.path, err)
Expand All @@ -642,13 +645,22 @@ func (b *walBlock) Fetch(ctx context.Context, req traceql.FetchSpansRequest, opt

wrappedIterator := &pageFileClosingIterator{iter: iter, pageFile: file}
iters = append(iters, wrappedIterator)
readers = append(readers, file.r)
}

// combine iters?
return traceql.FetchSpansResponse{
Results: &mergeSpansetIterator{
iters: iters,
},
Bytes: func() uint64 {
// read value when callback is called
var totalBytesRead uint64
for _, r := range readers {
totalBytesRead += r.BytesRead()
}
return totalBytesRead
Comment thread
mdisibio marked this conversation as resolved.
},
}, nil
}

Expand Down
5 changes: 1 addition & 4 deletions tempodb/encoding/vparquet2/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (

"github.com/grafana/tempo/tempodb/backend"
"github.com/grafana/tempo/tempodb/encoding/common"
"github.com/segmentio/parquet-go"
)

const (
Expand All @@ -16,9 +15,7 @@ type backendBlock struct {
meta *backend.BlockMeta
r backend.Reader

openMtx sync.Mutex
pf *parquet.File
readerAt *BackendReaderAt
openMtx sync.Mutex
}

var _ common.BackendBlock = (*backendBlock)(nil)
Expand Down
2 changes: 1 addition & 1 deletion tempodb/encoding/vparquet2/block_findtracebyid.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (b *backendBlock) FindTraceByID(ctx context.Context, traceID common.ID, opt
return nil, fmt.Errorf("unexpected error opening parquet file: %w", err)
}
defer func() {
span.SetTag("inspectedBytes", rr.TotalBytesRead.Load())
span.SetTag("inspectedBytes", rr.BytesRead())
}()

return findTraceByID(derivedCtx, traceID, b.meta, pf)
Expand Down
Loading