forked from grafana/tempo
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmetrics_query_range.go
More file actions
79 lines (69 loc) · 2.6 KB
/
metrics_query_range.go
File metadata and controls
79 lines (69 loc) · 2.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
package combiner
import (
"sort"
"strings"
"github.com/grafana/tempo/pkg/tempopb"
"github.com/grafana/tempo/pkg/traceql"
)
// Compile-time assertion that *genericCombiner[*tempopb.QueryRangeResponse]
// satisfies GRPCCombiner[*tempopb.QueryRangeResponse].
var _ GRPCCombiner[*tempopb.QueryRangeResponse] = (*genericCombiner[*tempopb.QueryRangeResponse])(nil)
// NewQueryRange returns a Combiner for query range responses. Partial
// responses are handed to a traceql combiner built for req in
// traceql.AggregateModeFinal; the final and diff responses are sorted before
// being returned. An error from building the traceql combiner is returned
// unchanged.
func NewQueryRange(req *tempopb.QueryRangeRequest, trackDiffs bool) (Combiner, error) {
	rangeCombiner, err := traceql.QueryRangeCombinerFor(req, traceql.AggregateModeFinal, trackDiffs)
	if err != nil {
		return nil, err
	}

	// sorted substitutes an empty response for nil and sorts the result. It is
	// shared by the finalize and diff hooks, which differ only in where the
	// response comes from.
	sorted := func(resp *tempopb.QueryRangeResponse) (*tempopb.QueryRangeResponse, error) {
		if resp == nil {
			resp = &tempopb.QueryRangeResponse{}
		}
		sortResponse(resp)
		return resp, nil
	}

	c := &genericCombiner[*tempopb.QueryRangeResponse]{
		httpStatusCode: 200,
		new:            func() *tempopb.QueryRangeResponse { return &tempopb.QueryRangeResponse{} },
		current:        &tempopb.QueryRangeResponse{Metrics: &tempopb.SearchMetrics{}},
		combine: func(partial *tempopb.QueryRangeResponse, _ *tempopb.QueryRangeResponse, _ PipelineResponse) error {
			// Coordination between the sharder and the combiner: the sharder
			// sends one response that carries summary metrics only, and the
			// combiner accumulates it as-is. A response reporting zero total
			// jobs is a "real" response, so it is marked as one completed job
			// for the combiner to count.
			if m := partial.Metrics; m != nil && m.TotalJobs == 0 {
				m.CompletedJobs = 1
			}

			rangeCombiner.Combine(partial)
			return nil
		},
		finalize: func(_ *tempopb.QueryRangeResponse) (*tempopb.QueryRangeResponse, error) {
			return sorted(rangeCombiner.Response())
		},
		diff: func(_ *tempopb.QueryRangeResponse) (*tempopb.QueryRangeResponse, error) {
			return sorted(rangeCombiner.Diff())
		},
	}
	return c, nil
}
// NewTypedQueryRange builds the combiner via NewQueryRange and returns it as a
// GRPCCombiner[*tempopb.QueryRangeResponse]. Errors from NewQueryRange are
// returned unchanged. The type assertion is unchecked, so it panics if the
// returned combiner does not implement that interface.
func NewTypedQueryRange(req *tempopb.QueryRangeRequest, trackDiffs bool) (GRPCCombiner[*tempopb.QueryRangeResponse], error) {
	untyped, err := NewQueryRange(req, trackDiffs)
	if err != nil {
		return nil, err
	}

	typed := untyped.(GRPCCombiner[*tempopb.QueryRangeResponse])
	return typed, nil
}
// sortResponse orders res in place: series by PromLabels (stable sort), and
// within each series both the samples and the exemplars by ascending
// TimestampMs.
func sortResponse(res *tempopb.QueryRangeResponse) {
	allSeries := res.Series
	sort.SliceStable(allSeries, func(a, b int) bool {
		return strings.Compare(allSeries[a].PromLabels, allSeries[b].PromLabels) < 0
	})

	for _, ts := range allSeries {
		// The local slice headers share their backing arrays with the series
		// fields, so sorting them reorders the series' own data.
		samples, exemplars := ts.Samples, ts.Exemplars

		sort.Slice(samples, func(a, b int) bool {
			return samples[a].TimestampMs < samples[b].TimestampMs
		})
		sort.Slice(exemplars, func(a, b int) bool {
			return exemplars[a].TimestampMs < exemplars[b].TimestampMs
		})
	}
}