-
Notifications
You must be signed in to change notification settings - Fork 219
Expand file tree
/
Copy pathaggregate_executor.go
More file actions
129 lines (114 loc) · 3.28 KB
/
aggregate_executor.go
File metadata and controls
129 lines (114 loc) · 3.28 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
/*
* Radon
*
* Copyright 2018 The Radon Authors.
* Code is licensed under the GPLv3.
*
*/
package executor
import (
"sort"
"expression"
"planner"
"xcontext"
"github.com/xelabs/go-mysqlstack/sqlparser/depends/sqltypes"
"github.com/xelabs/go-mysqlstack/xlog"
)
// Compile-time assertion that *AggregateExecutor implements Executor.
var _ Executor = (*AggregateExecutor)(nil)
// AggregateExecutor represents aggregate executor.
// Including: COUNT/MAX/MIN/SUM/AVG/GROUPBY.
type AggregateExecutor struct {
	// log is the logger handed in at construction time.
	log *xlog.Log
	// plan is the plan to execute; aggregate asserts it to *planner.AggregatePlan.
	plan planner.Plan
}
// NewAggregateExecutor builds an AggregateExecutor bound to the given
// logger and plan.
func NewAggregateExecutor(log *xlog.Log, plan planner.Plan) *AggregateExecutor {
	e := new(AggregateExecutor)
	e.log = log
	e.plan = plan
	return e
}
// Execute runs the aggregation in place over the results carried by ctx.
// It always returns nil.
func (executor *AggregateExecutor) Execute(ctx *xcontext.ResultContext) error {
	executor.aggregate(ctx.Results)
	return nil
}
// aggregate does the rows-aggregation (COUNT/SUM/MIN/MAX/AVG) on result in
// place and folds the rows into one row per group-by key.
//
// Don't use `group by` alone, `group by` needs to be used with the aggregation
// function. Otherwise the result of radon may be different from the result of
// mysql.
// eg: select a,b from tb group by b.         ×
//     select count(a),b from tb group by b.  √
//     select b from tb group by b.           √
//
// The function works in three steps:
//  1. sort the rows by the group-by keys so rows of one group are adjacent;
//  2. walk the sorted rows, opening a new group whenever the keys change and
//     otherwise updating the aggregations of the current group;
//  3. rebuild result.Rows from the groups and drop the helper columns
//     reported by expression.GetResults.
//
// It panics if executor.plan is not a *planner.AggregatePlan.
func (executor *AggregateExecutor) aggregate(result *sqltypes.Result) {
	plan := executor.plan.(*planner.AggregatePlan)
	if plan.Empty() {
		return
	}

	aggPlans := plan.NormalAggregators()
	aggPlansLen := len(aggPlans)
	groupAggrs := plan.GroupAggregators()

	// Bring rows with equal group keys next to each other.
	if len(groupAggrs) > 0 {
		sort.Slice(result.Rows, func(i, j int) bool {
			for _, key := range groupAggrs {
				cmp := sqltypes.NullsafeCompare(result.Rows[i][key.Index], result.Rows[j][key.Index])
				if cmp != 0 {
					return cmp < 0
				}
			}
			// All keys are equal: neither row sorts before the other. The
			// comparator must report false here, otherwise less(i, j) and
			// less(j, i) would both be true, which the sort package forbids.
			return false
		})
	}

	// group holds the first row seen for a group together with the
	// per-aggregation evaluation contexts accumulated for it.
	type group struct {
		row      []sqltypes.Value
		evalCtxs []*expression.AggEvaluateContext
	}

	aggrs := expression.NewAggregations(aggPlans, plan.IsPushDown, result.Fields)
	var groups []*group
	for _, row := range result.Rows {
		// Same keys as the current (last) group: fold the row into it.
		if n := len(groups); n > 0 && executor.keysEqual(groups[n-1].row, row, groupAggrs) {
			if aggPlansLen > 0 {
				current := groups[n-1]
				for i, aggr := range aggrs {
					aggr.Update(row, current.evalCtxs[i])
				}
			}
			continue
		}
		// First row overall, or the keys changed: open a new group.
		groups = append(groups, &group{
			row:      row,
			evalCtxs: expression.NewAggEvalCtxs(aggrs, row),
		})
	}

	// Handle the avg operator and rebuild the results.
	var deIdxs []int
	result.Rows = make([][]sqltypes.Value, len(groups))
	for i, g := range groups {
		result.Rows[i], deIdxs = expression.GetResults(aggrs, g.evalCtxs, g.row)
	}

	// No input rows but aggregate functions are present: emit a single row
	// computed from empty evaluation contexts.
	// NOTE(review): this row is also produced when group-by keys exist; MySQL
	// returns an empty set for that case — confirm whether that is intended.
	if len(groups) == 0 && aggPlansLen > 0 {
		evalCtxs := expression.NewAggEvalCtxs(aggrs, nil)
		result.Rows = make([][]sqltypes.Value, 1)
		result.Rows[0], deIdxs = expression.GetResults(aggrs, evalCtxs, make([]sqltypes.Value, len(result.Fields)))
	}

	// Remove avg decompose columns.
	result.RemoveColumns(deIdxs...)
}
// keysEqual reports whether row1 and row2 hold equal values in every column
// referenced by groups, using sqltypes.NullsafeCompare for the comparison.
// With no group columns it returns true.
func (executor *AggregateExecutor) keysEqual(row1, row2 []sqltypes.Value, groups []planner.Aggregator) bool {
	for i := range groups {
		col := groups[i].Index
		if sqltypes.NullsafeCompare(row1[col], row2[col]) != 0 {
			return false
		}
	}
	return true
}