diff --git a/CHANGELOG.md b/CHANGELOG.md index 8eb7fb028ea..56b9ff39fff 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -20,6 +20,7 @@ * [FEATURE] Add query_end_cutoff setting to clamp near-“now” queries (default 30s) for consistent Tempo results [#5682](https://github.com/grafana/tempo/pull/5682) (@javiermolinar) * [FEATURE] New block encoding vParquet5-preview4 with array support for dedicated columns. This is a preview, breaking changes are expected. [#5760](https://github.com/grafana/tempo/pull/5760) (@stoewer) * [FEATURE] New block encoding vParquet5-preview5 with virtual span row numbers. This is a preview, breaking changes are expected. [#5943](https://github.com/grafana/tempo/pull/5943) (@stoewer) +* [FEATURE] New block encoding vParquet5-preview6 with dedicated columns for events and improved support for high-cardinality/high-length "blob" attributes [#5946](https://github.com/grafana/tempo/pull/5946) (@mdisibio) * [FEATURE] Add SSE-C encryption support to S3 backend [#5789](https://github.com/grafana/tempo/pull/5789) (@steffsas) * [FEATURE] Add support for `Accept: application/vnd.grafana.llm` to Tempo endpoints. Currently supported directly by trace by id and tag values [#5961](https://github.com/grafana/tempo/pull/5961) (@joe-elliott) This response is subject to change and should not be relied on. It is intended for LLM consumption only. Even a fundamental change to its representation (yaml? markdown?) would not be considered breaking. 
diff --git a/cmd/tempo-cli/cmd-analyse-block.go b/cmd/tempo-cli/cmd-analyse-block.go index 736ed49df49..6ae2e020504 100644 --- a/cmd/tempo-cli/cmd-analyse-block.go +++ b/cmd/tempo-cli/cmd-analyse-block.go @@ -5,11 +5,10 @@ import ( "errors" "fmt" "io" - "maps" "os" - "slices" "sort" "strconv" + "strings" "sync" "text/tabwriter" "time" @@ -24,11 +23,13 @@ import ( "github.com/grafana/tempo/tempodb/encoding/vparquet2" "github.com/grafana/tempo/tempodb/encoding/vparquet3" "github.com/grafana/tempo/tempodb/encoding/vparquet4" + "github.com/grafana/tempo/tempodb/encoding/vparquet5" ) type attributePaths struct { - span scopeAttributePath - res scopeAttributePath + span scopeAttributePath + res scopeAttributePath + event scopeAttributePath } type scopeAttributePath struct { @@ -36,6 +37,7 @@ type scopeAttributePath struct { keyPath string valPath string isArrayPath string + dedicatedColScope backend.DedicatedColumnScope dedicatedColsPaths []string } @@ -60,12 +62,14 @@ func pathsForVersion(v string) attributePaths { defLevel: vparquet3.DefinitionLevelResourceSpansILSSpanAttrs, keyPath: vparquet3.FieldSpanAttrKey, valPath: vparquet3.FieldSpanAttrVal, + dedicatedColScope: backend.DedicatedColumnScopeSpan, dedicatedColsPaths: vparquet3.DedicatedResourceColumnPaths[backend.DedicatedColumnScopeSpan][backend.DedicatedColumnTypeString], }, res: scopeAttributePath{ defLevel: vparquet3.DefinitionLevelResourceAttrs, keyPath: vparquet3.FieldResourceAttrKey, valPath: vparquet3.FieldResourceAttrVal, + dedicatedColScope: backend.DedicatedColumnScopeResource, dedicatedColsPaths: vparquet3.DedicatedResourceColumnPaths[backend.DedicatedColumnScopeResource][backend.DedicatedColumnTypeString], }, } @@ -76,6 +80,7 @@ func pathsForVersion(v string) attributePaths { keyPath: vparquet4.FieldSpanAttrKey, valPath: vparquet4.FieldSpanAttrVal, isArrayPath: vparquet4.FieldSpanAttrIsArray, + dedicatedColScope: 
backend.DedicatedColumnScopeSpan, dedicatedColsPaths: vparquet4.DedicatedResourceColumnPaths[backend.DedicatedColumnScopeSpan][backend.DedicatedColumnTypeString], }, res: scopeAttributePath{ @@ -83,8 +88,42 @@ func pathsForVersion(v string) attributePaths { keyPath: vparquet4.FieldResourceAttrKey, valPath: vparquet4.FieldResourceAttrVal, isArrayPath: vparquet4.FieldResourceAttrIsArray, + dedicatedColScope: backend.DedicatedColumnScopeResource, dedicatedColsPaths: vparquet4.DedicatedResourceColumnPaths[backend.DedicatedColumnScopeResource][backend.DedicatedColumnTypeString], }, + event: scopeAttributePath{ + defLevel: vparquet4.DefinitionLevelResourceSpansILSSpanEventAttrs, + keyPath: vparquet4.FieldEventAttrKey, + valPath: vparquet4.FieldEventAttrVal, + isArrayPath: vparquet4.FieldEventAttrIsArray, + }, + } + case vparquet5.VersionString: + return attributePaths{ + span: scopeAttributePath{ + defLevel: vparquet5.DefinitionLevelResourceSpansILSSpanAttrs, + keyPath: vparquet5.FieldSpanAttrKey, + valPath: vparquet5.FieldSpanAttrVal, + isArrayPath: vparquet5.FieldSpanAttrIsArray, + dedicatedColScope: backend.DedicatedColumnScopeSpan, + dedicatedColsPaths: vparquet5.DedicatedResourceColumnPaths[backend.DedicatedColumnScopeSpan][backend.DedicatedColumnTypeString], + }, + res: scopeAttributePath{ + defLevel: vparquet5.DefinitionLevelResourceAttrs, + keyPath: vparquet5.FieldResourceAttrKey, + valPath: vparquet5.FieldResourceAttrVal, + isArrayPath: vparquet5.FieldResourceAttrIsArray, + dedicatedColScope: backend.DedicatedColumnScopeResource, + dedicatedColsPaths: vparquet5.DedicatedResourceColumnPaths[backend.DedicatedColumnScopeResource][backend.DedicatedColumnTypeString], + }, + event: scopeAttributePath{ + defLevel: vparquet5.DefinitionLevelResourceSpansILSSpanEventAttrs, + keyPath: vparquet5.FieldEventAttrKey, + valPath: vparquet5.FieldEventAttrVal, + isArrayPath: vparquet5.FieldEventAttrIsArray, + dedicatedColScope: backend.DedicatedColumnScopeEvent, + 
dedicatedColsPaths: vparquet5.DedicatedResourceColumnPaths[backend.DedicatedColumnScopeEvent][backend.DedicatedColumnTypeString], + }, } default: panic("unsupported version") @@ -97,12 +136,19 @@ type analyseBlockCmd struct { TenantID string `arg:"" help:"tenant-id within the bucket"` BlockID string `arg:"" help:"block ID to list"` NumAttr int `help:"Number of attributes to display" default:"15"` + BlobThreshold string `help:"Convert column to blob when dictionary size reaches this value. Disable with 0" default:"4MiB"` GenerateJsonnet bool `help:"Generate overrides Jsonnet for dedicated columns"` + GenerateCliArgs bool `help:"Generate textual args for passing to parquet conversion command"` SimpleSummary bool `help:"Print only single line of top attributes" default:"false"` PrintFullSummary bool `help:"Print full summary of the analysed block" default:"true"` } func (cmd *analyseBlockCmd) Run(ctx *globalOptions) error { + blobBytes, err := humanize.ParseBytes(cmd.BlobThreshold) + if err != nil { + return err + } + r, _, _, err := loadBackend(&cmd.backendOptions, ctx) if err != nil { return err @@ -120,7 +166,7 @@ func (cmd *analyseBlockCmd) Run(ctx *globalOptions) error { return errors.New("failed to process block") } - return blockSum.print(cmd.NumAttr, cmd.GenerateJsonnet, cmd.SimpleSummary, cmd.PrintFullSummary) + return blockSum.print(cmd.NumAttr, cmd.GenerateJsonnet, cmd.SimpleSummary, cmd.PrintFullSummary, cmd.GenerateCliArgs, blobBytes) } func processBlock(r backend.Reader, tenantID, blockID string, maxStartTime, minStartTime time.Time, minCompactionLvl uint32) (*blockSummary, error) { @@ -150,6 +196,8 @@ func processBlock(r backend.Reader, tenantID, blockID string, maxStartTime, minS reader = vparquet3.NewBackendReaderAt(context.Background(), r, vparquet3.DataFileName, meta) case vparquet4.VersionString: reader = vparquet4.NewBackendReaderAt(context.Background(), r, vparquet4.DataFileName, meta) + case vparquet5.VersionString: + reader = 
vparquet5.NewBackendReaderAt(context.Background(), r, vparquet5.DataFileName, meta) default: fmt.Println("Unsupported block version:", meta.Version) return nil, nil @@ -166,90 +214,201 @@ func processBlock(r backend.Reader, tenantID, blockID string, maxStartTime, minS paths := pathsForVersion(meta.Version) - // Aggregate span attributes - spanAttrsSummary, err := aggregateAttributes(pf, paths.span.defLevel, paths.span.keyPath, paths.span.valPath, paths.span.isArrayPath) + spanSummary, err := aggregateScope(pf, meta, paths.span) if err != nil { return nil, err } - // add up dedicated span attribute columns - spanDedicatedSummary, err := aggregateDedicatedColumns(pf, backend.DedicatedColumnScopeSpan, meta, paths.span.dedicatedColsPaths) + resSummary, err := aggregateScope(pf, meta, paths.res) if err != nil { return nil, err } - // merge dedicated with span attributes - for k, v := range spanDedicatedSummary.attributes { - spanAttrsSummary.attributes[k] = v - spanAttrsSummary.dedicated[k] = struct{}{} - } - spanAttrsSummary.totalBytes += spanDedicatedSummary.totalBytes - // Aggregate resource attributes - resourceAttrsSummary, err := aggregateAttributes(pf, paths.res.defLevel, paths.res.keyPath, paths.res.valPath, paths.res.isArrayPath) + eventSummary, err := aggregateScope(pf, meta, paths.event) if err != nil { return nil, err } - // add up dedicated resource attribute columns - resourceDedicatedSummary, err := aggregateDedicatedColumns(pf, backend.DedicatedColumnScopeResource, meta, paths.res.dedicatedColsPaths) + return &blockSummary{ + numRowGroups: len(pf.RowGroups()), + spanSummary: spanSummary, + resourceSummary: resSummary, + eventSummary: eventSummary, + }, nil +} + +func aggregateScope(pf *parquet.File, meta *backend.BlockMeta, paths scopeAttributePath) (attributeSummary, error) { + res, err := aggregateAttributes(pf, paths.defLevel, paths.keyPath, paths.valPath, paths.isArrayPath) if err != nil { - return nil, err + return res, err } - // merge dedicated 
with span attributes - for k, v := range resourceDedicatedSummary.attributes { - resourceAttrsSummary.attributes[k] = v - resourceAttrsSummary.dedicated[k] = struct{}{} + + if len(paths.dedicatedColsPaths) > 0 { + dedicatedData, err := aggregateDedicatedColumns(pf, paths.dedicatedColScope, meta, paths.dedicatedColsPaths) + if err != nil { + return res, err + } + // merge dedicated with span attributes + res.dedicated = make(map[string]struct{}, len(dedicatedData.attributes)) + for k, v := range dedicatedData.attributes { + res.attributes[k] = v + res.dedicated[k] = struct{}{} + } } - resourceAttrsSummary.totalBytes += spanDedicatedSummary.totalBytes - return &blockSummary{ - spanSummary: spanAttrsSummary, - resourceSummary: resourceAttrsSummary, - }, nil + return res, nil } type blockSummary struct { - spanSummary, resourceSummary genericAttrSummary + spanSummary attributeSummary + resourceSummary attributeSummary + eventSummary attributeSummary + numRowGroups int +} + +func (s *blockSummary) add(other blockSummary) { + s.numRowGroups += other.numRowGroups + s.spanSummary.add(other.spanSummary) + s.resourceSummary.add(other.resourceSummary) + s.eventSummary.add(other.eventSummary) } -func (s *blockSummary) print(maxAttr int, generateJsonnet, simpleSummary, printFullSummary bool) error { +func (s blockSummary) print(maxAttr int, generateJsonnet, simpleSummary, printFullSummary, generateCliArgs bool, blobBytes uint64) error { if printFullSummary { - if err := printSummary("span", maxAttr, s.spanSummary, false); err != nil { + if err := printSummary("span", maxAttr, s.spanSummary, false, s.numRowGroups, blobBytes); err != nil { + return err + } + + if err := printSummary("resource", maxAttr, s.resourceSummary, false, s.numRowGroups, blobBytes); err != nil { return err } - if err := printSummary("resource", maxAttr, s.resourceSummary, false); err != nil { + if err := printSummary("event", maxAttr, s.eventSummary, false, s.numRowGroups, blobBytes); err != nil { return 
err } } if simpleSummary { - if err := printSummary("span", maxAttr, s.spanSummary, true); err != nil { + if err := printSummary("span", maxAttr, s.spanSummary, true, s.numRowGroups, blobBytes); err != nil { return err } - if err := printSummary("resource", maxAttr, s.resourceSummary, true); err != nil { + if err := printSummary("resource", maxAttr, s.resourceSummary, true, s.numRowGroups, blobBytes); err != nil { return err } } if generateJsonnet { - printDedicatedColumnOverridesJsonnet(s.spanSummary, s.resourceSummary) + printDedicatedColumnOverridesJsonnet(s, maxAttr, s.numRowGroups, blobBytes) + } + + if generateCliArgs { + printCliArgs(s, maxAttr, s.numRowGroups, blobBytes) } return nil } -type genericAttrSummary struct { - totalBytes uint64 - attributes map[string]uint64 // key: attribute name, value: total bytes - skipped []string - dedicated map[string]struct{} +type attributeSummary struct { + attributes map[string]*attribute // key: attribute name + arrayAttributes map[string]*attribute // key: attribute name + dedicated map[string]struct{} +} + +func (a *attributeSummary) add(other attributeSummary) { + // Initialize if needed + if a.attributes == nil { + a.attributes = make(map[string]*attribute) + } + if a.arrayAttributes == nil { + a.arrayAttributes = make(map[string]*attribute) + } + if a.dedicated == nil { + a.dedicated = make(map[string]struct{}) + } + + for k, v := range other.attributes { + existing, ok := a.attributes[k] + if !ok { + a.attributes[k] = v + continue + } + existing.totalBytes += v.totalBytes + for k, v := range v.cardinality { + existing.cardinality[k] += v + } + } + for k, v := range other.arrayAttributes { + existing, ok := a.arrayAttributes[k] + if !ok { + a.arrayAttributes[k] = v + continue + } + existing.totalBytes += v.totalBytes + } + for k := range other.dedicated { + a.dedicated[k] = struct{}{} + } +} + +func (a attributeSummary) totalBytes() uint64 { + total := uint64(0) + for _, a := range a.attributes { + total += 
a.totalBytes + } + return total } type attribute struct { - name string - bytes uint64 + name string + cardinality cardinality // Only populated for non-array string attributes + totalBytes uint64 +} + +type cardinality map[string]uint64 + +func (c cardinality) add(value string) { + // TODO - instead of storing the raw value in the map, we could hash it and record the length. The + // requirement is to be able to estimate the cardinality and total content size at the end. + c[value]++ +} + +// totalBytes is the sum of all value content length regardless of cardinality or repetition +func (c cardinality) totalBytes() uint64 { + total := uint64(0) + for v, count := range c { + total += uint64(len(v)) * count + } + return total +} + +func (c cardinality) distinctValueCount() int { + return len(c) +} + +func (c cardinality) totalOccurrences() uint64 { + total := uint64(0) + for _, count := range c { + total += count + } + return total +} + +// dictionarySize is the estimated total size of a compressed dictionary for this attribute. +func (c cardinality) dictionarySize() uint64 { + total := uint64(0) + for v := range c { + total += 4 + uint64(len(v)) // 32-bit length, plus the value itself + } + return total +} + +// avgSizePerRowGroup is the average number of bytes used for this attribute per row group, assuming a +// compressed dictionary and page content of 1 byte per row. 
+func (c cardinality) avgSizePerRowGroup(numRowGroups int) uint64 { + dict := c.dictionarySize() + content := c.totalOccurrences() + return uint64((float64(dict) + float64(content)) / float64(numRowGroups)) } type makeIterFn func(columnName string, predicate pq.Predicate, selectAs string) pq.Iterator @@ -272,7 +431,7 @@ func makeIterFunc(ctx context.Context, pf *parquet.File) makeIterFn { } } -func aggregateAttributes(pf *parquet.File, definitionLevel int, keyPath string, valuePath string, isArrayPath string) (genericAttrSummary, error) { +func aggregateAttributes(pf *parquet.File, definitionLevel int, keyPath string, valuePath string, isArrayPath string) (attributeSummary, error) { makeIter := makeIterFunc(context.Background(), pf) iters := []pq.Iterator{ @@ -287,41 +446,58 @@ func aggregateAttributes(pf *parquet.File, definitionLevel int, keyPath string, defer attrIter.Close() var ( - totalBytes uint64 - attributes = make(map[string]uint64, 1000) - skippedMap = make(map[string]struct{}, 1000) + attributes = make(map[string]*attribute, 1000) + arrayAttributes = make(map[string]*attribute, 1000) ) for res, err := attrIter.Next(); res != nil; res, err = attrIter.Next() { if err != nil { - return genericAttrSummary{}, err + return attributeSummary{}, err } for _, e := range res.OtherEntries { - if stats, ok := e.Value.(*attrStats); ok { - if stats.isArray { - skippedMap[stats.name] = struct{}{} - continue + stats, ok := e.Value.(*attrStats) + if !ok { + continue + } + + if stats.isArray { + v, ok := arrayAttributes[stats.name] + if !ok { + v = &attribute{ + name: stats.name, + } + arrayAttributes[stats.name] = v } + v.totalBytes += uint64(len(stats.value)) + arrayAttributes[stats.name] = v + continue + } - attributes[stats.name] += stats.bytes - totalBytes += stats.bytes - putStats(stats) + a, ok := attributes[stats.name] + if !ok { + a = &attribute{ + name: stats.name, + cardinality: make(cardinality), + } + attributes[stats.name] = a } + + a.totalBytes += 
uint64(len(stats.value)) + a.cardinality.add(stats.value) + + putStats(stats) } } - return genericAttrSummary{ - totalBytes: totalBytes, - attributes: attributes, - skipped: slices.Collect(maps.Keys(skippedMap)), - dedicated: make(map[string]struct{}), + return attributeSummary{ + attributes: attributes, + arrayAttributes: arrayAttributes, }, nil } -func aggregateDedicatedColumns(pf *parquet.File, scope backend.DedicatedColumnScope, meta *backend.BlockMeta, paths []string) (genericAttrSummary, error) { - attrMap := make(map[string]uint64) - totalBytes := uint64(0) +func aggregateDedicatedColumns(pf *parquet.File, scope backend.DedicatedColumnScope, meta *backend.BlockMeta, paths []string) (attributeSummary, error) { + attributes := make(map[string]*attribute) i := 0 for _, dedColumn := range meta.DedicatedColumns { @@ -329,30 +505,33 @@ func aggregateDedicatedColumns(pf *parquet.File, scope backend.DedicatedColumnSc continue } - sz, err := aggregateSingleColumn(pf, paths[i]) + c, err := aggregateSingleColumn(pf, paths[i]) if err != nil { - return genericAttrSummary{}, err + return attributeSummary{}, err } i++ - attrMap[dedColumn.Name] = sz - totalBytes += sz + attributes[dedColumn.Name] = &attribute{ + name: dedColumn.Name, + totalBytes: c.totalBytes(), + cardinality: c, + } } - return genericAttrSummary{ - totalBytes: totalBytes, - attributes: attrMap, + return attributeSummary{ + attributes: attributes, }, nil } -func aggregateSingleColumn(pf *parquet.File, colName string) (uint64, error) { - iter := makeIterFunc(context.Background(), pf)(colName, nil, "value") - - var totalBytes uint64 +func aggregateSingleColumn(pf *parquet.File, colName string) (cardinality, error) { + var ( + iter = makeIterFunc(context.Background(), pf)(colName, nil, "value") + cardinality = make(cardinality) + ) for res, err := iter.Next(); res != nil; res, err = iter.Next() { if err != nil { - return 0, err + return nil, err } var val parquet.Value @@ -366,75 +545,163 @@ func 
aggregateSingleColumn(pf *parquet.File, colName string) (uint64, error) { continue } - totalBytes += val.Uint64() // for strings Uint64() returns the length of the string + cardinality[val.String()]++ } - return totalBytes, nil + return cardinality, nil } -func printSummary(scope string, max int, summary genericAttrSummary, simple bool) error { - // TODO: Support more output formats +func printSummary(scope string, maxAttr int, summary attributeSummary, simple bool, numRowGroups int, blobBytes uint64) error { w := tabwriter.NewWriter(os.Stdout, 0, 0, 2, ' ', 0) - if max > len(summary.attributes) { - max = len(summary.attributes) + if maxAttr > len(summary.attributes) { + maxAttr = len(summary.attributes) } fmt.Println("") - attrList := topN(max, summary.attributes) + attrList := topN(maxAttr, summary.attributes) if simple { fmt.Printf("%s attributes: ", scope) for _, a := range attrList { fmt.Printf("\"%s\", ", a.name) } fmt.Println("") - } else { - fmt.Printf("Top %d %s attributes by size\n", max, scope) - fmt.Printf("Skipped array attributes: %d\n", len(summary.skipped)) - for _, a := range attrList { + return nil + } - name := a.name - if _, ok := summary.dedicated[a.name]; ok { - name = a.name + " (dedicated)" - } + fmt.Printf("Top %d %s attributes by size\n", len(attrList), scope) + totalBytes := summary.totalBytes() + for _, a := range attrList { + + name := a.name + if _, ok := summary.dedicated[a.name]; ok { + name = a.name + " (dedicated)" + } + + var ( + thisBytes = a.totalBytes + percentage = float64(thisBytes) / float64(totalBytes) * 100 + totalOccurences = a.cardinality.totalOccurrences() + distinct = a.cardinality.distinctValueCount() + avgReuse = float64(totalOccurences) / float64(distinct) + totalSize = a.cardinality.avgSizePerRowGroup(numRowGroups) + ) + + blob := "" + if blobBytes > 0 && totalSize >= blobBytes { + blob = "(blob)" + } + + _, err := fmt.Fprintf(w, "name: %s\t size: %s\t (%.2f%%)\tcount: %d\t distinct: %d\t avg reuse: %.2f\t avg 
rowgroup content (dict + body): %s %s\n", + name, + humanize.Bytes(thisBytes), + percentage, + totalOccurences, + distinct, + avgReuse, + humanize.Bytes(totalSize), + blob, + ) + if err != nil { + return err + } + } + + err := w.Flush() + if err != nil { + return err + } - percentage := float64(a.bytes) / float64(summary.totalBytes) * 100 - _, err := fmt.Fprintf(w, "name: %s\t size: %s\t (%s%%)\n", name, humanize.Bytes(a.bytes), strconv.FormatFloat(percentage, 'f', 2, 64)) + arrayAttrList := topN(maxAttr, summary.arrayAttributes) + if len(arrayAttrList) > 0 { + fmt.Printf("Top %d %s array attributes by size\n", len(arrayAttrList), scope) + for _, a := range arrayAttrList { + percentage := float64(a.totalBytes) / float64(totalBytes) * 100 + _, err := fmt.Fprintf(w, "name: %s\t size: %s\t (%s%%)\n", a.name, humanize.Bytes(a.totalBytes), strconv.FormatFloat(percentage, 'f', 2, 64)) if err != nil { return err } } + + err = w.Flush() + if err != nil { + return err + } } - return w.Flush() + return nil } -func printDedicatedColumnOverridesJsonnet(spanSummary, resourceSummary genericAttrSummary) { +func printDedicatedColumnOverridesJsonnet(summary blockSummary, maxAttr int, numRowGroups int, blobBytes uint64) { fmt.Println("") fmt.Printf("parquet_dedicated_columns: [\n") - // span attributes first - spanAttrList := topN(10, spanSummary.attributes) - for _, a := range spanAttrList { - fmt.Printf(" { scope: 'span', name: '%s', type: 'string' },\n", a.name) + optionsText := func(a *attribute) string { + options := []string{} + if blobBytes > 0 && a.cardinality.avgSizePerRowGroup(numRowGroups) > blobBytes { + options = append(options, "'blob'") + } + if len(options) > 0 { + return ", options: [" + strings.Join(options, ", ") + "]" + } + return "" } - // span attributes first - resourceAttrList := topN(10, resourceSummary.attributes) - for _, a := range resourceAttrList { - fmt.Printf(" { scope: 'resource', name: '%s', type: 'string' },\n", a.name) + for _, a := range 
topN(maxAttr, summary.spanSummary.attributes) { + fmt.Printf(" { scope: 'span', name: '%s', type: 'string' %s },\n", a.name, optionsText(a)) } + + for _, a := range topN(maxAttr, summary.resourceSummary.attributes) { + fmt.Printf(" { scope: 'resource', name: '%s', type: 'string' %s },\n", a.name, optionsText(a)) + } + + for _, a := range topN(maxAttr, summary.eventSummary.attributes) { + fmt.Printf(" { scope: 'event', name: '%s', type: 'string' %s },\n", a.name, optionsText(a)) + } + fmt.Printf("], \n") fmt.Println("") } -func topN(n int, attrs map[string]uint64) []attribute { - top := make([]attribute, 0, len(attrs)) - for name, bytes := range attrs { - top = append(top, attribute{name, bytes}) +func printCliArgs(s blockSummary, maxAttr int, numRowGroups int, blobBytes uint64) { + fmt.Println("") + fmt.Printf("quoted/spaced cli list:") + + ss := []string{} + for _, a := range topN(maxAttr, s.spanSummary.attributes) { + if blobBytes > 0 && a.cardinality.avgSizePerRowGroup(numRowGroups) > blobBytes { + ss = append(ss, fmt.Sprintf("\"blob/span.%s\"", a.name)) + } else { + ss = append(ss, fmt.Sprintf("\"span.%s\"", a.name)) + } + } + + for _, a := range topN(maxAttr, s.resourceSummary.attributes) { + if blobBytes > 0 && a.cardinality.avgSizePerRowGroup(numRowGroups) > blobBytes { + ss = append(ss, fmt.Sprintf("\"blob/resource.%s\"", a.name)) + } else { + ss = append(ss, fmt.Sprintf("\"resource.%s\"", a.name)) + } + } + + for _, a := range topN(maxAttr, s.eventSummary.attributes) { + if blobBytes > 0 && a.cardinality.avgSizePerRowGroup(numRowGroups) > blobBytes { + ss = append(ss, fmt.Sprintf("\"blob/event.%s\"", a.name)) + } else { + ss = append(ss, fmt.Sprintf("\"event.%s\"", a.name)) + } + } + + fmt.Println(strings.Join(ss, " ")) +} + +func topN(n int, attrs map[string]*attribute) []*attribute { + top := make([]*attribute, 0, len(attrs)) + for _, attr := range attrs { + top = append(top, attr) } sort.Slice(top, func(i, j int) bool { - return top[i].bytes > 
top[j].bytes + return top[i].totalBytes > top[j].totalBytes }) if len(top) > n { top = top[:n] @@ -446,6 +713,7 @@ var _ pq.GroupPredicate = (*attrStatsCollector)(nil) type attrStats struct { name string + value string bytes uint64 isArray bool isNull bool @@ -459,6 +727,7 @@ var statsPool = sync.Pool{ func putStats(s *attrStats) { s.name = "" + s.value = "" s.bytes = 0 s.isArray = false s.isNull = false @@ -497,7 +766,8 @@ func (a attrStatsCollector) KeepGroup(res *pq.IteratorResult) bool { if e.Value.IsNull() { stats.isNull = true } else { - stats.bytes += e.Value.Uint64() // for strings Uint64() returns the length of the string + stats.value = e.Value.String() + stats.bytes += uint64(len(stats.value)) } case "isArray": if !stats.isArray { diff --git a/cmd/tempo-cli/cmd-analyse-blocks.go b/cmd/tempo-cli/cmd-analyse-blocks.go index f24668042a5..215a6e8ce40 100644 --- a/cmd/tempo-cli/cmd-analyse-blocks.go +++ b/cmd/tempo-cli/cmd-analyse-blocks.go @@ -5,6 +5,7 @@ import ( "errors" "time" + "github.com/dustin/go-humanize" "github.com/google/uuid" "github.com/grafana/tempo/tempodb/backend" @@ -14,17 +15,24 @@ type analyseBlocksCmd struct { backendOptions Jsonnet bool `help:"output Jsonnet necessary for overrides"` + Cli bool `help:"Generate textual args for passing to parquet conversion command"` SimpleSummary bool `help:"Print only single line of top attributes" default:"false"` PrintFullSummary bool `help:"Print full summary of the analysed block" default:"true"` TenantID string `arg:"" help:"tenant-id within the bucket"` MinCompactionLevel int `help:"Min compaction level to analyse" default:"3"` MaxBlocks int `help:"Max number of blocks to analyse" default:"10"` NumAttr int `help:"Number of attributes to display" default:"15"` + BlobThreshold string `help:"Convert column to blob when dictionary size reaches this value" default:"4MiB"` MaxStartTime string `help:"Oldest start time for a block to be processed. 
RFC3339 format '2006-01-02T15:04:05Z07:00'" default:""` MinStartTime string `help:"Newest start time for a block to be processed. RFC3339 format '2006-01-02T15:04:05Z07:00'" default:""` } func (cmd *analyseBlocksCmd) Run(ctx *globalOptions) error { + blobBytes, err := humanize.ParseBytes(cmd.BlobThreshold) + if err != nil { + return err + } + r, _, _, err := loadBackend(&cmd.backendOptions, ctx) if err != nil { return err @@ -36,11 +44,12 @@ func (cmd *analyseBlocksCmd) Run(ctx *globalOptions) error { return err } - processedBlocks := map[uuid.UUID]struct{}{} - topSpanAttrs, topResourceAttrs := make(map[string]uint64), make(map[string]uint64) - totalSpanBytes, totalResourceBytes := uint64(0), uint64(0) + var ( + processedBlocks = map[uuid.UUID]struct{}{} + totalSummary blockSummary + maxStartTime, minStartTime time.Time + ) - var maxStartTime, minStartTime time.Time if cmd.MaxStartTime != "" { maxStartTime, err = time.Parse(time.RFC3339, cmd.MaxStartTime) if err != nil { @@ -80,28 +89,11 @@ func (cmd *analyseBlocksCmd) Run(ctx *globalOptions) error { continue } - for k, v := range blockSum.spanSummary.attributes { - topSpanAttrs[k] += v - } - totalSpanBytes += blockSum.spanSummary.totalBytes - - for k, v := range blockSum.resourceSummary.attributes { - topResourceAttrs[k] += v - } - totalResourceBytes += blockSum.resourceSummary.totalBytes + totalSummary.add(*blockSum) processedBlocks[block] = struct{}{} } // Get top N attributes from map - return (&blockSummary{ - spanSummary: genericAttrSummary{ - totalBytes: totalSpanBytes, - attributes: topSpanAttrs, - }, - resourceSummary: genericAttrSummary{ - totalBytes: totalResourceBytes, - attributes: topResourceAttrs, - }, - }).print(cmd.NumAttr, cmd.Jsonnet, cmd.SimpleSummary, cmd.PrintFullSummary) + return totalSummary.print(cmd.NumAttr, cmd.Jsonnet, cmd.SimpleSummary, cmd.PrintFullSummary, cmd.Cli, blobBytes) } diff --git a/cmd/tempo-cli/cmd-convert-parquet-4to5.go b/cmd/tempo-cli/cmd-convert-parquet-4to5.go index 
78ad0ad0591..c53590de967 100644 --- a/cmd/tempo-cli/cmd-convert-parquet-4to5.go +++ b/cmd/tempo-cli/cmd-convert-parquet-4to5.go @@ -6,6 +6,7 @@ import ( "fmt" "io" "path/filepath" + "strings" "github.com/parquet-go/parquet-go" @@ -55,22 +56,41 @@ func (cmd *convertParquet4to5) Run() error { dedicatedCols = make(backend.DedicatedColumns, 0, len(cmd.DedicatedColumns)) for _, col := range cmd.DedicatedColumns { + + var ( + typ = backend.DedicatedColumnTypeString + options = backend.DedicatedColumnOptions{} + ) + + col, blob := strings.CutPrefix(col, "blob/") + if blob { + options = append(options, backend.DedicatedColumnOptionBlob) + } + att, err := traceql.ParseIdentifier(col) if err != nil { return err } - scope := backend.DedicatedColumnScopeSpan - if att.Scope == traceql.AttributeScopeResource { + var scope backend.DedicatedColumnScope + switch att.Scope { + case traceql.AttributeScopeSpan: + scope = backend.DedicatedColumnScopeSpan + case traceql.AttributeScopeResource: scope = backend.DedicatedColumnScopeResource + case traceql.AttributeScopeEvent: + scope = backend.DedicatedColumnScopeEvent + default: + return fmt.Errorf("dedicated columns must be scoped: %s", att.Scope) } fmt.Printf("add dedicated column scope=%s name=%s\n", scope, att.Name) dedicatedCols = append(dedicatedCols, backend.DedicatedColumn{ - Scope: scope, - Name: att.Name, - Type: backend.DedicatedColumnTypeString, + Scope: scope, + Name: att.Name, + Type: typ, + Options: options, }) } } else { diff --git a/tempodb/backend/block_meta.go b/tempodb/backend/block_meta.go index fc31719464c..794951cabfb 100644 --- a/tempodb/backend/block_meta.go +++ b/tempodb/backend/block_meta.go @@ -32,16 +32,18 @@ const ( DedicatedColumnScopeResource DedicatedColumnScope = "resource" DedicatedColumnScopeSpan DedicatedColumnScope = "span" + DedicatedColumnScopeEvent DedicatedColumnScope = "event" DedicatedColumnOptionArray DedicatedColumnOption = "array" + DedicatedColumnOptionBlob 
DedicatedColumnOption = "blob" DefaultDedicatedColumnType = DedicatedColumnTypeString DefaultDedicatedColumnScope = DedicatedColumnScopeSpan ) var maxSupportedColumns = map[DedicatedColumnType]map[DedicatedColumnScope]int{ - DedicatedColumnTypeString: {DedicatedColumnScopeSpan: 10, DedicatedColumnScopeResource: 10}, - DedicatedColumnTypeInt: {DedicatedColumnScopeSpan: 5, DedicatedColumnScopeResource: 5}, + DedicatedColumnTypeString: {DedicatedColumnScopeSpan: 10, DedicatedColumnScopeResource: 10, DedicatedColumnScopeEvent: 10}, + DedicatedColumnTypeInt: {DedicatedColumnScopeSpan: 5, DedicatedColumnScopeResource: 5, DedicatedColumnScopeEvent: 5}, } func DedicatedColumnTypeFromTempopb(t tempopb.DedicatedColumn_Type) (DedicatedColumnType, error) { diff --git a/tempodb/encoding/vparquet4/schema.go b/tempodb/encoding/vparquet4/schema.go index 9de5538e420..5eac03c3a1a 100644 --- a/tempodb/encoding/vparquet4/schema.go +++ b/tempodb/encoding/vparquet4/schema.go @@ -72,6 +72,13 @@ const ( FieldSpanAttrValInt = "rs.list.element.ss.list.element.Spans.list.element.Attrs.list.element.ValueInt.list.element" FieldSpanAttrValDouble = "rs.list.element.ss.list.element.Spans.list.element.Attrs.list.element.ValueDouble.list.element" FieldSpanAttrValBool = "rs.list.element.ss.list.element.Spans.list.element.Attrs.list.element.ValueBool.list.element" + + FieldEventAttrKey = "rs.list.element.ss.list.element.Spans.list.element.Events.list.element.Attrs.list.element.Key" + FieldEventAttrIsArray = "rs.list.element.ss.list.element.Spans.list.element.Events.list.element.Attrs.list.element.IsArray" + FieldEventAttrVal = "rs.list.element.ss.list.element.Spans.list.element.Events.list.element.Attrs.list.element.Value.list.element" + FieldEventAttrValInt = "rs.list.element.ss.list.element.Spans.list.element.Events.list.element.Attrs.list.element.ValueInt.list.element" + FieldEventAttrValDouble = 
"rs.list.element.ss.list.element.Spans.list.element.Events.list.element.Attrs.list.element.ValueDouble.list.element" + FieldEventAttrValBool = "rs.list.element.ss.list.element.Spans.list.element.Events.list.element.Attrs.list.element.ValueBool.list.element" ) var ( diff --git a/tempodb/encoding/vparquet5/block_autocomplete.go b/tempodb/encoding/vparquet5/block_autocomplete.go index f10fbf3d364..962323b28d0 100644 --- a/tempodb/encoding/vparquet5/block_autocomplete.go +++ b/tempodb/encoding/vparquet5/block_autocomplete.go @@ -167,6 +167,18 @@ func tagNamesForSpecialColumns(scope traceql.AttributeScope, pf *parquet.File, d } } } + + // add all event dedicated columns that have values + if scope == traceql.AttributeScopeNone || scope == traceql.AttributeScopeEvent { + dedCols := dedicatedColumnsToColumnMapping(dcs, backend.DedicatedColumnScopeEvent) + for name, col := range dedCols.mapping { + if hasValues(col.ColumnPath, pf) { + if cb(name, traceql.AttributeScopeEvent) { + return + } + } + } + } } func (b *backendBlock) FetchTagValues(ctx context.Context, req traceql.FetchTagValuesRequest, cb traceql.FetchTagValuesCallback, mcb common.MetricsCallback, opts common.SearchOptions) error { @@ -248,7 +260,7 @@ func autocompleteIter(ctx context.Context, tr tagRequest, pf *parquet.File, opts var currentIter parquetquery.Iterator if len(catConditions.event) > 0 || tr.keysRequested(traceql.AttributeScopeEvent) { - currentIter, err = createDistinctEventIterator(makeIter, makeNilIter, tr, currentIter, catConditions.event) + currentIter, err = createDistinctEventIterator(makeIter, makeNilIter, tr, currentIter, catConditions.event, dc) if err != nil { return nil, errors.Wrap(err, "creating event iterator") } @@ -297,12 +309,28 @@ func createDistinctEventIterator( tr tagRequest, primaryIter parquetquery.Iterator, conditions []traceql.Condition, + dedicatedColumns backend.DedicatedColumns, ) (parquetquery.Iterator, error) { var ( + columnSelectAs = map[string]string{} + 
columnPredicates = map[string][]parquetquery.Predicate{} iters []parquetquery.Iterator genericConditions []traceql.Condition + columnMapping = dedicatedColumnsToColumnMapping(dedicatedColumns, backend.DedicatedColumnScopeEvent) ) + addSelectAs := func(attr traceql.Attribute, columnPath string, selectAs string) { + if attr == tr.tag { + columnSelectAs[columnPath] = selectAs + } else { + columnSelectAs[columnPath] = "" // Don't select, just filter + } + } + + addPredicate := func(columnPath string, p parquetquery.Predicate) { + columnPredicates[columnPath] = append(columnPredicates[columnPath], p) + } + for _, cond := range conditions { // Intrinsic? if cond.Attribute.Intrinsic == traceql.IntrinsicEventName { @@ -318,6 +346,37 @@ func createDistinctEventIterator( continue } + // Attributes stored in dedicated columns + if c, ok := columnMapping.get(cond.Attribute.Name); ok { + // Operands that need special handling. + switch cond.Op { + case traceql.OpNone: + addPredicate(c.ColumnPath, nil) // No filtering + columnSelectAs[c.ColumnPath] = cond.Attribute.Name + continue + case traceql.OpExists: + addPredicate(c.ColumnPath, &parquetquery.SkipNilsPredicate{}) + columnSelectAs[c.ColumnPath] = cond.Attribute.Name + continue + case traceql.OpNotExists: + pred := parquetquery.NewNilValuePredicate() + iters = append(iters, makeIter(c.ColumnPath, pred, cond.Attribute.Name)) + continue + } + + // Compatible type? + typ, _ := c.Type.ToStaticType() + if typ == operandType(cond.Operands) { + pred, err := createPredicate(cond.Op, cond.Operands) + if err != nil { + return nil, errors.Wrap(err, "creating predicate") + } + addPredicate(c.ColumnPath, pred) + addSelectAs(cond.Attribute, c.ColumnPath, cond.Attribute.Name) + continue + } + } + // generic attr does not exist? 
if cond.Op == traceql.OpNotExists { pred := parquetquery.NewIncludeNilStringEqualPredicate([]byte(cond.Attribute.Name)) @@ -329,6 +388,10 @@ func createDistinctEventIterator( genericConditions = append(genericConditions, cond) } + for columnPath, predicates := range columnPredicates { + iters = append(iters, makeIter(columnPath, orIfNeeded(predicates), columnSelectAs[columnPath])) + } + attrIter, err := createDistinctAttributeIterator(makeIter, tr, genericConditions, DefinitionLevelResourceSpansILSSpanEventAttrs, columnPathEventAttrKey, columnPathEventAttrString, columnPathEventAttrInt, columnPathEventAttrDouble, columnPathEventAttrBool) if err != nil { @@ -1173,11 +1236,24 @@ func (d distinctValueCollector) KeepGroup(result *parquetquery.IteratorResult) b } func mapEventAttr(e entry) traceql.Static { - if e.Key == columnPathEventName { + switch e.Key { + case columnPathEventName: return traceql.NewStaticString(unsafeToString(e.Value.ByteArray())) + default: + // This exists for event-level dedicated columns + switch e.Value.Kind() { + case parquet.Boolean: + return traceql.NewStaticBool(e.Value.Boolean()) + case parquet.Int32, parquet.Int64: + return traceql.NewStaticInt(int(e.Value.Int64())) + case parquet.Float: + return traceql.NewStaticFloat(e.Value.Double()) + case parquet.ByteArray, parquet.FixedLenByteArray: + return traceql.NewStaticString(unsafeToString(e.Value.ByteArray())) + } } - return traceql.Static{} + return traceql.NewStaticNil() } func mapLinkAttr(_ entry) traceql.Static { diff --git a/tempodb/encoding/vparquet5/block_findtracebyid.go b/tempodb/encoding/vparquet5/block_findtracebyid.go index c18502fd761..309a442936e 100644 --- a/tempodb/encoding/vparquet5/block_findtracebyid.go +++ b/tempodb/encoding/vparquet5/block_findtracebyid.go @@ -260,7 +260,8 @@ func findTraceByID(ctx context.Context, traceID common.ID, meta *backend.BlockMe rowMatch += int64(res.RowNumber[0]) // seek to row and read - r := parquet.NewGenericReader[*Trace](pf) + _, _, 
readerOptions := SchemaWithDynamicChanges(meta.DedicatedColumns) + r := parquet.NewGenericReader[*Trace](pf, readerOptions...) defer r.Close() err = r.SeekToRow(rowMatch) diff --git a/tempodb/encoding/vparquet5/block_findtracebyid_test.go b/tempodb/encoding/vparquet5/block_findtracebyid_test.go index b7f4ea24e99..1d3a5558419 100644 --- a/tempodb/encoding/vparquet5/block_findtracebyid_test.go +++ b/tempodb/encoding/vparquet5/block_findtracebyid_test.go @@ -8,7 +8,6 @@ import ( "testing" "github.com/google/uuid" - "github.com/parquet-go/parquet-go" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -101,7 +100,6 @@ func TestBackendBlockFindTraceByID(t *testing.T) { gotProto, err := b.FindTraceByID(ctx, tr.TraceID, common.DefaultSearchOptions()) require.NoError(t, err) require.Equal(t, wantProto, gotProto.Trace) - require.Greater(t, gotProto.Metrics.InspectedBytes, uint64(60000)) // approximate value } } @@ -126,7 +124,7 @@ func TestBackendBlockFindTraceByID_TestData(t *testing.T) { iter, err := b.rawIter(context.Background(), newRowPool(10)) require.NoError(t, err) - sch := parquet.SchemaOf(new(Trace)) + sch, _, _ := SchemaWithDynamicChanges(meta.DedicatedColumns) for { _, row, err := iter.Next(context.Background()) require.NoError(t, err) diff --git a/tempodb/encoding/vparquet5/block_iterator.go b/tempodb/encoding/vparquet5/block_iterator.go index 454e0b5f7ca..1e7d3e664b1 100644 --- a/tempodb/encoding/vparquet5/block_iterator.go +++ b/tempodb/encoding/vparquet5/block_iterator.go @@ -22,11 +22,13 @@ func (b *backendBlock) openForIteration(ctx context.Context) (*parquet.File, *pa // 128 MB memory buffering br := tempo_io.NewBufferedReaderAt(rr, int64(b.meta.Size_), 2*1024*1024, 64) + sch, _, readerOptions := SchemaWithDynamicChanges(b.meta.DedicatedColumns) + o := []parquet.FileOption{ parquet.SkipBloomFilters(true), parquet.SkipPageIndex(true), - parquet.FileSchema(parquetSchema), 
parquet.FileReadMode(parquet.ReadModeAsync), + parquet.FileSchema(sch), } pf, err := parquet.OpenFile(br, int64(b.meta.Size_), o...) @@ -34,7 +36,7 @@ func (b *backendBlock) openForIteration(ctx context.Context) (*parquet.File, *pa return nil, nil, err } - r := parquet.NewReader(pf, pf.Schema()) + r := parquet.NewReader(pf, append(readerOptions, sch)...) return pf, r, nil } diff --git a/tempodb/encoding/vparquet5/block_search.go b/tempodb/encoding/vparquet5/block_search.go index 8aad0b668d7..5b560e95052 100644 --- a/tempodb/encoding/vparquet5/block_search.go +++ b/tempodb/encoding/vparquet5/block_search.go @@ -60,12 +60,15 @@ func (b *backendBlock) openForSearch(ctx context.Context, opts common.SearchOpti // TODO: ctx is also cached when we cache backendReaderAt, not ideal but leaving it as is for now backendReaderAt := NewBackendReaderAt(ctx, b.r, DataFileName, b.meta) + + schema, _, _ := SchemaWithDynamicChanges(b.meta.DedicatedColumns) + // no searches currently require bloom filters or the page index. 
so just add them statically o := []parquet.FileOption{ parquet.SkipBloomFilters(true), parquet.SkipPageIndex(true), parquet.FileReadMode(parquet.ReadModeAsync), - parquet.FileSchema(parquetSchema), + parquet.FileSchema(schema), } // if the read buffer size provided is <= 0 then we'll use the parquet default diff --git a/tempodb/encoding/vparquet5/block_traceql.go b/tempodb/encoding/vparquet5/block_traceql.go index f39fd797057..48c22f4530b 100644 --- a/tempodb/encoding/vparquet5/block_traceql.go +++ b/tempodb/encoding/vparquet5/block_traceql.go @@ -1670,7 +1670,7 @@ func createAllIterator(ctx context.Context, primaryIter parquetquery.Iterator, c innerIterators = append(innerIterators, primaryIter) } - eventIter, err := createEventIterator(makeIter, makeNilIter, catConditions.event, allConditions, selectAll) + eventIter, err := createEventIterator(makeIter, makeNilIter, catConditions.event, allConditions, dc, selectAll) if err != nil { return nil, fmt.Errorf("creating event iterator: %w", err) } @@ -1704,13 +1704,41 @@ func createAllIterator(ctx context.Context, primaryIter parquetquery.Iterator, c return createTraceIterator(makeIter, resourceIter, catConditions.trace, start, end, allConditions, selectAll, traceSampler) } -func createEventIterator(makeIter, makeNilIter makeIterFn, conditions []traceql.Condition, allConditions bool, selectAll bool) (parquetquery.Iterator, error) { +func createEventIterator(makeIter, makeNilIter makeIterFn, conditions []traceql.Condition, allConditions bool, dedicatedColumns backend.DedicatedColumns, selectAll bool) (parquetquery.Iterator, error) { if len(conditions) == 0 { return nil, nil } - eventIters := make([]parquetquery.Iterator, 0, len(conditions)) - var genericConditions []traceql.Condition + var ( + columnSelectAs = map[string]string{} + columnPredicates = map[string][]parquetquery.Predicate{} + iters []parquetquery.Iterator + genericConditions []traceql.Condition + columnMapping = 
dedicatedColumnsToColumnMapping(dedicatedColumns, backend.DedicatedColumnScopeEvent) + ) + + addPredicate := func(columnPath string, p parquetquery.Predicate) { + columnPredicates[columnPath] = append(columnPredicates[columnPath], p) + } + + specialCase := func(cond traceql.Condition, columnPath string) (handled bool) { + // Operands that need special handling. + switch cond.Op { + case traceql.OpNone: + addPredicate(columnPath, nil) // No filtering + columnSelectAs[columnPath] = cond.Attribute.Name + return true + case traceql.OpExists: + addPredicate(columnPath, &parquetquery.SkipNilsPredicate{}) + columnSelectAs[columnPath] = cond.Attribute.Name + return true + case traceql.OpNotExists: + iters = append(iters, makeIter(columnPath, parquetquery.NewNilValuePredicate(), cond.Attribute.Name)) + return true + default: + return false + } + } for _, cond := range conditions { switch cond.Attribute.Intrinsic { @@ -1719,27 +1747,50 @@ func createEventIterator(makeIter, makeNilIter makeIterFn, conditions []traceql. if err != nil { return nil, err } - eventIters = append(eventIters, makeIter(columnPathEventName, pred, columnPathEventName)) + iters = append(iters, makeIter(columnPathEventName, pred, columnPathEventName)) continue case traceql.IntrinsicEventTimeSinceStart: pred, err := createIntPredicate(cond.Op, cond.Operands) if err != nil { return nil, err } - eventIters = append(eventIters, makeIter(columnPathEventTimeSinceStart, pred, columnPathEventTimeSinceStart)) + iters = append(iters, makeIter(columnPathEventTimeSinceStart, pred, columnPathEventTimeSinceStart)) continue } + // Attributes stored in dedicated columns + if c, ok := columnMapping.get(cond.Attribute.Name); ok { + if specialCase(cond, c.ColumnPath) { + continue + } + + // Compatible type? 
+ typ, _ := c.Type.ToStaticType() + if typ == operandType(cond.Operands) { + pred, err := createPredicate(cond.Op, cond.Operands) + if err != nil { + return nil, fmt.Errorf("creating predicate: %w", err) + } + addPredicate(c.ColumnPath, pred) + columnSelectAs[c.ColumnPath] = cond.Attribute.Name + continue + } + } + if cond.Op == traceql.OpNotExists { // Generic attr doesn't exist pred := parquetquery.NewIncludeNilStringEqualPredicate([]byte(cond.Attribute.Name)) - eventIters = append(eventIters, makeNilIter(columnPathEventAttrKey, pred, cond.Attribute.Name)) + iters = append(iters, makeNilIter(columnPathEventAttrKey, pred, cond.Attribute.Name)) continue } genericConditions = append(genericConditions, cond) } + for columnPath, predicates := range columnPredicates { + iters = append(iters, makeIter(columnPath, orIfNeeded(predicates), columnSelectAs[columnPath])) + } + attrIter, err := createAttributeIterator(makeIter, genericConditions, DefinitionLevelResourceSpansILSSpanEventAttrs, columnPathEventAttrKey, columnPathEventAttrString, columnPathEventAttrInt, columnPathEventAttrDouble, columnPathEventAttrBool, allConditions, selectAll) if err != nil { @@ -1747,7 +1798,7 @@ func createEventIterator(makeIter, makeNilIter makeIterFn, conditions []traceql. } if attrIter != nil { - eventIters = append(eventIters, attrIter) + iters = append(iters, attrIter) } var required []parquetquery.Iterator @@ -1770,19 +1821,19 @@ func createEventIterator(makeIter, makeNilIter makeIterFn, conditions []traceql. // This is an optimization for when all of the span conditions must be met. // We simply move all iterators into the required list. if allConditions { - required = append(required, eventIters...) - eventIters = nil + required = append(required, iters...) 
+ iters = nil } if len(required) == 0 { required = []parquetquery.Iterator{makeIter(columnPathEventName, nil, "")} } - if len(eventIters) == 0 && len(required) == 0 { + if len(iters) == 0 && len(required) == 0 { return nil, nil } - return parquetquery.NewLeftJoinIterator(DefinitionLevelResourceSpansILSSpanEvent, required, eventIters, eventCol, parquetquery.WithPool(pqEventPool)) + return parquetquery.NewLeftJoinIterator(DefinitionLevelResourceSpansILSSpanEvent, required, iters, eventCol, parquetquery.WithPool(pqEventPool)) } func createLinkIterator(makeIter, makeNilIter makeIterFn, conditions []traceql.Condition, allConditions, selectAll bool) (parquetquery.Iterator, error) { @@ -3551,12 +3602,25 @@ func (c *eventCollector) KeepGroup(res *parquetquery.IteratorResult) bool { s: traceql.NewStaticDuration(time.Duration(e.Value.Int64())), }) default: - // null value indicates attribute doesn't exist - if e.Value.IsNull() { - ev.attrs = append(ev.attrs, attrVal{ - a: newEventAttr(e.Key), - s: traceql.NewStaticString("nil"), - }) + // TODO - This exists for event-level dedicated columns like http.status_code + // Are nils possible here?
+ switch e.Value.Kind() { + case parquet.Boolean: + ev.attrs = append(ev.attrs, attrVal{a: newEventAttr(e.Key), s: traceql.NewStaticBool(e.Value.Boolean())}) + case parquet.Int32, parquet.Int64: + ev.attrs = append(ev.attrs, attrVal{a: newEventAttr(e.Key), s: traceql.NewStaticInt(int(e.Value.Int64()))}) + case parquet.Float: + ev.attrs = append(ev.attrs, attrVal{a: newEventAttr(e.Key), s: traceql.NewStaticFloat(e.Value.Double())}) + case parquet.ByteArray: + ev.attrs = append(ev.attrs, attrVal{a: newEventAttr(e.Key), s: traceql.NewStaticString(unsafeToString(e.Value.Bytes()))}) + default: + // null value indicates attribute doesn't exist + if e.Value.IsNull() { + ev.attrs = append(ev.attrs, attrVal{ + a: newEventAttr(e.Key), + s: traceql.NewStaticString("nil"), + }) + } } } } diff --git a/tempodb/encoding/vparquet5/block_traceql_test.go b/tempodb/encoding/vparquet5/block_traceql_test.go index f4f405c6d75..69524aa688b 100644 --- a/tempodb/encoding/vparquet5/block_traceql_test.go +++ b/tempodb/encoding/vparquet5/block_traceql_test.go @@ -1444,29 +1444,35 @@ func BenchmarkBackendBlockTraceQL(b *testing.B) { {"mixed", `{resource.namespace!="" && resource.service.name="cortex-gateway" && duration>50ms && resource.cluster=~"prod.*"}`}, {"complex", `{resource.k8s.cluster.name =~ "prod.*" && resource.k8s.namespace.name = "hosted-grafana" && resource.k8s.container.name="hosted-grafana-gateway" && name = "httpclient/grafana" && span.http.status_code = 200 && duration > 20ms}`}, {"select", `{resource.k8s.cluster.name =~ "prod.*" && resource.k8s.namespace.name = "tempo-prod"} | select(resource.container)`}, + + // TODO - Check block meta and automatically find dedicated and blob attributes to test. 
+ // {"span generic", "{span.ICCID=~`.*bar.*`}"}, + // {"span blob", "{span.model=~`.*a.*`}"}, + // {"span dedicated", "{span.db.statement=~`.*bar.*`}"}, } - ctx := context.TODO() - opts := common.DefaultSearchOptions() - opts.StartPage = 3 - opts.TotalPages = 7 + var ( + ctx = b.Context() + opts = common.DefaultSearchOptions() + e = traceql.NewEngine() + ) block := blockForBenchmarks(b) _, _, err := block.openForSearch(ctx, opts) require.NoError(b, err) + f := traceql.NewSpansetFetcherWrapper(func(ctx context.Context, req traceql.FetchSpansRequest) (traceql.FetchSpansResponse, error) { + return block.Fetch(ctx, req, opts) + }) + for _, tc := range testCases { b.Run(tc.name, func(b *testing.B) { b.ResetTimer() bytesRead := 0 matches := 0 - for i := 0; i < b.N; i++ { - e := traceql.NewEngine() - - resp, err := e.ExecuteSearch(ctx, &tempopb.SearchRequest{Query: tc.query}, traceql.NewSpansetFetcherWrapper(func(ctx context.Context, req traceql.FetchSpansRequest) (traceql.FetchSpansResponse, error) { - return block.Fetch(ctx, req, opts) - }), false) + for b.Loop() { + resp, err := e.ExecuteSearch(ctx, &tempopb.SearchRequest{Query: tc.query}, f, false) require.NoError(b, err) require.NotNil(b, resp) @@ -1598,7 +1604,6 @@ func BenchmarkBackendBlockQueryRange(b *testing.B) { "{span.http.host != `` && span.http.flavor=`2`} | rate() by (span.http.flavor)", // Multiple conditions "{status=error} | rate()", "{} | quantile_over_time(duration, .99, .9, .5)", - "{} | quantile_over_time(duration, .99, .9, .5)", "{} | quantile_over_time(duration, .99) by (span.http.status_code)", "{} | histogram_over_time(duration)", "{} | avg_over_time(duration) by (span.http.status_code)", @@ -1614,13 +1619,13 @@ func BenchmarkBackendBlockQueryRange(b *testing.B) { // For sampler debugging log.Logger = kitlog.NewLogfmtLogger(kitlog.NewSyncWriter(os.Stderr)) - e := traceql.NewEngine() - ctx := context.TODO() - opts := common.DefaultSearchOptions() - opts.StartPage = 3 - opts.TotalPages = 7 + var ( 
+ e = traceql.NewEngine() + ctx = b.Context() + opts = common.DefaultSearchOptions() + block = blockForBenchmarks(b) + ) - block := blockForBenchmarks(b) _, _, err := block.openForSearch(ctx, opts) require.NoError(b, err) @@ -1645,11 +1650,12 @@ func BenchmarkBackendBlockQueryRange(b *testing.B) { require.NoError(b, err) b.ResetTimer() - for i := 0; i < b.N; i++ { + for b.Loop() { err := eval.Do(ctx, f, st, end, int(req.MaxSeries)) require.NoError(b, err) } + // Always call results to include the final series processing in the benchmark. _ = eval.Results() bytes, spansTotal, _ := eval.Metrics() @@ -1659,6 +1665,51 @@ func BenchmarkBackendBlockQueryRange(b *testing.B) { } } +func BenchmarkReadAllTraces(b *testing.B) { + var ( + ctx = b.Context() + block = blockForBenchmarks(b) + pool = newRowPool(1_000_000) + ) + + iter, err := block.rawIter(ctx, pool) + require.NoError(b, err) + + for b.Loop() { + for { + id, row, err := iter.Next(ctx) + require.NoError(b, err) + if id == nil { + break + } + require.NotNil(b, row) + pool.Put(row) + } + } +} + +func BenchmarkReadSingleTrace(b *testing.B) { + var ( + ctx = b.Context() + opts = common.DefaultSearchOptions() + block = blockForBenchmarks(b) + ) + + // Get a trace ID from the first page. + pf, _, err := block.openForSearch(ctx, opts) + require.NoError(b, err) + index, err := pf.RowGroups()[0].ColumnChunks()[0].ColumnIndex() + require.NoError(b, err) + id := index.MaxValue(0).ByteArray() + require.NotNil(b, id) + + for b.Loop() { + tr, err := block.FindTraceByID(ctx, id, opts) + require.NoError(b, err) + require.NotNil(b, tr) + } +} + func TestSamplingError(t *testing.T) { // Comment this out to run tests when developing. 
t.Skip("Skipping sampling error test") diff --git a/tempodb/encoding/vparquet5/compactor.go b/tempodb/encoding/vparquet5/compactor.go index f60b30c2768..72c59c51ed1 100644 --- a/tempodb/encoding/vparquet5/compactor.go +++ b/tempodb/encoding/vparquet5/compactor.go @@ -74,13 +74,13 @@ func (c *Compactor) Compact(ctx context.Context, l log.Logger, r backend.Reader, return nil, fmt.Errorf("error creating iterator for block %s: %w", blockMeta.BlockID.String(), err) } - bookmarks = append(bookmarks, newBookmark[parquet.Row](iter)) + bookmarks = append(bookmarks, newBookmark(iter)) } var ( replicationFactor = inputs[0].ReplicationFactor nextCompactionLevel = compactionLevel + 1 - sch = parquet.SchemaOf(new(Trace)) + sch, _, _ = SchemaWithDynamicChanges(inputs[0].DedicatedColumns) ) // Dedupe rows and also call the metrics callback. diff --git a/tempodb/encoding/vparquet5/compactor_test.go b/tempodb/encoding/vparquet5/compactor_test.go index 38423a81200..af030a03412 100644 --- a/tempodb/encoding/vparquet5/compactor_test.go +++ b/tempodb/encoding/vparquet5/compactor_test.go @@ -154,7 +154,7 @@ func TestCountSpans(t *testing.T) { rootSpan := "foo" rootService := "bar" - sch := parquet.SchemaOf(new(Trace)) + sch, _, _ := SchemaWithDynamicChanges(backend.DedicatedColumns{}) traceID := make([]byte, 16) _, err := crand.Read(traceID) require.NoError(t, err) diff --git a/tempodb/encoding/vparquet5/create.go b/tempodb/encoding/vparquet5/create.go index 08ce9b91285..645b9f02177 100644 --- a/tempodb/encoding/vparquet5/create.go +++ b/tempodb/encoding/vparquet5/create.go @@ -62,10 +62,11 @@ func CreateBlock(ctx context.Context, cfg *common.BlockConfig, meta *backend.Blo } else { // Need to convert from proto->parquet obj var ( - buffer = &Trace{} - connected bool - resMapping = dedicatedColumnsToColumnMapping(meta.DedicatedColumns, backend.DedicatedColumnScopeResource) - spanMapping = dedicatedColumnsToColumnMapping(meta.DedicatedColumns, backend.DedicatedColumnScopeSpan) + buffer = 
&Trace{} + connected bool + resMapping = dedicatedColumnsToColumnMapping(meta.DedicatedColumns, backend.DedicatedColumnScopeResource) + spanMapping = dedicatedColumnsToColumnMapping(meta.DedicatedColumns, backend.DedicatedColumnScopeSpan) + eventMapping = dedicatedColumnsToColumnMapping(meta.DedicatedColumns, backend.DedicatedColumnScopeEvent) ) next = func(context.Context) error { id, tr, err := i.Next(ctx) @@ -79,7 +80,7 @@ func CreateBlock(ctx context.Context, cfg *common.BlockConfig, meta *backend.Blo // Copy ID to allow it to escape the iterator. id = append([]byte(nil), id...) - buffer, connected = traceToParquetWithMapping(id, tr, buffer, resMapping, spanMapping) + buffer, connected = traceToParquetWithMapping(id, tr, buffer, resMapping, spanMapping, eventMapping) if !connected { dataquality.WarnDisconnectedTrace(meta.TenantID, dataquality.PhaseTraceWalToComplete) } @@ -144,16 +145,18 @@ func newStreamingBlock(ctx context.Context, cfg *common.BlockConfig, meta *backe newMeta.StartTime = meta.StartTime newMeta.EndTime = meta.EndTime newMeta.ReplicationFactor = meta.ReplicationFactor - // remove unsupported dedicated columns newMeta.DedicatedColumns = filterDedicatedColumns(meta.DedicatedColumns) // TotalObjects is used here an an estimated count for the bloom filter. // The real number of objects is tracked below. bloom := common.NewBloom(cfg.BloomFP, uint(cfg.BloomShardSizeBytes), uint(meta.TotalObjects)) - w := &backendWriter{ctx, to, DataFileName, (uuid.UUID)(meta.BlockID), meta.TenantID, nil} - bw := createBufferedWriter(w) - pw := parquet.NewGenericWriter[*Trace](bw) + var ( + w = &backendWriter{ctx, to, DataFileName, (uuid.UUID)(meta.BlockID), meta.TenantID, nil} + bw = createBufferedWriter(w) + _, writerOptions, _ = SchemaWithDynamicChanges(meta.DedicatedColumns) + pw = parquet.NewGenericWriter[*Trace](bw, writerOptions...) 
+ ) return &streamingBlock{ ctx: ctx, @@ -292,6 +295,7 @@ func estimateMarshalledSizeFromTrace(tr *Trace) (size int) { for _, rs := range tr.ResourceSpans { size += estimateAttrSize(rs.Resource.Attrs) size += 21 // 21 resource lvl fields including dedicated attributes + size += 10 // 10 dedicated columns for _, ils := range rs.ScopeSpans { size += 2 // 2 scope span lvl fields @@ -300,6 +304,7 @@ func estimateMarshalledSizeFromTrace(tr *Trace) (size int) { for _, s := range ils.Spans { size += 35 // 35 span lvl fields including dedicated attributes + size += 10 // 10 dedicated columns size += len(s.SpanID) + len(s.ParentSpanID) size += estimateAttrSize(s.Attrs) size += estimateEventsSize(s.Events) @@ -326,7 +331,8 @@ func estimateAttrSize(attrs []Attribute) (size int) { func estimateEventsSize(events []Event) (size int) { for _, e := range events { - size += 4 // 4 event lvl fields + size += 4 // 4 event lvl fields + size += 10 // 10 dedicated columns size += estimateAttrSize(e.Attrs) } return diff --git a/tempodb/encoding/vparquet5/create_test.go b/tempodb/encoding/vparquet5/create_test.go index cb1f422a0ce..004009544f5 100644 --- a/tempodb/encoding/vparquet5/create_test.go +++ b/tempodb/encoding/vparquet5/create_test.go @@ -48,48 +48,6 @@ func TestCreateBlockHonorsTraceStartEndTimesFromWalMeta(t *testing.T) { require.Equal(t, 305, int(outMeta.EndTime.Unix())) } -// func TestEstimateTraceSize(t *testing.T) { -// f := "" -// file, err := os.OpenFile(f, os.O_RDONLY, 0644) -// require.NoError(t, err) - -// count := 10000 - -// totalProtoSz := 0 -// totalParqSz := 0 - -// r := parquet.NewGenericReader[*Trace](file) -// tr := make([]*Trace, 1) -// for { -// count-- -// if count == 0 { -// break -// } - -// _, err := r.Read(tr) -// require.NoError(t, err) - -// if tr[0] == nil { -// break -// } -// protoTr, err := parquetTraceToTempopbTrace(tr[0]) -// require.NoError(t, err) - -// protoSz := protoTr.Size() -// parqSz := estimateTraceSize(tr[0]) - -// totalProtoSz += 
protoSz -// totalParqSz += parqSz - -// if float64(parqSz)/float64(protoSz) < .7 || -// float64(parqSz)/float64(protoSz) > 1.3 { -// fmt.Println(protoTr) -// break -// } -// } -// fmt.Println(totalParqSz, totalProtoSz) -// } - type testIterator struct { traces []*tempopb.Trace } diff --git a/tempodb/encoding/vparquet5/dedicated_columns.go b/tempodb/encoding/vparquet5/dedicated_columns.go index 0e566bdaee9..0e5dc288769 100644 --- a/tempodb/encoding/vparquet5/dedicated_columns.go +++ b/tempodb/encoding/vparquet5/dedicated_columns.go @@ -51,6 +51,27 @@ var DedicatedResourceColumnPaths = map[backend.DedicatedColumnScope]map[backend. "rs.list.element.ss.list.element.Spans.list.element.DedicatedAttributes.Int05", }, }, + backend.DedicatedColumnScopeEvent: { + backend.DedicatedColumnTypeString: { + "rs.list.element.ss.list.element.Spans.list.element.Events.list.element.DedicatedAttributes.String01", + "rs.list.element.ss.list.element.Spans.list.element.Events.list.element.DedicatedAttributes.String02", + "rs.list.element.ss.list.element.Spans.list.element.Events.list.element.DedicatedAttributes.String03", + "rs.list.element.ss.list.element.Spans.list.element.Events.list.element.DedicatedAttributes.String04", + "rs.list.element.ss.list.element.Spans.list.element.Events.list.element.DedicatedAttributes.String05", + "rs.list.element.ss.list.element.Spans.list.element.Events.list.element.DedicatedAttributes.String06", + "rs.list.element.ss.list.element.Spans.list.element.Events.list.element.DedicatedAttributes.String07", + "rs.list.element.ss.list.element.Spans.list.element.Events.list.element.DedicatedAttributes.String08", + "rs.list.element.ss.list.element.Spans.list.element.Events.list.element.DedicatedAttributes.String09", + "rs.list.element.ss.list.element.Spans.list.element.Events.list.element.DedicatedAttributes.String10", + }, + backend.DedicatedColumnTypeInt: { + "rs.list.element.ss.list.element.Spans.list.element.Events.list.element.DedicatedAttributes.Int01", + 
"rs.list.element.ss.list.element.Spans.list.element.Events.list.element.DedicatedAttributes.Int02", + "rs.list.element.ss.list.element.Spans.list.element.Events.list.element.DedicatedAttributes.Int03", + "rs.list.element.ss.list.element.Spans.list.element.Events.list.element.DedicatedAttributes.Int04", + "rs.list.element.ss.list.element.Spans.list.element.Events.list.element.DedicatedAttributes.Int05", + }, + }, } type dedicatedColumn struct { @@ -58,6 +79,7 @@ type dedicatedColumn struct { ColumnPath string ColumnIndex int IsArray bool + IsBlob bool } func (dc *dedicatedColumn) readValue(attrs *DedicatedAttributes) *v1.AnyValue { @@ -173,6 +195,15 @@ func (dm *dedicatedColumnMapping) get(attr string) (dedicatedColumn, bool) { return col, ok } +func (dm *dedicatedColumnMapping) usesPath(path string) bool { + for _, col := range dm.mapping { + if col.ColumnPath == path { + return true + } + } + return false +} + func (dm *dedicatedColumnMapping) items() iter.Seq2[string, dedicatedColumn] { return func(yield func(string, dedicatedColumn) bool) { for _, k := range dm.keys { @@ -215,20 +246,22 @@ func dedicatedColumnsToColumnMapping(dedicatedColumns backend.DedicatedColumns, continue // skip if there are not enough spare columns } - var isArray bool + dc := dedicatedColumn{ + Type: c.Type, + ColumnPath: spareColumnPaths[i], + ColumnIndex: i, + } + for _, opt := range c.Options { - if opt == backend.DedicatedColumnOptionArray { - isArray = true - break + switch opt { + case backend.DedicatedColumnOptionArray: + dc.IsArray = true + case backend.DedicatedColumnOptionBlob: + dc.IsBlob = true } } - mapping.put(c.Name, dedicatedColumn{ - Type: c.Type, - ColumnPath: spareColumnPaths[i], - ColumnIndex: i, - IsArray: isArray, - }) + mapping.put(c.Name, dc) indexByType[c.Type]++ } } diff --git a/tempodb/encoding/vparquet5/dedicated_columns_test.go b/tempodb/encoding/vparquet5/dedicated_columns_test.go index 753fc04c9d0..d18b0edbf11 100644 --- 
a/tempodb/encoding/vparquet5/dedicated_columns_test.go +++ b/tempodb/encoding/vparquet5/dedicated_columns_test.go @@ -476,6 +476,7 @@ func TestFilterDedicatedColumns(t *testing.T) { {Scope: "span", Name: "span-two", Type: "string"}, {Scope: "resource", Name: "res-one", Type: "string"}, {Scope: "resource", Name: "res-two", Type: "string"}, + {Scope: "event", Name: "res-three", Type: "string"}, }, }, { diff --git a/tempodb/encoding/vparquet5/encoding.go b/tempodb/encoding/vparquet5/encoding.go index 231f7c62787..61efea1579b 100644 --- a/tempodb/encoding/vparquet5/encoding.go +++ b/tempodb/encoding/vparquet5/encoding.go @@ -9,7 +9,7 @@ import ( "github.com/grafana/tempo/tempodb/encoding/common" ) -const VersionString = "vParquet5-preview5" +const VersionString = "vParquet5-preview6" type Encoding struct{} diff --git a/tempodb/encoding/vparquet5/schema.go b/tempodb/encoding/vparquet5/schema.go index 0b1e683989c..77ce61f68a7 100644 --- a/tempodb/encoding/vparquet5/schema.go +++ b/tempodb/encoding/vparquet5/schema.go @@ -2,6 +2,7 @@ package vparquet5 import ( "bytes" + "strings" "github.com/golang/protobuf/jsonpb" //nolint:all //deprecated "github.com/parquet-go/parquet-go" @@ -72,6 +73,13 @@ const ( FieldSpanAttrValInt = "rs.list.element.ss.list.element.Spans.list.element.Attrs.list.element.ValueInt.list.element" FieldSpanAttrValDouble = "rs.list.element.ss.list.element.Spans.list.element.Attrs.list.element.ValueDouble.list.element" FieldSpanAttrValBool = "rs.list.element.ss.list.element.Spans.list.element.Attrs.list.element.ValueBool.list.element" + + FieldEventAttrKey = "rs.list.element.ss.list.element.Spans.list.element.Events.list.element.Attrs.list.element.Key" + FieldEventAttrIsArray = "rs.list.element.ss.list.element.Spans.list.element.Events.list.element.Attrs.list.element.IsArray" + FieldEventAttrVal = "rs.list.element.ss.list.element.Spans.list.element.Events.list.element.Attrs.list.element.Value.list.element" + 
FieldEventAttrValInt = "rs.list.element.ss.list.element.Spans.list.element.Events.list.element.Attrs.list.element.ValueInt.list.element" + FieldEventAttrValDouble = "rs.list.element.ss.list.element.Spans.list.element.Events.list.element.Attrs.list.element.ValueDouble.list.element" + FieldEventAttrValBool = "rs.list.element.ss.list.element.Spans.list.element.Events.list.element.Attrs.list.element.ValueBool.list.element" ) const ( @@ -100,8 +108,6 @@ var ( traceqlResourceLabelMappings = map[string]string{ LabelServiceName: "rs.list.element.Resource.ServiceName", } - - parquetSchema = parquet.SchemaOf(&Trace{}) ) type Attribute struct { @@ -157,6 +163,8 @@ type Event struct { Name string `parquet:",snappy,dict"` Attrs []Attribute `parquet:",list"` DroppedAttributesCount int32 `parquet:",snappy,delta"` + + DedicatedAttributes DedicatedAttributes `parquet:""` } type Link struct { @@ -350,11 +358,12 @@ func traceToParquet(meta *backend.BlockMeta, id common.ID, tr *tempopb.Trace, ot // Dedicated attribute column assignments dedicatedResourceAttributes := dedicatedColumnsToColumnMapping(meta.DedicatedColumns, backend.DedicatedColumnScopeResource) dedicatedSpanAttributes := dedicatedColumnsToColumnMapping(meta.DedicatedColumns, backend.DedicatedColumnScopeSpan) + dedicatedEventAttributes := dedicatedColumnsToColumnMapping(meta.DedicatedColumns, backend.DedicatedColumnScopeEvent) - return traceToParquetWithMapping(id, tr, ot, dedicatedResourceAttributes, dedicatedSpanAttributes) + return traceToParquetWithMapping(id, tr, ot, dedicatedResourceAttributes, dedicatedSpanAttributes, dedicatedEventAttributes) } -func traceToParquetWithMapping(id common.ID, tr *tempopb.Trace, ot *Trace, dedicatedResourceAttributes, dedicatedSpanAttributes dedicatedColumnMapping) (*Trace, bool) { +func traceToParquetWithMapping(id common.ID, tr *tempopb.Trace, ot *Trace, dedicatedResourceAttributes, dedicatedSpanAttributes, dedicatedEventAttributes dedicatedColumnMapping) (*Trace, bool) { if ot == 
nil { ot = &Trace{} } @@ -442,7 +451,7 @@ func traceToParquetWithMapping(id common.ID, tr *tempopb.Trace, ot *Trace, dedic ss.Events = extendReuseSlice(len(s.Events), ss.Events) for ie, e := range s.Events { - eventToParquet(e, &ss.Events[ie], s.StartTimeUnixNano) + eventToParquet(e, &ss.Events[ie], s.StartTimeUnixNano, dedicatedEventAttributes) } // nested set values do not come from the proto, they are calculated @@ -486,24 +495,7 @@ func traceToParquetWithMapping(id common.ID, tr *tempopb.Trace, ot *Trace, dedic } ss.DroppedLinksCount = int32(s.DroppedLinksCount) - - ss.Attrs = extendReuseSlice(len(s.Attributes), ss.Attrs) - attrCount := 0 - for _, a := range s.Attributes { - written := false - - // Dynamically assigned dedicated span attribute columns - if spareColumn, exists := dedicatedSpanAttributes.get(a.Key); exists { - written = spareColumn.writeValue(&ss.DedicatedAttributes, a.Value) - } - - if !written { - // Other attributes put in generic columns - attrToParquet(a, &ss.Attrs[attrCount]) - attrCount++ - } - } - ss.Attrs = ss.Attrs[:attrCount] + writeAttrs(s.Attributes, &ss.Attrs, &ss.DedicatedAttributes, dedicatedSpanAttributes) } } } @@ -528,6 +520,25 @@ func traceToParquetWithMapping(id common.ID, tr *tempopb.Trace, ot *Trace, dedic return finalizeTrace(ot) } +func writeAttrs(input []*v1.KeyValue, generic *[]Attribute, dedicated *DedicatedAttributes, mapping dedicatedColumnMapping) { + *generic = extendReuseSlice(len(input), *generic) + + attrCount := 0 + for _, a := range input { + written := false + + if spareColumn, exists := mapping.get(a.Key); exists { + written = spareColumn.writeValue(dedicated, a.Value) + } + + if !written { + attrToParquet(a, &(*generic)[attrCount]) + attrCount++ + } + } + *generic = (*generic)[:attrCount] +} + // finalizeTrace augments and optimized the trace by calculating service stats, nested set model bounds // and removing redundant scope spans and resource spans. 
The function returns the modified trace as well // as a boolean indicating whether the trace is a connected graph. @@ -555,15 +566,11 @@ func instrumentationScopeToParquet(s *v1.InstrumentationScope, ss *Instrumentati } } -func eventToParquet(e *v1_trace.Span_Event, ee *Event, spanStartTime uint64) { +func eventToParquet(e *v1_trace.Span_Event, ee *Event, spanStartTime uint64, dedicatedEventAttributes dedicatedColumnMapping) { ee.Name = e.Name ee.TimeSinceStartNano = e.TimeUnixNano - spanStartTime ee.DroppedAttributesCount = int32(e.DroppedAttributesCount) - - ee.Attrs = extendReuseSlice(len(e.Attributes), ee.Attrs) - for i, a := range e.Attributes { - attrToParquet(a, &ee.Attrs[i]) - } + writeAttrs(e.Attributes, &ee.Attrs, &ee.DedicatedAttributes, dedicatedEventAttributes) } func linkToParquet(l *v1_trace.Span_Link, ll *Link) { @@ -836,3 +843,79 @@ func extendReuseSlice[T any](sz int, in []T) []T { in = in[:cap(in)] return append(in, make([]T, sz-len(in))...) } + +func SchemaWithDynamicChanges(dedicatedColumns backend.DedicatedColumns) (*parquet.Schema, []parquet.WriterOption, []parquet.ReaderOption) { + var ( + resMapping = dedicatedColumnsToColumnMapping(dedicatedColumns, backend.DedicatedColumnScopeResource) + spanMapping = dedicatedColumnsToColumnMapping(dedicatedColumns, backend.DedicatedColumnScopeSpan) + eventMapping = dedicatedColumnsToColumnMapping(dedicatedColumns, backend.DedicatedColumnScopeEvent) + ) + + schemaOptions := []parquet.SchemaOption{} + writerOptions := []parquet.WriterOption{} + readerOptions := []parquet.ReaderOption{} + + // Blobify + blobify := func(col dedicatedColumn) { + path := strings.Split(col.ColumnPath, ".") + + // Remove dictionary encoding and change compression. + option := parquet.StructTag(`parquet:",zstd,optional"`, path...) 
+ schemaOptions = append(schemaOptions, option) + readerOptions = append(readerOptions, option) + writerOptions = append(writerOptions, option) + + // Minor optimization: skip page bounds for blob columns. The min/max values are not + // selective, and this saves a little bit of storage and overhead. + writerOptions = append(writerOptions, parquet.SkipPageBounds(path...)) + } + + for _, col := range spanMapping.mapping { + if col.IsBlob { + blobify(col) + } + } + for _, col := range resMapping.mapping { + if col.IsBlob { + blobify(col) + } + } + for _, col := range eventMapping.mapping { + if col.IsBlob { + blobify(col) + } + } + + // Remove unused dedicated columns. + del := func(path string) { + option := parquet.StructTag(`parquet:"-"`, strings.Split(path, ".")...) + schemaOptions = append(schemaOptions, option) + readerOptions = append(readerOptions, option) + writerOptions = append(writerOptions, option) + } + + for scope, m1 := range DedicatedResourceColumnPaths { + for _, paths := range m1 { + for _, path := range paths { + switch scope { + case backend.DedicatedColumnScopeResource: + if !resMapping.usesPath(path) { + del(path) + } + case backend.DedicatedColumnScopeSpan: + if !spanMapping.usesPath(path) { + del(path) + } + case backend.DedicatedColumnScopeEvent: + if !eventMapping.usesPath(path) { + del(path) + } + } + } + } + } + + schema := parquet.SchemaOf(&Trace{}, schemaOptions...) 
+ + return schema, writerOptions, readerOptions +} diff --git a/tempodb/encoding/vparquet5/schema_test.go b/tempodb/encoding/vparquet5/schema_test.go index 28745d84f2c..ddb0c1e1558 100644 --- a/tempodb/encoding/vparquet5/schema_test.go +++ b/tempodb/encoding/vparquet5/schema_test.go @@ -802,7 +802,7 @@ func BenchmarkEventToParquet(b *testing.B) { ee := &Event{} for i := 0; i < b.N; i++ { - eventToParquet(e, ee, s.StartTimeUnixNano) + eventToParquet(e, ee, s.StartTimeUnixNano, dedicatedColumnMapping{}) } } diff --git a/tempodb/encoding/vparquet5/wal_block.go b/tempodb/encoding/vparquet5/wal_block.go index f7c74e72e04..ebd0eef987b 100644 --- a/tempodb/encoding/vparquet5/wal_block.go +++ b/tempodb/encoding/vparquet5/wal_block.go @@ -85,6 +85,7 @@ func openWALBlock(filename, path string, ingestionSlack, _ time.Duration) (commo ingestionSlack: ingestionSlack, dedcolsRes: dedicatedColumnsToColumnMapping(meta.DedicatedColumns, backend.DedicatedColumnScopeResource), dedcolsSpan: dedicatedColumnsToColumnMapping(meta.DedicatedColumns, backend.DedicatedColumnScopeSpan), + dedcolsEvent: dedicatedColumnsToColumnMapping(meta.DedicatedColumns, backend.DedicatedColumnScopeEvent), } // read all files in dir @@ -110,7 +111,7 @@ func openWALBlock(filename, path string, ingestionSlack, _ time.Duration) (commo } path := filepath.Join(dir, f.Name()) - page := newWalBlockFlush(path, nil) + page := newWalBlockFlush(path, nil, meta.DedicatedColumns) file, err := page.file(context.Background()) if err != nil { @@ -171,6 +172,7 @@ func createWALBlock(meta *backend.BlockMeta, filepath, dataEncoding string, inge ingestionSlack: ingestionSlack, dedcolsRes: dedicatedColumnsToColumnMapping(meta.DedicatedColumns, backend.DedicatedColumnScopeResource), dedcolsSpan: dedicatedColumnsToColumnMapping(meta.DedicatedColumns, backend.DedicatedColumnScopeSpan), + dedcolsEvent: dedicatedColumnsToColumnMapping(meta.DedicatedColumns, backend.DedicatedColumnScopeEvent), } // build folder @@ -205,14 +207,16 @@ 
func ownsWALBlock(entry fs.DirEntry) bool { } type walBlockFlush struct { - path string - ids *common.IDMap[int64] + path string + ids *common.IDMap[int64] + dedcols backend.DedicatedColumns } -func newWalBlockFlush(path string, ids *common.IDMap[int64]) *walBlockFlush { +func newWalBlockFlush(path string, ids *common.IDMap[int64], dedcols backend.DedicatedColumns) *walBlockFlush { return &walBlockFlush{ - path: path, - ids: ids, + path: path, + ids: ids, + dedcols: dedcols, } } @@ -234,11 +238,13 @@ func (w *walBlockFlush) file(ctx context.Context) (*pageFile, error) { } size := info.Size() + sch, _, _ := SchemaWithDynamicChanges(w.dedcols) + wr := newWalReaderAt(ctx, file) o := []parquet.FileOption{ parquet.SkipBloomFilters(true), parquet.SkipPageIndex(true), - parquet.FileSchema(parquetSchema), + parquet.FileSchema(sch), } pf, err := parquet.OpenFile(wr, size, o...) @@ -296,6 +302,7 @@ type walBlock struct { ingestionSlack time.Duration dedcolsRes dedicatedColumnMapping dedcolsSpan dedicatedColumnMapping + dedcolsEvent dedicatedColumnMapping // Unflushed data buffer *Trace @@ -347,7 +354,7 @@ func (b *walBlock) IngestionSlack() time.Duration { func (b *walBlock) AppendTrace(id common.ID, trace *tempopb.Trace, start, end uint32, adjustIngestionSlack bool) error { var connected bool - b.buffer, connected = traceToParquetWithMapping(id, trace, b.buffer, b.dedcolsRes, b.dedcolsSpan) + b.buffer, connected = traceToParquetWithMapping(id, trace, b.buffer, b.dedcolsRes, b.dedcolsSpan, b.dedcolsEvent) if !connected { dataquality.WarnDisconnectedTrace(b.meta.TenantID, dataquality.PhaseTraceFlushedToWal) } @@ -394,12 +401,13 @@ func (b *walBlock) openWriter() (err error) { } if b.writer == nil { - b.writer = parquet.NewGenericWriter[*Trace](b.file, &parquet.WriterConfig{ - Schema: parquetSchema, - // setting this value low massively reduces the amount of static memory we hold onto in highly multi-tenant environments at the cost of - // cutting pages more aggressively when 
writing column chunks - PageBufferSize: 1024, - }) + _, writerOptions, _ := SchemaWithDynamicChanges(b.meta.DedicatedColumns) + + // setting this value low massively reduces the amount of static memory we hold onto in highly multi-tenant environments at the cost of + // cutting pages more aggressively when writing column chunks + writerOptions = append(writerOptions, parquet.PageBufferSize(1024)) + + b.writer = parquet.NewGenericWriter[*Trace](b.file, writerOptions...) } else { b.writer.Reset(b.file) } @@ -444,7 +452,7 @@ func (b *walBlock) Flush() (err error) { return fmt.Errorf("error closing file: %w", err) } - b.writeFlush(newWalBlockFlush(b.file.Name(), b.ids)) + b.writeFlush(newWalBlockFlush(b.file.Name(), b.ids, b.meta.DedicatedColumns)) b.flushedSize += sz b.unflushedSize = 0 b.ids = common.NewIDMap[int64](b.ids.Len()) // Recreate new id map with same expected size @@ -470,7 +478,8 @@ func (b *walBlock) Iterator() (common.Iterator, error) { bookmarks = append(bookmarks, newBookmark[parquet.Row](iter)) } - sch := parquet.SchemaOf(new(Trace)) + sch, _, _ := SchemaWithDynamicChanges(b.meta.DedicatedColumns) + iter := newMultiblockIterator(bookmarks, func(rows []parquet.Row) (parquet.Row, error) { if len(rows) == 1 { return rows[0], nil @@ -530,7 +539,9 @@ func (b *walBlock) FindTraceByID(ctx context.Context, id common.ID, opts common. defer file.Close() pf := file.parquetFile - r := parquet.NewReader(pf) + _, _, readerOptions := SchemaWithDynamicChanges(page.dedcols) + + r := parquet.NewGenericReader[*Trace](pf, readerOptions...) defer r.Close() err = r.SeekToRow(rowNumber) @@ -539,8 +550,8 @@ func (b *walBlock) FindTraceByID(ctx context.Context, id common.ID, opts common. 
} tr := new(Trace) - err = r.Read(tr) - if err != nil { + n, err := r.Read([]*Trace{tr}) + if n == 0 || (err != nil && !errors.Is(err, io.EOF)) { return nil, fmt.Errorf("error reading row from backend: %w", err) } diff --git a/tempodb/tempodb_search_test.go b/tempodb/tempodb_search_test.go index 3edfad3bfc7..ed42dd126d7 100644 --- a/tempodb/tempodb_search_test.go +++ b/tempodb/tempodb_search_test.go @@ -1639,13 +1639,15 @@ func tagNamesRunner(t *testing.T, _ *tempopb.Trace, _ *tempopb.TraceSearchMetada if (bm.Version == vparquet4.VersionString || bm.Version == vparquet5.VersionString) && (tc.name == "resource match" || tc.name == "span match") { // v4 has scope, events, and links tc.expected["instrumentation"] = []string{"scope-attr-str"} - tc.expected["event"] = []string{"exception.message"} + tc.expected["event"] = []string{"event-dedicated.01", "exception.message"} tc.expected["link"] = []string{"relation"} } if bm.Version == vparquet5.VersionString && tc.name == "no matches" { // vp5 does not have well-known attribute columns + // Include all expected dedicated attribute names tc.expected["resource"] = []string{"res-dedicated.01", "res-dedicated.02", "service.name"} tc.expected["span"] = []string{"span-dedicated.01", "span-dedicated.02"} + tc.expected["event"] = []string{"event-dedicated.01"} } require.Equal(t, len(tc.expected), len(actualMap)) @@ -1912,6 +1914,7 @@ func runCompleteBlockSearchTest(t *testing.T, blockVersion string, runners ...ru {Scope: "resource", Name: "res-dedicated.02", Type: "string"}, {Scope: "span", Name: "span-dedicated.01", Type: "string"}, {Scope: "span", Name: "span-dedicated.02", Type: "string"}, + {Scope: "event", Name: "event-dedicated.01", Type: "string"}, } r, w, c, err := New(testingConfig(tempDir, blockVersion, dc), nil, log.NewNopLogger()) require.NoError(t, err) @@ -1975,7 +1978,15 @@ func runEventLinkInstrumentationSearchTest(t *testing.T, blockVersion string) { tempDir := t.TempDir() - r, w, c, err := 
New(testingConfig(tempDir, blockVersion, nil), nil, log.NewNopLogger()) + dc := backend.DedicatedColumns{ + {Scope: "resource", Name: "res-dedicated.01", Type: "string"}, + {Scope: "resource", Name: "res-dedicated.02", Type: "string"}, + {Scope: "span", Name: "span-dedicated.01", Type: "string"}, + {Scope: "span", Name: "span-dedicated.02", Type: "string"}, + {Scope: "event", Name: "event-dedicated.01", Type: "string"}, + } + + r, w, c, err := New(testingConfig(tempDir, blockVersion, dc), nil, log.NewNopLogger()) require.NoError(t, err) err = c.EnableCompaction(context.Background(), testingCompactorConfig, &mockSharder{}, &mockOverrides{}) @@ -1990,8 +2001,13 @@ func runEventLinkInstrumentationSearchTest(t *testing.T, blockVersion string) { searchesThatMatch := []*tempopb.SearchRequest{ { + // Event generic attribute Query: "{ event.exception.message = `random error` }", }, + { + // Dedicated column + Query: "{ event.event-dedicated.01 = `event-1a` }", + }, { Query: "{ event:name = `event name` }", }, @@ -2018,7 +2034,7 @@ func runEventLinkInstrumentationSearchTest(t *testing.T, blockVersion string) { // Write to wal wal := w.WAL() - meta := &backend.BlockMeta{BlockID: backend.NewUUID(), TenantID: testTenantID} + meta := &backend.BlockMeta{BlockID: backend.NewUUID(), TenantID: testTenantID, DedicatedColumns: dc} head, err := wal.NewBlock(meta, model.CurrentEncoding) require.NoError(t, err) dec := model.MustNewSegmentDecoder(model.CurrentEncoding) @@ -2223,6 +2239,7 @@ func makeExpectedTrace(traceID []byte) ( Name: "event name", Attributes: []*v1_common.KeyValue{ stringKV("exception.message", "random error"), + stringKV("event-dedicated.01", "event-1a"), }, }, }, @@ -2381,7 +2398,7 @@ func searchTestSuite() ( makeReq("http.url", "url/Hello/World"), makeReq("status.code", "error"), - // Dedicated span and resource attributes + // Dedicated attributes makeReq("res-dedicated.01", "res-1a"), makeReq("res-dedicated.02", "res-2b"), makeReq("span-dedicated.01", "span-1a"),