diff --git a/CHANGELOG.md b/CHANGELOG.md index d9c7ab76060..4e334f2b1ff 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -35,6 +35,7 @@ * [ENHANCEMENT] Implement trace comparison in Vulture [#904](https://github.com/grafana/tempo/pull/904) (@zalegrala) * [ENHANCEMENT] Dedupe search records while replaying WAL [#940](https://github.com/grafana/tempo/pull/940) (@annanay25) * [ENHANCEMENT] Add status endpoint to list the available endpoints [#938](https://github.com/grafana/tempo/pull/938) (@zalegrala) +* [ENHANCEMENT] Add search block headers [#943](https://github.com/grafana/tempo/pull/943) (@mdisibio) * [CHANGE] Renamed CLI flag from `--storage.trace.maintenance-cycle` to `--storage.trace.blocklist_poll`. This is a **breaking change** [#897](https://github.com/grafana/tempo/pull/897) (@mritunjaysharma394) * [CHANGE] update jsonnet alerts and recording rules to use `job_selectors` and `cluster_selectors` for configurable unique identifier labels [#935](https://github.com/grafana/tempo/pull/935) (@kevinschoonover) * [CHANGE] Modify generated tag keys in Vulture for easier filtering [#934](https://github.com/grafana/tempo/pull/934) (@zalegrala) diff --git a/modules/ingester/instance_search.go b/modules/ingester/instance_search.go index 0159b33e4fe..6274efe8d99 100644 --- a/modules/ingester/instance_search.go +++ b/modules/ingester/instance_search.go @@ -61,6 +61,7 @@ func (i *instance) Search(ctx context.Context, req *tempopb.SearchRequest) (*tem InspectedTraces: sr.TracesInspected(), InspectedBytes: sr.BytesInspected(), InspectedBlocks: sr.BlocksInspected(), + SkippedBlocks: sr.BlocksSkipped(), }, }, nil } diff --git a/modules/ingester/instance_search_test.go b/modules/ingester/instance_search_test.go index f70a4483a93..eae5950828a 100644 --- a/modules/ingester/instance_search_test.go +++ b/modules/ingester/instance_search_test.go @@ -17,6 +17,7 @@ import ( "github.com/grafana/tempo/pkg/tempopb" "github.com/grafana/tempo/pkg/util" "github.com/grafana/tempo/pkg/util/test" + "github.com/grafana/tempo/tempodb/search" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -357,7 +358,8 @@ func TestInstanceSearchMetrics(t *testing.T) { search := func() *tempopb.SearchMetrics { sr, err := i.Search(context.Background(), &tempopb.SearchRequest{ - Tags: map[string]string{"nomatch": "nomatch"}, + // Exhaustive search + Tags: map[string]string{search.SecretExhaustiveSearchTag: "!"}, }) require.NoError(t, err) return sr.Metrics diff --git a/modules/querier/querier.go b/modules/querier/querier.go index 964a5c0e7bb..7332ef44d06 100644 --- a/modules/querier/querier.go +++ b/modules/querier/querier.go @@ -375,6 +375,7 @@ func (q *Querier) postProcessSearchResults(req *tempopb.SearchRequest, rr []resp response.Metrics.InspectedBytes += sr.Metrics.InspectedBytes response.Metrics.InspectedTraces += sr.Metrics.InspectedTraces response.Metrics.InspectedBlocks += sr.Metrics.InspectedBlocks + response.Metrics.SkippedBlocks += sr.Metrics.SkippedBlocks } } diff --git a/pkg/tempofb/SearchBlockHeader.go b/pkg/tempofb/SearchBlockHeader.go new file mode 100644 index 00000000000..70a0b345a2f --- /dev/null +++ b/pkg/tempofb/SearchBlockHeader.go @@ -0,0 +1,97 @@ +// Code generated by the FlatBuffers compiler. DO NOT EDIT. + +package tempofb + +import ( + flatbuffers "github.com/google/flatbuffers/go" +) + +type SearchBlockHeader struct { + _tab flatbuffers.Table +} + +func GetRootAsSearchBlockHeader(buf []byte, offset flatbuffers.UOffsetT) *SearchBlockHeader { + n := flatbuffers.GetUOffsetT(buf[offset:]) + x := &SearchBlockHeader{} + x.Init(buf, n+offset) + return x +} + +func GetSizePrefixedRootAsSearchBlockHeader(buf []byte, offset flatbuffers.UOffsetT) *SearchBlockHeader { + n := flatbuffers.GetUOffsetT(buf[offset+flatbuffers.SizeUint32:]) + x := &SearchBlockHeader{} + x.Init(buf, n+offset+flatbuffers.SizeUint32) + return x +} + +func (rcv *SearchBlockHeader) Init(buf []byte, i flatbuffers.UOffsetT) { + rcv._tab.Bytes = buf + rcv._tab.Pos = i +} + +func (rcv *SearchBlockHeader) Table() flatbuffers.Table { + return rcv._tab +} + +func (rcv *SearchBlockHeader) Tags(obj *KeyValues, j int) bool { + o := flatbuffers.UOffsetT(rcv._tab.Offset(4)) + if o != 0 { + x := rcv._tab.Vector(o) + x += flatbuffers.UOffsetT(j) * 4 + x = rcv._tab.Indirect(x) + obj.Init(rcv._tab.Bytes, x) + return true + } + return false +} + +func (rcv *SearchBlockHeader) TagsLength() int { + o := flatbuffers.UOffsetT(rcv._tab.Offset(4)) + if o != 0 { + return rcv._tab.VectorLen(o) + } + return 0 +} + +func (rcv *SearchBlockHeader) MinDurationNanos() uint64 { + o := flatbuffers.UOffsetT(rcv._tab.Offset(6)) + if o != 0 { + return rcv._tab.GetUint64(o + rcv._tab.Pos) + } + return 0 +} + +func (rcv *SearchBlockHeader) MutateMinDurationNanos(n uint64) bool { + return rcv._tab.MutateUint64Slot(6, n) +} + +func (rcv *SearchBlockHeader) MaxDurationNanos() uint64 { + o := flatbuffers.UOffsetT(rcv._tab.Offset(8)) + if o != 0 { + return rcv._tab.GetUint64(o + rcv._tab.Pos) + } + return 0 +} + +func (rcv *SearchBlockHeader) MutateMaxDurationNanos(n uint64) bool { + return rcv._tab.MutateUint64Slot(8, n) +} + +func SearchBlockHeaderStart(builder *flatbuffers.Builder) { + builder.StartObject(3) +} +func SearchBlockHeaderAddTags(builder *flatbuffers.Builder, tags flatbuffers.UOffsetT) { + builder.PrependUOffsetTSlot(0, flatbuffers.UOffsetT(tags), 0) +} +func SearchBlockHeaderStartTagsVector(builder *flatbuffers.Builder, numElems int) flatbuffers.UOffsetT { + return builder.StartVector(4, numElems, 4) +} +func SearchBlockHeaderAddMinDurationNanos(builder *flatbuffers.Builder, minDurationNanos uint64) { + builder.PrependUint64Slot(1, minDurationNanos, 0) +} +func SearchBlockHeaderAddMaxDurationNanos(builder *flatbuffers.Builder, maxDurationNanos uint64) { + builder.PrependUint64Slot(2, maxDurationNanos, 0) +} +func SearchBlockHeaderEnd(builder *flatbuffers.Builder) flatbuffers.UOffsetT { + return builder.EndObject() +} diff --git a/pkg/tempofb/SearchBlockHeader_util.go b/pkg/tempofb/SearchBlockHeader_util.go new file mode 100644 index 00000000000..82cf22305e3 --- /dev/null +++ b/pkg/tempofb/SearchBlockHeader_util.go @@ -0,0 +1,56 @@ +package tempofb + +import flatbuffers "github.com/google/flatbuffers/go" + +type SearchBlockHeaderBuilder struct { + Tags SearchDataMap + MinDur uint64 + MaxDur uint64 +} + +func NewSearchBlockHeaderBuilder() *SearchBlockHeaderBuilder { + return &SearchBlockHeaderBuilder{ + Tags: SearchDataMap{}, + } +} + +func (s *SearchBlockHeaderBuilder) AddEntry(e *SearchEntry) { + + kv := &KeyValues{} //buffer + + // Record all unique keyvalues + for i, ii := 0, e.TagsLength(); i < ii; i++ { + e.Tags(kv, i) + for j, jj := 0, kv.ValueLength(); j < jj; j++ { + s.AddTag(string(kv.Key()), string(kv.Value(j))) + } + } + + // Record min/max durations + dur := e.EndTimeUnixNano() - e.StartTimeUnixNano() + if s.MinDur == 0 || dur < s.MinDur { + s.MinDur = dur + } + if dur > s.MaxDur { + s.MaxDur = dur + } +} + +// AddTag adds the unique tag name and value to the search data. No effect if the pair is already present. +func (s *SearchBlockHeaderBuilder) AddTag(k string, v string) { + s.Tags.Add(k, v) +} + +func (s *SearchBlockHeaderBuilder) ToBytes() []byte { + b := flatbuffers.NewBuilder(1024) + + tags := s.Tags.WriteToBuilder(b) + + SearchBlockHeaderStart(b) + SearchBlockHeaderAddMinDurationNanos(b, s.MinDur) + SearchBlockHeaderAddMaxDurationNanos(b, s.MaxDur) + SearchBlockHeaderAddTags(b, tags) + offset := SearchBlockHeaderEnd(b) + b.Finish(offset) + return b.FinishedBytes() +} diff --git a/pkg/tempofb/tempo.fbs b/pkg/tempofb/tempo.fbs index 8a9b74e060f..1a0533ec422 100644 --- a/pkg/tempofb/tempo.fbs +++ b/pkg/tempofb/tempo.fbs @@ -22,4 +22,16 @@ table SearchPage { // Trace entries entries : [SearchEntry]; +} + +table SearchBlockHeader { + // This is a rollup of all distinct tags/values in the + // block for quick elimination. + tags : [KeyValues]; + + // Smallest trace duration in the block + min_duration_nanos: uint64; + + // Largest trace duration in the block + max_duration_nanos: uint64; } \ No newline at end of file diff --git a/pkg/tempopb/tempo.pb.go b/pkg/tempopb/tempo.pb.go index 5f5bea0ac0a..b941694a6e2 100644 --- a/pkg/tempopb/tempo.pb.go +++ b/pkg/tempopb/tempo.pb.go @@ -342,6 +342,7 @@ type SearchMetrics struct { InspectedTraces uint32 `protobuf:"varint,1,opt,name=inspectedTraces,proto3" json:"inspectedTraces,omitempty"` InspectedBytes uint64 `protobuf:"varint,2,opt,name=inspectedBytes,proto3" json:"inspectedBytes,omitempty"` InspectedBlocks uint32 `protobuf:"varint,3,opt,name=inspectedBlocks,proto3" json:"inspectedBlocks,omitempty"` + SkippedBlocks uint32 `protobuf:"varint,4,opt,name=skippedBlocks,proto3" json:"skippedBlocks,omitempty"` } func (m *SearchMetrics) Reset() { *m = SearchMetrics{} } @@ -398,6 +399,13 @@ func (m *SearchMetrics) GetInspectedBlocks() uint32 { return 0 } +func (m *SearchMetrics) GetSkippedBlocks() uint32 { + if m != nil { + return m.SkippedBlocks + } + return 0 +} + type SearchTagsRequest struct { } @@ -802,62 +810,63 @@ func init() { func init() { proto.RegisterFile("pkg/tempopb/tempo.proto", fileDescriptor_f22805646f4f62b6) } var fileDescriptor_f22805646f4f62b6 = []byte{ - // 880 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x84, 0x55, 0x4f, 0x8f, 0xda, 0x46, - 0x14, 0xc7, 0x0b, 0x2c, 0xeb, 0xc7, 0x9f, 0xec, 0x4e, 0x36, 0xe0, 0xba, 0x2b, 0x40, 0xd6, 0xaa, - 0xe5, 0xd0, 0x40, 0x42, 0xba, 0x4a, 0x93, 0x1e, 0x2a, 0x21, 0xd2, 0x36, 0x52, 0x89, 0x52, 0x43, - 0x73, 0x1f, 0xcc, 0x94, 0xb5, 0x16, 0x6c, 0x32, 0x1e, 0xa3, 0xe5, 0xd6, 0x53, 0x2f, 0xbd, 0xf4, - 0xab, 0xf4, 0x5b, 0xe4, 0x52, 0x29, 0xa7, 0xaa, 0xea, 0x21, 0xaa, 0x76, 0xa5, 0x7e, 0x8e, 0x6a, - 0x66, 0xec, 0xc1, 0x36, 0x24, 0x39, 0x31, 0xef, 0xf7, 0x7e, 0xef, 0xf9, 0xbd, 0xdf, 0xbc, 0x37, - 0x40, 0x63, 0x75, 0x35, 0xef, 0x31, 0xb2, 0x5c, 0xf9, 0xab, 0xa9, 0xfc, 0xed, 0xae, 0xa8, 0xcf, - 0x7c, 0x54, 0x8a, 0x40, 0xf3, 0x94, 0x51, 0xec, 0x90, 0xde, 0xfa, 0x61, 0x4f, 0x1c, 0xa4, 0xdb, - 0xbc, 0x3f, 0x77, 0xd9, 0x65, 0x38, 0xed, 0x3a, 0xfe, 0xb2, 0x37, 0xf7, 0xe7, 0x7e, 0x4f, 0xc0, - 0xd3, 0xf0, 0x67, 0x61, 0x09, 0x43, 0x9c, 0x24, 0xdd, 0xfa, 0x55, 0x83, 0xe3, 0x09, 0x0f, 0x1f, - 0x6c, 0x9e, 0x0f, 0x6d, 0xf2, 0x3a, 0x24, 0x01, 0x43, 0x06, 0x94, 0x44, 0xca, 0xe7, 0x43, 0x43, - 0x6b, 0x6b, 0x9d, 0x8a, 0x1d, 0x9b, 0xa8, 0x09, 0x30, 0x5d, 0xf8, 0xce, 0xd5, 0x98, 0x61, 0xca, - 0x8c, 0x83, 0xb6, 0xd6, 0xd1, 0xed, 0x04, 0x82, 0x4c, 0x38, 0x12, 0xd6, 0x33, 0x6f, 0x66, 0xe4, - 0x85, 0x57, 0xd9, 0xe8, 0x0c, 0xf4, 0xd7, 0x21, 0xa1, 0x9b, 0x91, 0x3f, 0x23, 0x46, 0x51, 0x38, - 0xb7, 0x80, 0xf5, 0x04, 0x4e, 0x12, 0x75, 0x04, 0x2b, 0xdf, 0x0b, 0x08, 0x3a, 0x87, 0xa2, 0xf8, - 0xb2, 0x28, 0xa3, 0xdc, 0xaf, 0x75, 0xa3, 0xde, 0xbb, 0x82, 0x6a, 0x4b, 0xa7, 0xf5, 0x9f, 0x06, - 0xd5, 0x31, 0xc1, 0xd4, 0xb9, 0x8c, 0x1b, 0x78, 0x0a, 0x85, 0x09, 0x9e, 0x07, 0x86, 0xd6, 0xce, - 0x77, 0xca, 0xfd, 0xb6, 0x0a, 0x4b, 0xb1, 0xba, 0x9c, 0xf2, 0xcc, 0x63, 0x74, 0x33, 0x28, 0xbc, - 0x79, 0xd7, 0xca, 0xd9, 0x22, 0x06, 0x9d, 0x43, 0x75, 0xe4, 0x7a, 0xc3, 0x90, 0x62, 0xe6, 0xfa, - 0xde, 0x28, 0x10, 0x5d, 0x56, 0xed, 0x34, 0x28, 0x58, 0xf8, 0x3a, 0xc1, 0xca, 0x47, 0xac, 0x24, - 0x88, 0x4e, 0xa1, 0xf8, 0x83, 0xbb, 0x74, 0x99, 0x51, 0x10, 0x5e, 0x69, 0x98, 0x8f, 0x41, 0x57, - 0x9f, 0x46, 0xc7, 0x90, 0xbf, 0x22, 0x1b, 0xd1, 0xa0, 0x6e, 0xf3, 0x23, 0x0f, 0x5a, 0xe3, 0x45, - 0x48, 0x22, 0x79, 0xa5, 0xf1, 0xf4, 0xe0, 0x2b, 0xcd, 0xba, 0x86, 0x5a, 0xdc, 0x41, 0x24, 0xd0, - 0x97, 0x70, 0x28, 0x34, 0x88, 0x5b, 0x3d, 0x4b, 0x2b, 0x24, 0xd9, 0x23, 0xc2, 0xf0, 0x0c, 0x33, - 0x6c, 0x47, 0x5c, 0xf4, 0x00, 0x4a, 0x4b, 0xc2, 0xa8, 0xeb, 0xc8, 0xe6, 0xca, 0xfd, 0x7a, 0x46, - 0xa1, 0x91, 0xf4, 0xda, 0x31, 0xcd, 0xfa, 0x53, 0x83, 0xbb, 0x7b, 0x32, 0x66, 0x27, 0x45, 0xdf, - 0x4e, 0x4a, 0x07, 0xee, 0x50, 0xdf, 0x67, 0x63, 0x42, 0xd7, 0xae, 0x43, 0x5e, 0xe0, 0x65, 0xdc, - 0x4f, 0x16, 0xe6, 0x52, 0x72, 0x48, 0xa4, 0x17, 0x3c, 0x39, 0x38, 0x69, 0x10, 0x7d, 0x01, 0x27, - 0x01, 0x1f, 0xb1, 0x89, 0xbb, 0x24, 0x3f, 0x79, 0xee, 0xf5, 0x0b, 0xec, 0xf9, 0x42, 0xd6, 0x82, - 0xbd, 0xeb, 0xe0, 0x73, 0x3a, 0xdb, 0xde, 0x4d, 0x51, 0xa8, 0x9f, 0x40, 0xac, 0xdf, 0xd4, 0xc8, - 0x44, 0xad, 0xf2, 0x7a, 0x5d, 0x2f, 0x58, 0x11, 0x87, 0x91, 0xd9, 0x24, 0x96, 0x94, 0x87, 0x65, - 0x61, 0xf4, 0x19, 0xd4, 0x14, 0x34, 0xd8, 0x30, 0x22, 0x45, 0x2c, 0xd8, 0x19, 0x34, 0x95, 0x71, - 0xc0, 0x97, 0x20, 0x1e, 0x92, 0x2c, 0x6c, 0xdd, 0x85, 0x13, 0x59, 0x0c, 0x1f, 0x8b, 0x68, 0x3a, - 0xad, 0x07, 0x80, 0x92, 0x60, 0x74, 0xe1, 0x26, 0x1c, 0x31, 0x3c, 0xe7, 0x8a, 0xc8, 0x2b, 0xd7, - 0x6d, 0x65, 0x5b, 0x7d, 0xa8, 0xab, 0x88, 0x57, 0x7c, 0x68, 0x82, 0xe4, 0x42, 0x4b, 0x96, 0xba, - 0x26, 0x69, 0x5a, 0x8f, 0xa1, 0xb1, 0x13, 0x13, 0x7d, 0xea, 0x0c, 0x74, 0x16, 0x83, 0xd1, 0xb7, - 0xb6, 0x80, 0x35, 0x80, 0xa2, 0xd0, 0x03, 0x3d, 0x81, 0xd2, 0x14, 0x33, 0xe7, 0x52, 0xcd, 0x60, - 0x4b, 0x0d, 0x93, 0x7c, 0x97, 0xd6, 0x0f, 0xbb, 0x36, 0x09, 0xfc, 0x90, 0x3a, 0x64, 0xbc, 0xc2, - 0x5e, 0x60, 0xc7, 0x7c, 0x6b, 0x08, 0xe5, 0x97, 0x61, 0xa0, 0xb6, 0xf6, 0x02, 0x8a, 0xc2, 0x13, - 0x6d, 0xfb, 0x47, 0xf3, 0x48, 0xb6, 0x55, 0x83, 0x8a, 0xcc, 0x22, 0xeb, 0xb6, 0xfe, 0xd2, 0xe0, - 0x98, 0x03, 0xe2, 0x16, 0xe2, 0xdc, 0x8f, 0xe0, 0x88, 0xca, 0xa3, 0x2c, 0xb3, 0x32, 0x68, 0xf0, - 0x9d, 0xff, 0xe7, 0x5d, 0xab, 0xfa, 0x92, 0x12, 0xbc, 0x58, 0xf8, 0x8e, 0xbc, 0x4b, 0xcd, 0x56, - 0x44, 0x74, 0x5f, 0x6d, 0xd7, 0x81, 0x08, 0xb9, 0xb7, 0x37, 0x44, 0xad, 0xd5, 0xe7, 0x90, 0x77, - 0x67, 0xfc, 0x92, 0x3f, 0xc0, 0xe5, 0x0c, 0x74, 0x01, 0x10, 0x08, 0xd1, 0x87, 0x98, 0x61, 0xa3, - 0xf0, 0x21, 0x7e, 0x82, 0x68, 0x9d, 0x03, 0x44, 0x4f, 0x24, 0x1f, 0xaf, 0x7a, 0x6a, 0xf5, 0x2b, - 0x71, 0x15, 0xfd, 0x5f, 0x34, 0x38, 0xe4, 0xed, 0x13, 0x8a, 0x2e, 0xa0, 0xc0, 0x4f, 0xe8, 0x54, - 0x29, 0x99, 0x90, 0xdb, 0xbc, 0x97, 0x41, 0x23, 0xf9, 0x72, 0xe8, 0x1b, 0xd0, 0x95, 0x7e, 0xe8, - 0x93, 0x14, 0x2b, 0xa9, 0xe9, 0x7b, 0x13, 0xf4, 0xff, 0x38, 0x80, 0xd2, 0x8f, 0x21, 0xa1, 0x2e, - 0xa1, 0xe8, 0x7b, 0xa8, 0x7e, 0xeb, 0x7a, 0x33, 0xf5, 0xb6, 0x27, 0x12, 0x66, 0xff, 0x77, 0x4c, - 0x73, 0x9f, 0x4b, 0x95, 0xf5, 0x35, 0x1c, 0xca, 0x51, 0x45, 0xf5, 0xfd, 0x0f, 0xba, 0xd9, 0xd8, - 0xc1, 0x55, 0xf0, 0x77, 0x00, 0xdb, 0x6d, 0x42, 0x66, 0x86, 0x98, 0xd8, 0x3b, 0xf3, 0xd3, 0xbd, - 0x3e, 0x95, 0xe8, 0x15, 0xdc, 0xc9, 0x2c, 0x0c, 0x6a, 0xed, 0x46, 0xa4, 0xd6, 0xcf, 0x6c, 0xbf, - 0x9f, 0x10, 0xe7, 0x1d, 0x18, 0x6f, 0x6e, 0x9a, 0xda, 0xdb, 0x9b, 0xa6, 0xf6, 0xef, 0x4d, 0x53, - 0xfb, 0xfd, 0xb6, 0x99, 0x7b, 0x7b, 0xdb, 0xcc, 0xfd, 0x7d, 0xdb, 0xcc, 0x4d, 0x0f, 0xc5, 0x3f, - 0xf5, 0xa3, 0xff, 0x03, 0x00, 0x00, 0xff, 0xff, 0x73, 0x50, 0xfc, 0xa3, 0x12, 0x08, 0x00, 0x00, + // 890 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x84, 0x55, 0x41, 0x8f, 0xda, 0x46, + 0x14, 0xc6, 0x0b, 0x2c, 0xeb, 0xb7, 0xcb, 0x66, 0x77, 0xb2, 0x01, 0xd7, 0x5d, 0x01, 0xb2, 0x56, + 0x2d, 0x87, 0x06, 0x12, 0xd2, 0x55, 0x9a, 0xf4, 0x50, 0x09, 0x91, 0xb6, 0x91, 0x4a, 0x94, 0x1a, + 0x9a, 0xfb, 0x60, 0xa6, 0xac, 0x05, 0xd8, 0xce, 0x78, 0x8c, 0x96, 0x5b, 0x4f, 0x3d, 0xf7, 0xaf, + 0xe4, 0x5f, 0xe4, 0x52, 0x29, 0xa7, 0xaa, 0xea, 0x21, 0xaa, 0x76, 0xa5, 0xfe, 0x8e, 0x6a, 0x66, + 0xec, 0xc1, 0x36, 0x24, 0x39, 0xe1, 0xf7, 0xbd, 0xef, 0x3d, 0xcf, 0xfb, 0xfc, 0xbd, 0x01, 0xea, + 0xc1, 0x7c, 0xd6, 0x65, 0x64, 0x19, 0xf8, 0xc1, 0x44, 0xfe, 0x76, 0x02, 0xea, 0x33, 0x1f, 0x55, + 0x62, 0xd0, 0x3c, 0x63, 0x14, 0x3b, 0xa4, 0xbb, 0x7a, 0xd8, 0x15, 0x0f, 0x32, 0x6d, 0xde, 0x9f, + 0xb9, 0xec, 0x2a, 0x9a, 0x74, 0x1c, 0x7f, 0xd9, 0x9d, 0xf9, 0x33, 0xbf, 0x2b, 0xe0, 0x49, 0xf4, + 0xab, 0x88, 0x44, 0x20, 0x9e, 0x24, 0xdd, 0xfa, 0x5d, 0x83, 0x93, 0x31, 0x2f, 0xef, 0xaf, 0x9f, + 0x0f, 0x6c, 0xf2, 0x3a, 0x22, 0x21, 0x43, 0x06, 0x54, 0x44, 0xcb, 0xe7, 0x03, 0x43, 0x6b, 0x69, + 0xed, 0x23, 0x3b, 0x09, 0x51, 0x03, 0x60, 0xb2, 0xf0, 0x9d, 0xf9, 0x88, 0x61, 0xca, 0x8c, 0xbd, + 0x96, 0xd6, 0xd6, 0xed, 0x14, 0x82, 0x4c, 0x38, 0x10, 0xd1, 0x33, 0x6f, 0x6a, 0x14, 0x45, 0x56, + 0xc5, 0xe8, 0x1c, 0xf4, 0xd7, 0x11, 0xa1, 0xeb, 0xa1, 0x3f, 0x25, 0x46, 0x59, 0x24, 0x37, 0x80, + 0xf5, 0x04, 0x4e, 0x53, 0xe7, 0x08, 0x03, 0xdf, 0x0b, 0x09, 0xba, 0x80, 0xb2, 0x78, 0xb3, 0x38, + 0xc6, 0x61, 0xef, 0xb8, 0x13, 0xcf, 0xde, 0x11, 0x54, 0x5b, 0x26, 0xad, 0xff, 0x34, 0xa8, 0x8e, + 0x08, 0xa6, 0xce, 0x55, 0x32, 0xc0, 0x53, 0x28, 0x8d, 0xf1, 0x2c, 0x34, 0xb4, 0x56, 0xb1, 0x7d, + 0xd8, 0x6b, 0xa9, 0xb2, 0x0c, 0xab, 0xc3, 0x29, 0xcf, 0x3c, 0x46, 0xd7, 0xfd, 0xd2, 0xdb, 0xf7, + 0xcd, 0x82, 0x2d, 0x6a, 0xd0, 0x05, 0x54, 0x87, 0xae, 0x37, 0x88, 0x28, 0x66, 0xae, 0xef, 0x0d, + 0x43, 0x31, 0x65, 0xd5, 0xce, 0x82, 0x82, 0x85, 0xaf, 0x53, 0xac, 0x62, 0xcc, 0x4a, 0x83, 0xe8, + 0x0c, 0xca, 0x3f, 0xb9, 0x4b, 0x97, 0x19, 0x25, 0x91, 0x95, 0x81, 0xf9, 0x18, 0x74, 0xf5, 0x6a, + 0x74, 0x02, 0xc5, 0x39, 0x59, 0x8b, 0x01, 0x75, 0x9b, 0x3f, 0xf2, 0xa2, 0x15, 0x5e, 0x44, 0x24, + 0x96, 0x57, 0x06, 0x4f, 0xf7, 0xbe, 0xd1, 0xac, 0x6b, 0x38, 0x4e, 0x26, 0x88, 0x05, 0xfa, 0x1a, + 0xf6, 0x85, 0x06, 0xc9, 0xa8, 0xe7, 0x59, 0x85, 0x24, 0x7b, 0x48, 0x18, 0x9e, 0x62, 0x86, 0xed, + 0x98, 0x8b, 0x1e, 0x40, 0x65, 0x49, 0x18, 0x75, 0x1d, 0x39, 0xdc, 0x61, 0xaf, 0x96, 0x53, 0x68, + 0x28, 0xb3, 0x76, 0x42, 0xb3, 0xfe, 0xd4, 0xe0, 0xee, 0x8e, 0x8e, 0x79, 0xa7, 0xe8, 0x1b, 0xa7, + 0xb4, 0xe1, 0x0e, 0xf5, 0x7d, 0x36, 0x22, 0x74, 0xe5, 0x3a, 0xe4, 0x05, 0x5e, 0x26, 0xf3, 0xe4, + 0x61, 0x2e, 0x25, 0x87, 0x44, 0x7b, 0xc1, 0x93, 0xc6, 0xc9, 0x82, 0xe8, 0x2b, 0x38, 0x0d, 0xb9, + 0xc5, 0xc6, 0xee, 0x92, 0xfc, 0xe2, 0xb9, 0xd7, 0x2f, 0xb0, 0xe7, 0x0b, 0x59, 0x4b, 0xf6, 0x76, + 0x82, 0xfb, 0x74, 0xba, 0xf9, 0x36, 0x65, 0xa1, 0x7e, 0x0a, 0xb1, 0xde, 0x28, 0xcb, 0xc4, 0xa3, + 0xf2, 0xf3, 0xba, 0x5e, 0x18, 0x10, 0x87, 0x91, 0xe9, 0x38, 0x91, 0x94, 0x97, 0xe5, 0x61, 0xf4, + 0x05, 0x1c, 0x2b, 0xa8, 0xbf, 0x66, 0x44, 0x8a, 0x58, 0xb2, 0x73, 0x68, 0xa6, 0x63, 0x9f, 0x2f, + 0x41, 0x62, 0x92, 0x3c, 0xcc, 0x15, 0x08, 0xe7, 0x6e, 0x10, 0x28, 0x9e, 0xb4, 0x4b, 0x16, 0xb4, + 0xee, 0xc2, 0xa9, 0x3c, 0x32, 0x37, 0x4f, 0xec, 0x61, 0xeb, 0x01, 0xa0, 0x34, 0x18, 0xdb, 0xc2, + 0x84, 0x03, 0x86, 0x67, 0x5c, 0x37, 0x69, 0x0c, 0xdd, 0x56, 0xb1, 0xd5, 0x83, 0x9a, 0xaa, 0x78, + 0xc5, 0xad, 0x15, 0xa6, 0xd7, 0x5e, 0xb2, 0xd4, 0xc7, 0x94, 0xa1, 0xf5, 0x18, 0xea, 0x5b, 0x35, + 0xf1, 0xab, 0xce, 0x41, 0x67, 0x09, 0x18, 0xbf, 0x6b, 0x03, 0x58, 0x7d, 0x28, 0x0b, 0xd5, 0xd0, + 0x13, 0xa8, 0x4c, 0x30, 0x73, 0xae, 0x94, 0x53, 0x9b, 0xca, 0x72, 0xf2, 0xf6, 0x5a, 0x3d, 0xec, + 0xd8, 0x24, 0xf4, 0x23, 0xea, 0x90, 0x51, 0x80, 0xbd, 0xd0, 0x4e, 0xf8, 0xd6, 0x00, 0x0e, 0x5f, + 0x46, 0xa1, 0xda, 0xed, 0x4b, 0x28, 0x8b, 0x4c, 0x7c, 0x27, 0x7c, 0xb2, 0x8f, 0x64, 0x5b, 0xc7, + 0x70, 0x24, 0xbb, 0xc8, 0x73, 0x5b, 0x7f, 0x69, 0x70, 0xc2, 0x01, 0xf1, 0xad, 0x92, 0xde, 0x8f, + 0xe0, 0x80, 0xca, 0x47, 0x79, 0xcc, 0xa3, 0x7e, 0x9d, 0xdf, 0x0c, 0xff, 0xbc, 0x6f, 0x56, 0x5f, + 0x52, 0x82, 0x17, 0x0b, 0xdf, 0x91, 0x5f, 0x5c, 0xb3, 0x15, 0x11, 0xdd, 0x57, 0x3b, 0xb8, 0x27, + 0x4a, 0xee, 0xed, 0x2c, 0x51, 0xcb, 0xf7, 0x25, 0x14, 0xdd, 0x29, 0xb7, 0xc2, 0x47, 0xb8, 0x9c, + 0x81, 0x2e, 0x01, 0x42, 0x21, 0xfa, 0x00, 0x33, 0x6c, 0x94, 0x3e, 0xc6, 0x4f, 0x11, 0xad, 0x0b, + 0x80, 0xf8, 0x22, 0xe5, 0x26, 0xac, 0x65, 0x2e, 0x88, 0xa3, 0xe4, 0x14, 0xbd, 0xdf, 0x34, 0xd8, + 0xe7, 0xe3, 0x13, 0x8a, 0x2e, 0xa1, 0xc4, 0x9f, 0xd0, 0x99, 0x52, 0x32, 0x25, 0xb7, 0x79, 0x2f, + 0x87, 0xc6, 0xf2, 0x15, 0xd0, 0x77, 0xa0, 0x2b, 0xfd, 0xd0, 0x67, 0x19, 0x56, 0x5a, 0xd3, 0x0f, + 0x36, 0xe8, 0xbd, 0xd9, 0x83, 0xca, 0xcf, 0x11, 0xa1, 0x2e, 0xa1, 0xe8, 0x47, 0xa8, 0x7e, 0xef, + 0x7a, 0x53, 0xf5, 0x0f, 0x90, 0x6a, 0x98, 0xff, 0x77, 0x32, 0xcd, 0x5d, 0x29, 0x75, 0xac, 0x6f, + 0x61, 0x5f, 0x5a, 0x15, 0xd5, 0x76, 0x5f, 0xfb, 0x66, 0x7d, 0x0b, 0x57, 0xc5, 0x3f, 0x00, 0x6c, + 0xb6, 0x09, 0x99, 0x39, 0x62, 0x6a, 0xef, 0xcc, 0xcf, 0x77, 0xe6, 0x54, 0xa3, 0x57, 0x70, 0x27, + 0xb7, 0x30, 0xa8, 0xb9, 0x5d, 0x91, 0x59, 0x3f, 0xb3, 0xf5, 0x61, 0x42, 0xd2, 0xb7, 0x6f, 0xbc, + 0xbd, 0x69, 0x68, 0xef, 0x6e, 0x1a, 0xda, 0xbf, 0x37, 0x0d, 0xed, 0x8f, 0xdb, 0x46, 0xe1, 0xdd, + 0x6d, 0xa3, 0xf0, 0xf7, 0x6d, 0xa3, 0x30, 0xd9, 0x17, 0xff, 0xe7, 0x8f, 0xfe, 0x0f, 0x00, 0x00, + 0xff, 0xff, 0xcf, 0x63, 0xe2, 0x49, 0x38, 0x08, 0x00, 0x00, } // Reference imports to suppress errors if they are not otherwise used. @@ -1422,6 +1431,11 @@ func (m *SearchMetrics) MarshalToSizedBuffer(dAtA []byte) (int, error) { _ = i var l int _ = l + if m.SkippedBlocks != 0 { + i = encodeVarintTempo(dAtA, i, uint64(m.SkippedBlocks)) + i-- + dAtA[i] = 0x20 + } if m.InspectedBlocks != 0 { i = encodeVarintTempo(dAtA, i, uint64(m.InspectedBlocks)) i-- @@ -1899,6 +1913,9 @@ func (m *SearchMetrics) Size() (n int) { if m.InspectedBlocks != 0 { n += 1 + sovTempo(uint64(m.InspectedBlocks)) } + if m.SkippedBlocks != 0 { + n += 1 + sovTempo(uint64(m.SkippedBlocks)) + } return n } @@ -2935,6 +2952,25 @@ func (m *SearchMetrics) Unmarshal(dAtA []byte) error { break } } + case 4: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field SkippedBlocks", wireType) + } + m.SkippedBlocks = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowTempo + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.SkippedBlocks |= uint32(b&0x7F) << shift + if b < 0x80 { + break + } + } default: iNdEx = preIndex skippy, err := skipTempo(dAtA[iNdEx:]) diff --git a/pkg/tempopb/tempo.proto b/pkg/tempopb/tempo.proto index 6fd1025c698..a72fb93c254 100644 --- a/pkg/tempopb/tempo.proto +++ b/pkg/tempopb/tempo.proto @@ -54,6 +54,7 @@ message SearchMetrics { uint32 inspectedTraces = 1; uint64 inspectedBytes = 2; uint32 inspectedBlocks = 3; + uint32 skippedBlocks = 4; } message SearchTagsRequest { @@ -93,18 +94,8 @@ message PushBytesRequest { repeated bytes ids = 3 [(gogoproto.nullable) = false, (gogoproto.customtype) = "PreallocBytes"]; // search data, length must match traces repeated bytes searchData = 4 [(gogoproto.nullable) = false, (gogoproto.customtype) = "PreallocBytes"]; - //repeated SearchData searchData = 4 [(gogoproto.nullable) = false]; } -/*message SearchData { - map RootData = 1 [(gogoproto.nullable) = false]; -}*/ - - -/*message TraceHeader { - string rootSpanName = 1; - map rootSpanTags = 2; -}*/ message TraceBytes { // pre-marshalled Traces diff --git a/tempodb/search/backend_search_block.go b/tempodb/search/backend_search_block.go index 066bbe983db..8f5705d5a5f 100644 --- a/tempodb/search/backend_search_block.go +++ b/tempodb/search/backend_search_block.go @@ -4,11 +4,11 @@ import ( "bytes" "context" "io" - "os" "github.com/google/uuid" "github.com/pkg/errors" + tempo_io "github.com/grafana/tempo/pkg/io" "github.com/grafana/tempo/pkg/tempofb" "github.com/grafana/tempo/tempodb/backend" "github.com/grafana/tempo/tempodb/backend/local" @@ -26,78 +26,6 @@ type BackendSearchBlock struct { l *local.Backend } -//nolint:golint -type SearchDataCombiner struct{} - -func (*SearchDataCombiner) Combine(_ string, searchData ...[]byte) ([]byte, bool) { - - if len(searchData) <= 0 { - return nil, false - } - - if len(searchData) == 1 { - return searchData[0], false - } - - // Squash all datas into 1 - data := tempofb.SearchEntryMutable{} - kv := &tempofb.KeyValues{} // buffer - for _, sb := range searchData { - sd := tempofb.SearchEntryFromBytes(sb) - for i := 0; i < sd.TagsLength(); i++ { - sd.Tags(kv, i) - for j := 0; j < kv.ValueLength(); j++ { - data.AddTag(string(kv.Key()), string(kv.Value(j))) - } - } - data.SetStartTimeUnixNano(sd.StartTimeUnixNano()) - data.SetEndTimeUnixNano(sd.EndTimeUnixNano()) - data.TraceID = sd.Id() - } - - return data.ToBytes(), true -} - -var _ common.ObjectCombiner = (*SearchDataCombiner)(nil) - -//nolint:golint -type SearchDataIterator struct { - currentIndex int - records []common.Record - file *os.File - - buffer []byte -} - -func (s *SearchDataIterator) Next(_ context.Context) (common.ID, []byte, error) { - if s.currentIndex >= len(s.records) { - return nil, nil, io.EOF - } - - currentRecord := s.records[s.currentIndex] - - // resize/extend buffer - if cap(s.buffer) < int(currentRecord.Length) { - s.buffer = make([]byte, currentRecord.Length) - } - s.buffer = s.buffer[:currentRecord.Length] - - _, err := s.file.ReadAt(s.buffer, int64(currentRecord.Start)) - if err != nil { - return nil, nil, errors.Wrap(err, "error reading search file") - } - - s.currentIndex++ - - return currentRecord.ID, s.buffer, nil -} - -func (*SearchDataIterator) Close() { - // file will be closed by StreamingSearchBlock -} - -var _ encoding.Iterator = (*SearchDataIterator)(nil) - // NewBackendSearchBlock iterates through the given WAL search data and writes it to the persistent backend // in a more efficient paged form. Multiple traces are written in the same page to make sure of the flatbuffer // CreateSharedString feature which dedupes strings across the entire buffer. @@ -117,26 +45,21 @@ func NewBackendSearchBlock(input *StreamingSearchBlock, l *local.Backend, blockI pageSizeBytes = defaultBackendSearchBlockPageSize } + header := tempofb.NewSearchBlockHeaderBuilder() + w, err := newBackendSearchBlockWriter(blockID, tenantID, l, version, enc) if err != nil { return err } + a := encoding.NewBufferedAppenderGeneric(w, pageSizeBytes) - // set up deduping iterator for streaming search block - combiner := &SearchDataCombiner{} - searchIterator := &SearchDataIterator{ - records: input.appender.Records(), - file: input.file, - } - iter, err := encoding.NewDedupingIterator(searchIterator, combiner, "") + iter, err := input.Iterator() if err != nil { - return errors.Wrap(err, "error creating deduping iterator") + return errors.Wrap(err, "error getting streaming search block iterator") } - a := encoding.NewBufferedAppenderGeneric(w, pageSizeBytes) // Copy records into the appender for { - // Read id, data, err := iter.Next(ctx) if err != nil && err != io.EOF { @@ -152,16 +75,18 @@ func NewBackendSearchBlock(input *StreamingSearchBlock, l *local.Backend, blockI } s := tempofb.SearchEntryFromBytes(data) + + header.AddEntry(s) + entry := &tempofb.SearchEntryMutable{ TraceID: id, StartTimeUnixNano: s.StartTimeUnixNano(), EndTimeUnixNano: s.EndTimeUnixNano(), } - l := s.TagsLength() - for i := 0; i < l; i++ { + for i, l := 0, s.TagsLength(); i < l; i++ { s.Tags(kv, i) - for j := 0; j < kv.ValueLength(); j++ { + for j, ll := 0, kv.ValueLength(); j < ll; j++ { entry.AddTag(string(kv.Key()), string(kv.Value(j))) } } @@ -189,6 +114,13 @@ func NewBackendSearchBlock(input *StreamingSearchBlock, l *local.Backend, blockI return err } + // Write header + hb := header.ToBytes() + err = l.Write(ctx, "search-header", backend.KeyPathForBlock(blockID, tenantID), bytes.NewReader(hb), int64(len(hb)), true) + if err != nil { + return err + } + // Write meta sm := &BlockMeta{ IndexPageSize: uint32(indexPageSize), @@ -216,8 +148,6 @@ func (s *BackendSearchBlock) Search(ctx context.Context, p Pipeline, sr *Results indexBuf := []common.Record{{}} entry := &tempofb.SearchEntry{} // Buffer - sr.AddBlockInspected() - meta, err := ReadSearchBlockMeta(ctx, s.l, s.id, s.tenantID) if err != nil { return err @@ -228,6 +158,29 @@ func (s *BackendSearchBlock) Search(ctx context.Context, p Pipeline, sr *Results return err } + // Read header + // Verify something in the block matches by checking the header + hbr, hbrlen, err := s.l.Read(ctx, "search-header", backend.KeyPathForBlock(s.id, s.tenantID), true) + if err != nil { + return err + } + + sr.bytesInspected.Add(uint64(hbrlen)) + + hb, err := tempo_io.ReadAllWithEstimate(hbr, hbrlen) + if err != nil { + return err + } + + header := tempofb.GetRootAsSearchBlockHeader(hb, 0) + if !p.MatchesBlock(header) { + // Block filtered out + sr.AddBlockSkipped() + return nil + } + + sr.AddBlockInspected() + // Read index bmeta := backend.NewBlockMeta(s.tenantID, s.id, meta.Version, meta.Encoding, "") cr := backend.NewContextReader(bmeta, "search-index", backend.NewReader(s.l), false) @@ -270,20 +223,19 @@ func (s *BackendSearchBlock) Search(ctx context.Context, p Pipeline, sr *Results sr.AddBytesInspected(uint64(len(dataBuf))) - batch := tempofb.GetRootAsSearchPage(dataBuf, 0) - - // Verify something in the batch matches - if !p.MatchesTags(batch) { + page := tempofb.GetRootAsSearchPage(dataBuf, 0) + if !p.MatchesPage(page) { + // Nothing in the page matches // Increment metric still - sr.AddTraceInspected(uint32(batch.EntriesLength())) + sr.AddTraceInspected(uint32(page.EntriesLength())) continue } - l := batch.EntriesLength() + l := page.EntriesLength() for j := 0; j < l; j++ { sr.AddTraceInspected(1) - batch.Entries(entry, j) + page.Entries(entry, j) if !p.Matches(entry) { continue diff --git a/tempodb/search/backend_search_block_test.go b/tempodb/search/backend_search_block_test.go index 30ce3b0e0c0..5322c6b5336 100644 --- a/tempodb/search/backend_search_block_test.go +++ b/tempodb/search/backend_search_block_test.go @@ -2,6 +2,7 @@ package search import ( "context" + "encoding/binary" "fmt" "os" "path" @@ -15,7 +16,6 @@ import ( "github.com/grafana/tempo/pkg/tempopb" "github.com/grafana/tempo/tempodb/backend" "github.com/grafana/tempo/tempodb/backend/local" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -28,15 +28,16 @@ func genSearchData(traceID []byte, i int) [][]byte { } func newBackendSearchBlockWithTraces(t testing.TB, traceCount int, enc backend.Encoding, pageSizeBytes int) *BackendSearchBlock { - id := []byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15} // 16-byte ids required - f, err := os.OpenFile(path.Join(t.TempDir(), "searchdata"), os.O_CREATE|os.O_RDWR, 0644) require.NoError(t, err) b1, err := NewStreamingSearchBlockForFile(f) require.NoError(t, err) + for i := 0; i < traceCount; i++ { - assert.NoError(t, b1.Append(context.Background(), id, genSearchData(id, i))) + id := make([]byte, 16) + binary.LittleEndian.PutUint32(id, uint32(i)) + require.NoError(t, b1.Append(context.Background(), id, genSearchData(id, i))) } l, err := local.NewBackend(&local.Config{ @@ -54,12 +55,12 @@ func newBackendSearchBlockWithTraces(t testing.TB, traceCount int, enc backend.E } func TestBackendSearchBlockSearch(t *testing.T) { - traceCount := 1_000 + traceCount := 50_000 b2 := newBackendSearchBlockWithTraces(t, traceCount, backend.EncNone, 0) p := NewSearchPipeline(&tempopb.SearchRequest{ - Tags: map[string]string{"key1": "value_A_1", "key20": "value_B_20"}, + Tags: map[string]string{"key20": "value_B_20"}, }) sr := NewResults() @@ -77,7 +78,7 @@ func TestBackendSearchBlockSearch(t *testing.T) { results = append(results, r) } require.Equal(t, 1, len(results)) - require.Equal(t, 1, int(sr.TracesInspected())) + require.Equal(t, traceCount, int(sr.TracesInspected())) } func BenchmarkBackendSearchBlockSearch(b *testing.B) { @@ -89,9 +90,9 @@ func BenchmarkBackendSearchBlockSearch(b *testing.B) { b2 := newBackendSearchBlockWithTraces(b, b.N, enc, int(sz*1024*1024)) - // Matches nothing, will perform an exhaustive search. + // Use secret tag to perform exhaustive search p := NewSearchPipeline(&tempopb.SearchRequest{ - Tags: map[string]string{"nomatch": "nomatch"}, + Tags: map[string]string{SecretExhaustiveSearchTag: "!"}, }) sr := NewResults() @@ -114,7 +115,6 @@ func BenchmarkBackendSearchBlockSearch(b *testing.B) { } wg.Wait() elapsed := time.Since(start) - fmt.Printf("BackendSearchBlock search throughput: %v elapsed %.2f MB = %.2f MiB/s \t %d traces = %.2fM traces/s \n", elapsed, float64(sr.bytesInspected.Load())/(1024*1024), diff --git a/tempodb/search/data_combiner.go b/tempodb/search/data_combiner.go new file mode 100644 index 00000000000..41387ed0b9d --- /dev/null +++ b/tempodb/search/data_combiner.go @@ -0,0 +1,42 @@ +package search + +import ( + "github.com/grafana/tempo/pkg/tempofb" + "github.com/grafana/tempo/tempodb/encoding/common" +) + +type DataCombiner struct{} + +var _ common.ObjectCombiner = (*DataCombiner)(nil) + +var staticCombiner = DataCombiner{} + +func (*DataCombiner) Combine(_ string, searchData ...[]byte) ([]byte, bool) { + + if len(searchData) <= 0 { + return nil, false + } + + if len(searchData) == 1 { + return searchData[0], false + } + + // Squash all datas into 1 + data := tempofb.SearchEntryMutable{} + kv := &tempofb.KeyValues{} // buffer + for _, sb := range searchData { + sd := tempofb.SearchEntryFromBytes(sb) + for i, ii := 0, sd.TagsLength(); i < ii; i++ { + sd.Tags(kv, i) + for j, jj := 0, kv.ValueLength(); j < jj; j++ { + data.AddTag(string(kv.Key()), string(kv.Value(j))) + } + } + + data.SetStartTimeUnixNano(sd.StartTimeUnixNano()) + data.SetEndTimeUnixNano(sd.EndTimeUnixNano()) + data.TraceID = sd.Id() + } + + return data.ToBytes(), true +} diff --git a/tempodb/search/pipeline.go b/tempodb/search/pipeline.go index f3b76d208c0..b7c946dc847 100644 --- a/tempodb/search/pipeline.go +++ b/tempodb/search/pipeline.go @@ -8,11 +8,15 @@ import ( "github.com/grafana/tempo/pkg/tempopb" ) -type tracefilter func(header *tempofb.SearchEntry) (matches bool) -type tagfilter func(c tempofb.TagContainer) (matches bool) +const SecretExhaustiveSearchTag = "x-dbg-exhaustive" + +type tracefilter func(entry *tempofb.SearchEntry) (matches bool) +type tagfilter func(page tempofb.TagContainer) (matches bool) +type blockfilter func(header *tempofb.SearchBlockHeader) (matches bool) type Pipeline struct { - tagfilters []tagfilter + blockfilters []blockfilter + tagfilters []tagfilter // shared by pages and traces tracefilters []tracefilter } @@ -20,16 +24,32 @@ func NewSearchPipeline(req *tempopb.SearchRequest) Pipeline { p := Pipeline{} if req.MinDurationMs > 0 { - minDuration := uint64(time.Duration(req.MinDurationMs) * time.Millisecond) + minDurationNanos := uint64(time.Duration(req.MinDurationMs) * time.Millisecond) + p.tracefilters = append(p.tracefilters, func(s *tempofb.SearchEntry) bool { - return (s.EndTimeUnixNano() - s.StartTimeUnixNano()) >= minDuration + et := s.EndTimeUnixNano() + st := s.StartTimeUnixNano() + return (et - st) >= minDurationNanos + }) + + p.blockfilters = append(p.blockfilters, func(s *tempofb.SearchBlockHeader) bool { + max := s.MaxDurationNanos() + return max >= minDurationNanos }) } if req.MaxDurationMs > 0 { - maxDuration := uint64(time.Duration(req.MaxDurationMs) * time.Millisecond) + maxDurationNanos := uint64(time.Duration(req.MaxDurationMs) * time.Millisecond) + p.tracefilters = append(p.tracefilters, func(s *tempofb.SearchEntry) bool { - return (s.EndTimeUnixNano() - s.StartTimeUnixNano()) <= maxDuration + et := s.EndTimeUnixNano() + st := s.StartTimeUnixNano() + return (et - st) <= maxDurationNanos + }) + + p.blockfilters = append(p.blockfilters, func(s *tempofb.SearchBlockHeader) bool { + min := s.MinDurationNanos() + return min <= maxDurationNanos }) } @@ -39,6 +59,17 @@ func NewSearchPipeline(req *tempopb.SearchRequest) Pipeline { vb := make([][]byte, 0, len(req.Tags)) for k, v := range req.Tags { + if k == SecretExhaustiveSearchTag { + // Perform an exhaustive search by: + // * no block or page filters means all blocks and pages match + // * substitute this trace filter instead rejects everything. therefore it never + // quits early due to enough results + p.tracefilters = append(p.tracefilters, func(s *tempofb.SearchEntry) bool { + return false + }) + continue + } + kb = append(kb, []byte(strings.ToLower(k))) vb = append(vb, []byte(strings.ToLower(v))) } @@ -60,16 +91,16 @@ func NewSearchPipeline(req *tempopb.SearchRequest) Pipeline { return p } -func (p *Pipeline) Matches(header *tempofb.SearchEntry) bool { +func (p *Pipeline) Matches(e *tempofb.SearchEntry) bool { for _, f := range p.tracefilters { - if !f(header) { + if !f(e) { return false } } for _, f := range p.tagfilters { - if !f(header) { + if !f(e) { return false } } @@ -77,10 +108,26 @@ func (p *Pipeline) Matches(header *tempofb.SearchEntry) bool { return true } -func (p *Pipeline) MatchesTags(c tempofb.TagContainer) bool { +// nolint:interfacer +func (p *Pipeline) MatchesPage(pg *tempofb.SearchPage) bool { + for _, f := range p.tagfilters { + if !f(pg) { + return false + } + } + + return true +} + +func (p *Pipeline) MatchesBlock(block *tempofb.SearchBlockHeader) bool { + for _, f := range p.blockfilters { + if !f(block) { + return false + } + } for _, f := range p.tagfilters { - if !f(c) { + if !f(block) { return false } } diff --git a/tempodb/search/pipeline_test.go b/tempodb/search/pipeline_test.go index 0348bf93200..8a25a2609a6 100644 --- a/tempodb/search/pipeline_test.go +++ b/tempodb/search/pipeline_test.go @@ -63,7 +63,7 @@ func TestPipelineMatchesTags(t *testing.T) { } } -func TestPipelineMatchesDuratiob(t *testing.T) { +func TestPipelineMatchesTraceDuration(t *testing.T) { testCases := []struct { name string @@ -133,6 +133,56 @@ func TestPipelineMatchesDuratiob(t *testing.T) { } } +func TestPipelineMatchesBlock(t *testing.T) { + + // Run all tests against this header + commonBlock := tempofb.NewSearchBlockHeaderBuilder() + commonBlock.AddTag("tag", "value") + commonBlock.MinDur = uint64(1 * time.Second) + commonBlock.MaxDur = uint64(10 * time.Second) + header := tempofb.GetRootAsSearchBlockHeader(commonBlock.ToBytes(), 0) + + testCases := []struct { + name string + request tempopb.SearchRequest + shouldMatch bool + }{ + { + name: "no filters", + request: tempopb.SearchRequest{}, + shouldMatch: true, + }, + { + name: "matches all", + request: tempopb.SearchRequest{Tags: map[string]string{"tag": "value"}, MinDurationMs: 5000, MaxDurationMs: 6000}, + shouldMatch: true, + }, + { + name: "no matching tag", + request: tempopb.SearchRequest{Tags: map[string]string{"nomatch": "value"}}, + shouldMatch: false, + }, + { + name: "no matching min duration", + request: tempopb.SearchRequest{MinDurationMs: 20000}, // Above max duration in block + shouldMatch: false, + }, + { + name: "no matching max duration", + request: tempopb.SearchRequest{MaxDurationMs: 500}, // Below smallest duration in block + shouldMatch: false, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + p := NewSearchPipeline(&tc.request) + matches := p.MatchesBlock(header) + require.Equal(t, tc.shouldMatch, matches) + }) + } +} + func BenchmarkPipelineMatches(b *testing.B) { entry := tempofb.SearchEntryFromBytes((&tempofb.SearchEntryMutable{ diff --git a/tempodb/search/results.go b/tempodb/search/results.go index bc7a2c19688..5d56aa79c87 100644 --- a/tempodb/search/results.go +++ b/tempodb/search/results.go @@ -20,6 +20,7 @@ type Results struct { tracesInspected atomic.Uint32 bytesInspected atomic.Uint64 blocksInspected atomic.Uint32 + blocksSkipped atomic.Uint32 } func NewResults() *Results { @@ -125,3 +126,11 @@ func (sr *Results) AddBlockInspected() { func (sr *Results) BlocksInspected() uint32 { return sr.blocksInspected.Load() } + +func (sr *Results) AddBlockSkipped() { + sr.blocksSkipped.Inc() +} + +func (sr *Results) BlocksSkipped() uint32 { + return sr.blocksSkipped.Load() +} diff --git a/tempodb/search/streaming_search_block.go b/tempodb/search/streaming_search_block.go index c659e2d88af..22e364dee6e 100644 --- a/tempodb/search/streaming_search_block.go +++ b/tempodb/search/streaming_search_block.go @@ -2,11 +2,13 @@ package search import ( "context" + "io" "os" "github.com/grafana/tempo/pkg/tempofb" "github.com/grafana/tempo/tempodb/encoding" "github.com/grafana/tempo/tempodb/encoding/common" + "github.com/pkg/errors" ) var _ SearchableBlock = (*StreamingSearchBlock)(nil) @@ -67,7 +69,7 @@ func NewStreamingSearchBlockForFile(f *os.File) (*StreamingSearchBlock, error) { // Append the given search data to the streaming block. Multiple byte buffers of search data for // the same trace can be passed and are merged into one entry. func (s *StreamingSearchBlock) Append(ctx context.Context, id common.ID, searchData [][]byte) error { - combined, _ := (&SearchDataCombiner{}).Combine("", searchData...) + combined, _ := staticCombiner.Combine("", searchData...) return s.appender.Append(id, combined) } @@ -120,3 +122,47 @@ func (s *StreamingSearchBlock) Search(ctx context.Context, p Pipeline, sr *Resul return nil } + +func (s *StreamingSearchBlock) Iterator() (encoding.Iterator, error) { + iter := &streamingSearchBlockIterator{ + records: s.appender.Records(), + file: s.file, + } + + combiner := &DataCombiner{} + + // Streaming (wal) blocks have to be deduped. + return encoding.NewDedupingIterator(iter, combiner, "") +} + +type streamingSearchBlockIterator struct { + currentIndex int + records []common.Record + file *os.File +} + +var _ encoding.Iterator = (*streamingSearchBlockIterator)(nil) + +func (s *streamingSearchBlockIterator) Next(_ context.Context) (common.ID, []byte, error) { + if s.currentIndex >= len(s.records) { + return nil, nil, io.EOF + } + + currentRecord := s.records[s.currentIndex] + + // Use unique buffer that can be returned to the caller. + // This is primarily for DedupingIterator which uses 2 buffers at once. + buffer := make([]byte, currentRecord.Length) + _, err := s.file.ReadAt(buffer, int64(currentRecord.Start)) + if err != nil { + return nil, nil, errors.Wrap(err, "error reading search file") + } + + s.currentIndex++ + + return currentRecord.ID, buffer, nil +} + +func (*streamingSearchBlockIterator) Close() { + // file will be closed by StreamingSearchBlock +}