forked from grafana/tempo
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathstorage.go
More file actions
297 lines (245 loc) · 9.63 KB
/
storage.go
File metadata and controls
297 lines (245 loc) · 9.63 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
package traceql
import (
"context"
)
type Operands []Static
type Condition struct {
Attribute Attribute
Op Operator
Operands Operands
}
func SearchMetaConditions() []Condition {
return []Condition{
{NewIntrinsic(IntrinsicTraceRootService), OpNone, nil},
{NewIntrinsic(IntrinsicTraceRootSpan), OpNone, nil},
{NewIntrinsic(IntrinsicTraceDuration), OpNone, nil},
{NewIntrinsic(IntrinsicTraceID), OpNone, nil},
{NewIntrinsic(IntrinsicTraceStartTime), OpNone, nil},
{NewIntrinsic(IntrinsicSpanID), OpNone, nil},
{NewIntrinsic(IntrinsicSpanStartTime), OpNone, nil},
{NewIntrinsic(IntrinsicDuration), OpNone, nil},
{NewIntrinsic(IntrinsicServiceStats), OpNone, nil},
}
}
func SearchMetaConditionsWithout(remove []Condition, allConditions bool) []Condition {
metaConds := SearchMetaConditions()
retConds := make([]Condition, 0, len(metaConds))
for _, c := range metaConds {
// if we can't find c in the remove conditions then add it to retConds
found := false
for _, r := range remove {
if r.Attribute == c.Attribute {
// We can reuse the existing condition of a metadata field in two cases:
// (1) OpNone, since it has no filtering it will always return a value, there is no need to read it again.
// (2) AllConditions - No matter the operation, it will return a value for all results.
// If neither of those apply then we have to select
// the metadata field again without filtering.
if r.Op == OpNone || allConditions {
found = true
break
}
}
}
if !found {
retConds = append(retConds, c)
}
}
return retConds
}
// SecondPassFn is a method that is called in between the first and second
// pass of a fetch spans request. See below.
type SecondPassFn func(*Spanset) ([]*Spanset, error)
type FetchSpansRequest struct {
StartTimeUnixNanos uint64
EndTimeUnixNanos uint64
Conditions []Condition
// mdisibio - Better to push trace by ID filtering into Conditions with a new between op?
ShardID uint32
ShardCount uint32
// Hints
// By default the storage layer fetches spans meeting any of the criteria.
// This hint is for common cases like { x && y && z } where the storage layer
// can make extra optimizations by returning only spansets that meet
// all criteria.
AllConditions bool
// SecondPassFn and Conditions allow a caller to retrieve one set of data
// in the first pass, filter using the SecondPassFn callback and then
// request a different set of data in the second pass. This is particularly
// useful for retrieving data required to resolve a TraceQL query in the first
// pass and only selecting metadata in the second pass.
// TODO: extend this to an arbitrary number of passes
SecondPass SecondPassFn
SecondPassConditions []Condition
SecondPassSelectAll bool // Ignore second pass conditions and select all attributes
}
func (f *FetchSpansRequest) appendCondition(c ...Condition) {
f.Conditions = append(f.Conditions, c...)
}
func (f *FetchSpansRequest) HasAttribute(a Attribute) bool {
for _, cc := range f.Conditions {
if cc.Attribute == a {
return true
}
}
for _, cc := range f.SecondPassConditions {
if cc.Attribute == a {
return true
}
}
return false
}
type Span interface {
// AttributeFor returns the attribute for the given key. If the attribute is not found then
// the second return value will be false.
AttributeFor(Attribute) (Static, bool)
// AllAttributes returns a map of all attributes for this span. AllAttributes should be used sparingly
// and is expected to be significantly slower than AttributeFor.
AllAttributes() map[Attribute]Static
// AllAttributesFunc is a way to access all attributes for this span, letting the span determine the
// optimal method. Avoids allocating a map like AllAttributes.
AllAttributesFunc(func(Attribute, Static))
ID() []byte
StartTimeUnixNanos() uint64
DurationNanos() uint64
// SiblingOf returns all spans on the RHS side that have siblings in the LHS. If falseForAll is true
// then the returned spans will be those that do not have siblings in the LHS. buffer is an optional
// buffer to use to avoid allocations.
SiblingOf(lhs []Span, rhs []Span, falseForAll bool, union bool, buffer []Span) []Span
// DescendantOf returns all spans on the RHS side that have descendants in the LHS. If falseForAll is true
// then the returned spans will be those that do not have descendants in the LHS. invert is used to invert
// the relationship. If invert is true then this will behave like "AncestorOf". buffer is an optional
// buffer to use to avoid allocations.
DescendantOf(lhs []Span, rhs []Span, falseForAll bool, invert bool, union bool, buffer []Span) []Span
// ChildOf returns all spans on the RHS side that have children in the LHS. If falseForAll is true
// then the returned spans will be those that do not have children in the LHS. invert is used to invert
// the relationship. If invert is true then this will behave like "ParentOf". buffer is an optional
// buffer to use to avoid allocations.
ChildOf(lhs []Span, rhs []Span, falseForAll bool, invert bool, union bool, buffer []Span) []Span
}
// should we just make matched a field on the spanset instead of a special attribute?
const attributeMatched = "__matched"
type SpansetAttribute struct {
Name string
Val Static
}
type ServiceStats struct {
SpanCount uint32
ErrorCount uint32
}
type Spanset struct {
// these fields are actually used by the engine to evaluate queries
Scalar Static
Spans []Span
TraceID []byte
RootSpanName string
RootServiceName string
StartTimeUnixNanos uint64
DurationNanos uint64
ServiceStats map[string]ServiceStats
Attributes []*SpansetAttribute
// Set this function to provide upstream callers with a method to
// release this spanset and all its spans when finished. This method will be
// called with the spanset itself as the argument. This is done for a worthwhile
// memory savings as the same function pointer can then be reused across spansets.
ReleaseFn func(*Spanset)
}
func (s *Spanset) AddAttribute(key string, value Static) {
s.Attributes = append(s.Attributes, &SpansetAttribute{Name: key, Val: value})
}
// Release the spanset and all its span. This is just a wrapper of ReleaseFn that
// performs nil checks.
func (s *Spanset) Release() {
if s.ReleaseFn != nil {
s.ReleaseFn(s)
}
}
func (s *Spanset) clone() *Spanset {
ss := *s
return &ss
}
type SpansetIterator interface {
Next(context.Context) (*Spanset, error)
Close()
}
type FetchSpansResponse struct {
Results SpansetIterator
// callback to get the size of data read during Fetch
Bytes func() uint64
}
type SpansetFetcher interface {
Fetch(context.Context, FetchSpansRequest) (FetchSpansResponse, error)
}
// FetchTagValuesCallback is called to collect unique tag values.
// Returns true if it has exceeded the maximum number of results.
type FetchTagValuesCallback func(static Static) bool
type FetchTagValuesRequest struct {
Conditions []Condition
TagName Attribute
// TODO: Add start and end time?
}
type TagValuesFetcher interface {
Fetch(context.Context, FetchTagValuesRequest, FetchTagValuesCallback) error
}
type TagValuesFetcherWrapper struct {
f func(context.Context, FetchTagValuesRequest, FetchTagValuesCallback) error
}
var _ TagValuesFetcher = (*TagValuesFetcherWrapper)(nil)
func NewTagValuesFetcherWrapper(f func(context.Context, FetchTagValuesRequest, FetchTagValuesCallback) error) TagValuesFetcher {
return TagValuesFetcherWrapper{f}
}
func (s TagValuesFetcherWrapper) Fetch(ctx context.Context, request FetchTagValuesRequest, callback FetchTagValuesCallback) error {
return s.f(ctx, request, callback)
}
// MustExtractFetchSpansRequestWithMetadata parses the given traceql query and returns
// the storage layer conditions. Panics if the query fails to parse.
func MustExtractFetchSpansRequestWithMetadata(query string) FetchSpansRequest {
c, err := ExtractFetchSpansRequest(query)
if err != nil {
panic(err)
}
c.SecondPass = func(s *Spanset) ([]*Spanset, error) { return []*Spanset{s}, nil }
c.SecondPassConditions = SearchMetaConditions()
return c
}
// ExtractFetchSpansRequest parses the given traceql query and returns
// the storage layer conditions. Returns an error if the query fails to parse.
func ExtractFetchSpansRequest(query string) (FetchSpansRequest, error) {
ast, err := Parse(query)
if err != nil {
return FetchSpansRequest{}, err
}
req := FetchSpansRequest{
AllConditions: true,
}
ast.Pipeline.extractConditions(&req)
return req, nil
}
type SpansetFetcherWrapper struct {
f func(ctx context.Context, req FetchSpansRequest) (FetchSpansResponse, error)
}
var _ = (SpansetFetcher)(&SpansetFetcherWrapper{})
func NewSpansetFetcherWrapper(f func(ctx context.Context, req FetchSpansRequest) (FetchSpansResponse, error)) SpansetFetcher {
return SpansetFetcherWrapper{f}
}
func (s SpansetFetcherWrapper) Fetch(ctx context.Context, request FetchSpansRequest) (FetchSpansResponse, error) {
return s.f(ctx, request)
}
type FetchTagsCallback func(tag string, scope AttributeScope) bool
type FetchTagsRequest struct {
Conditions []Condition
Scope AttributeScope
// TODO: Add start and end time?
}
type TagNamesFetcher interface {
Fetch(context.Context, FetchTagsRequest, FetchTagsCallback) error
}
type TagNamesFetcherWrapper struct {
f func(context.Context, FetchTagsRequest, FetchTagsCallback) error
}
var _ TagNamesFetcher = (*TagNamesFetcherWrapper)(nil)
func NewTagNamesFetcherWrapper(f func(context.Context, FetchTagsRequest, FetchTagsCallback) error) TagNamesFetcher {
return TagNamesFetcherWrapper{f}
}
func (s TagNamesFetcherWrapper) Fetch(ctx context.Context, request FetchTagsRequest, callback FetchTagsCallback) error {
return s.f(ctx, request, callback)
}