-
Notifications
You must be signed in to change notification settings - Fork 219
Expand file tree
/
Copy pathaggregate_operator.go
More file actions
135 lines (118 loc) · 3.4 KB
/
aggregate_operator.go
File metadata and controls
135 lines (118 loc) · 3.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
/*
* Radon
*
* Copyright 2018 The Radon Authors.
* Code is licensed under the GPLv3.
*
*/
package operator
import (
"sort"
"planner/builder"
"xcontext"
"github.com/xelabs/go-mysqlstack/sqlparser/depends/sqltypes"
"github.com/xelabs/go-mysqlstack/xlog"
)
var (
_ Operator = &AggregateOperator{}
)
// AggregateOperator represents aggregate operator.
// Including: COUNT/MAX/MIN/SUM/AVG/GROUPBY.
type AggregateOperator struct {
log *xlog.Log
plan builder.ChildPlan
}
// NewAggregateOperator creates new AggregateOperator.
func NewAggregateOperator(log *xlog.Log, plan builder.ChildPlan) *AggregateOperator {
return &AggregateOperator{
log: log,
plan: plan,
}
}
// Execute used to execute the operator.
func (operator *AggregateOperator) Execute(ctx *xcontext.ResultContext) error {
rs := ctx.Results
operator.aggregate(rs)
return nil
}
// Aggregate used to do rows-aggregator(COUNT/SUM/MIN/MAX/AVG) and grouped them into group-by fields.
// Don't use `group by` alone, `group by` needs to be used with the aggregation function. Otherwise
// the result of radon may be different from the result of mysql.
// eg: select a,b from tb group by b. ×
// select count(a),b from tb group by b. √
// select b from tb group by b. √
func (operator *AggregateOperator) aggregate(result *sqltypes.Result) {
var deIdxs []int
plan := operator.plan.(*builder.AggregatePlan)
if plan.Empty() {
return
}
aggPlans := plan.NormalAggregators()
aggPlansLen := len(aggPlans)
groupAggrs := plan.GroupAggregators()
if len(groupAggrs) > 0 {
sort.Slice(result.Rows, func(i, j int) bool {
for _, key := range groupAggrs {
cmp := sqltypes.NullsafeCompare(result.Rows[i][key.Index], result.Rows[j][key.Index])
if cmp == 0 {
continue
}
return cmp < 0
}
return true
})
}
type group struct {
row []sqltypes.Value
evalCtxs []*sqltypes.AggEvaluateContext
}
var aggrs []*sqltypes.Aggregation
for _, aggPlan := range aggPlans {
aggr := sqltypes.NewAggregation(aggPlan.Index, aggPlan.Type, aggPlan.Distinct, plan.IsPushDown)
aggr.FixField(result.Fields[aggPlan.Index])
aggrs = append(aggrs, aggr)
}
var groups []*group
for _, row := range result.Rows {
length := len(groups)
if length == 0 {
evalCtxs := sqltypes.NewAggEvalCtxs(aggrs, row)
groups = append(groups, &group{row, evalCtxs})
continue
}
equal := keysEqual(groups[length-1].row, row, groupAggrs)
if equal {
if aggPlansLen > 0 {
for i, aggr := range aggrs {
aggr.Update(row, groups[length-1].evalCtxs[i])
}
}
} else {
evalCtxs := sqltypes.NewAggEvalCtxs(aggrs, row)
groups = append(groups, &group{row, evalCtxs})
}
}
// Handle the avg operator and rebuild the results.
i := 0
result.Rows = make([][]sqltypes.Value, len(groups))
for _, g := range groups {
result.Rows[i], deIdxs = sqltypes.GetResults(aggrs, g.evalCtxs, g.row)
i++
}
if len(groups) == 0 && aggPlansLen > 0 {
result.Rows = make([][]sqltypes.Value, 1)
evalCtxs := sqltypes.NewAggEvalCtxs(aggrs, nil)
result.Rows[0], deIdxs = sqltypes.GetResults(aggrs, evalCtxs, make([]sqltypes.Value, len(result.Fields)))
}
// Remove avg decompose columns.
result.RemoveColumns(deIdxs...)
}
func keysEqual(row1, row2 []sqltypes.Value, groups []builder.Aggregator) bool {
for _, v := range groups {
cmp := sqltypes.NullsafeCompare(row1[v.Index], row2[v.Index])
if cmp != 0 {
return false
}
}
return true
}