-
Notifications
You must be signed in to change notification settings - Fork 219
Expand file tree
/
Copy pathmultistmt_txn.go
More file actions
177 lines (150 loc) · 6.02 KB
/
multistmt_txn.go
File metadata and controls
177 lines (150 loc) · 6.02 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
/*
* Radon
*
* Copyright 2018 The Radon Authors.
* Code is licensed under the GPLv3.
*
*/
package proxy
import (
"backend"
"github.com/pkg/errors"
"github.com/xelabs/go-mysqlstack/driver"
"github.com/xelabs/go-mysqlstack/sqlparser"
"github.com/xelabs/go-mysqlstack/sqlparser/depends/sqltypes"
)
// handleMultiStmtTxn dispatches a multi-statement transaction control
// statement ("start transaction", "begin", "rollback" or "commit") to the
// matching handler and logs any error the handler returns.
//
// node must be a non-nil *sqlparser.Transaction. Any other statement type, or
// a transaction whose Action is not one of the four known actions, yields an
// error instead of a panic or a silent (nil, nil) result.
//
// The handlers receive the parsed action string (snode.Action) in place of the
// raw query text; query itself is only used in the error log.
func (spanner *Spanner) handleMultiStmtTxn(session *driver.Session, query string, node sqlparser.Statement) (*sqltypes.Result, error) {
	var err error
	var qr *sqltypes.Result
	log := spanner.log

	// Checked assertion: the single-value form would panic if a caller ever
	// routed a non-transaction statement here.
	snode, ok := node.(*sqlparser.Transaction)
	if !ok || snode == nil {
		err = errors.Errorf("spanner.handle.multistmt.txn.error[unexpected.statement.type:%T]", node)
		log.Error("proxy.query.multistmt.txn.[%s].error:%s", query, err)
		return nil, err
	}

	switch snode.Action {
	case sqlparser.StartTxnStr:
		qr, err = spanner.handleStartTransaction(session, snode.Action, node)
	case sqlparser.BeginTxnStr:
		qr, err = spanner.handleBegin(session, snode.Action, node)
	case sqlparser.RollbackTxnStr:
		qr, err = spanner.handleRollback(session, snode.Action, node)
	case sqlparser.CommitTxnStr:
		qr, err = spanner.handleCommit(session, snode.Action, node)
	default:
		// Without this branch an unknown action fell through and returned a
		// nil result together with a nil error.
		err = errors.Errorf("spanner.handle.multistmt.txn.error[unsupported.action:%s]", snode.Action)
	}
	if err != nil {
		log.Error("proxy.query.multistmt.txn.[%s].error:%s", query, err)
	}
	return qr, err
}
// handleStartTransaction handles the "start transaction" statement of a
// multi-statement transaction.
//
// It delegates unchanged to ExecuteBegin (the same entry point handleBegin
// uses) and returns that call's result and error as-is. When invoked from
// handleMultiStmtTxn, query carries the parsed action string rather than the
// raw SQL text.
func (spanner *Spanner) handleStartTransaction(session *driver.Session, query string, node sqlparser.Statement) (*sqltypes.Result, error) {
return spanner.ExecuteBegin(session, query, node)
}
// handleBegin handles the "begin" statement of a multi-statement transaction.
//
// It delegates unchanged to ExecuteBegin (the same entry point
// handleStartTransaction uses) and returns that call's result and error as-is.
// When invoked from handleMultiStmtTxn, query carries the parsed action string
// rather than the raw SQL text.
func (spanner *Spanner) handleBegin(session *driver.Session, query string, node sqlparser.Statement) (*sqltypes.Result, error) {
return spanner.ExecuteBegin(session, query, node)
}
// handleRollback handles the "rollback" statement of a multi-statement
// transaction.
//
// It delegates unchanged to ExecuteRollback and returns that call's result and
// error as-is. When invoked from handleMultiStmtTxn, query carries the parsed
// action string rather than the raw SQL text.
func (spanner *Spanner) handleRollback(session *driver.Session, query string, node sqlparser.Statement) (*sqltypes.Result, error) {
return spanner.ExecuteRollback(session, query, node)
}
// handleCommit handles the "commit" statement of a multi-statement
// transaction.
//
// It delegates unchanged to ExecuteCommit and returns that call's result and
// error as-is. When invoked from handleMultiStmtTxn, query carries the parsed
// action string rather than the raw SQL text.
func (spanner *Spanner) handleCommit(session *driver.Session, query string, node sqlparser.Statement) (*sqltypes.Result, error) {
return spanner.ExecuteCommit(session, query, node)
}
// ExecuteBegin opens a multi-statement transaction for the session. It serves
// both "start transaction" and "begin".
//
// An error (with a nil result) is returned when:
//   - two-phase commit is disabled,
//   - the session already holds a transaction (nested begin is unsupported),
//   - the transaction cannot be created, or
//   - BeginScatter fails, in which case the new transaction is finished and
//     unbound from the session before returning.
//
// On success an empty result is returned and the transaction remains bound to
// the session.
func (spanner *Spanner) ExecuteBegin(session *driver.Session, query string, node sqlparser.Statement) (*sqltypes.Result, error) {
	var (
		txn backend.Transaction
		err error
	)
	logger := spanner.log
	cfg := spanner.conf
	ss := spanner.sessions
	sc := spanner.scatter

	if twoPC := spanner.isTwoPC(); !twoPC {
		logger.Error("spanner.execute.2pc.disable")
		return nil, errors.Errorf("spanner.query.execute.multistmt.txn.error[twopc-disable]")
	}

	// Two situations are deliberately not supported:
	//  1. With autocommit = 0 and no open transaction, MySQL turns "begin"
	//     into an implicit commit, see
	//     https://dev.mysql.com/doc/refman/5.7/en/implicit-commit.html
	//  2. A "begin" issued while a transaction is still open, e.g.
	//     begin; sql1; sql2; ...; begin; -- an error is returned and the
	//     open transaction is left untouched.
	if txn = ss.getTxnSession(session).transaction; txn != nil {
		// The previous transaction has not been released yet.
		logger.Error("spanner.execute.multistmt.begin.nestedTxn.unsupported.")
		return nil, errors.Errorf("ExecuteMultiStatBegin.nestedTxn.unsupported")
	}

	if txn, err = sc.CreateTransaction(); err != nil {
		logger.Error("spanner.txn.create.error:[%v]", err)
		return nil, err
	}

	// Apply the proxy limits and mark the transaction as multi-statement.
	txn.SetTimeout(cfg.Proxy.QueryTimeout)
	txn.SetMaxResult(cfg.Proxy.MaxResultSize)
	txn.SetMaxJoinRows(cfg.Proxy.MaxJoinRows)
	txn.SetMultiStmtTxn()

	// Bind before BeginScatter; the failure path below undoes the binding.
	ss.MultiStmtTxnBinding(session, txn, node, query)
	err = txn.BeginScatter()
	if err != nil {
		txn.Finish()
		ss.MultiStmtTxnUnBinding(session, true)
		logger.Error("spanner.execute.multistmt.txn.begin.scatter.error:[%v]", err)
		return nil, err
	}
	return &sqltypes.Result{}, nil
}
// ExecuteRollback executes the "rollback" statement of a multi-statement
// transaction.
//
// Return values:
//   - two-phase commit disabled: a result with Warnings set to 1, plus an error;
//   - no transaction bound to the session: an empty result, plus an error;
//   - RollbackScatter fails: a nil result and that error;
//   - success: an empty result and a nil error, after the transaction has been
//     unbound from the session and finished.
//
// NOTE(review): when RollbackScatter fails the function returns without
// calling MultiStmtTxnUnBinding or Finish, so the transaction stays attached
// to the session — confirm this is intentional.
func (spanner *Spanner) ExecuteRollback(session *driver.Session, query string, node sqlparser.Statement) (*sqltypes.Result, error) {
	logger := spanner.log
	ss := spanner.sessions

	if twoPC := spanner.isTwoPC(); !twoPC {
		logger.Error("spanner.execute.multistmt.txn.rollback.2pc.disable")
		return &sqltypes.Result{Warnings: 1}, errors.Errorf("spanner.execute.multistmt.txn.rollback.error[twopc-disable]")
	}

	// Look up the transaction currently bound to this session; a "rollback"
	// that was not preceded by a begin is rejected.
	var txn backend.Transaction = ss.getTxnSession(session).transaction
	if txn == nil {
		logger.Error("spanner.execute.multistmt.txn.rollback.error.txn.not.begin")
		return &sqltypes.Result{}, errors.Errorf("unsupported: rollback.without.txn.begin")
	}

	// Record the current statement on the session; the nil transaction
	// argument is passed exactly as the original code did.
	ss.MultiStmtTxnBinding(session, nil, node, query)
	err := txn.RollbackScatter()
	if err != nil {
		logger.Error("spanner.execute.multistmt.txn.rollback.scattr.error:[%v]", err)
		return nil, err
	}

	ss.MultiStmtTxnUnBinding(session, true)
	txn.Finish()
	return &sqltypes.Result{}, nil
}
// ExecuteCommit executes the "commit" statement of a multi-statement
// transaction.
//
// Return values:
//   - two-phase commit disabled: a result with Warnings set to 1, plus an error;
//   - no transaction bound to the session: an empty result, plus an error;
//   - CommitScatter fails: a nil result and that error;
//   - success: an empty result and a nil error, after the transaction has been
//     unbound from the session and finished.
//
// NOTE(review): when CommitScatter fails the function returns without calling
// MultiStmtTxnUnBinding or Finish, so the transaction stays attached to the
// session — confirm this is intentional.
func (spanner *Spanner) ExecuteCommit(session *driver.Session, query string, node sqlparser.Statement) (*sqltypes.Result, error) {
	logger := spanner.log
	ss := spanner.sessions

	if twoPC := spanner.isTwoPC(); !twoPC {
		logger.Error("spanner.execute.multistmt.txn.commit.error.2pc.disable")
		return &sqltypes.Result{Warnings: 1}, errors.Errorf("spanner.execute.multistmt.txn.commit.error:[twopc-disable]")
	}

	// Look up the transaction currently bound to this session; a "commit"
	// that was not preceded by a begin is rejected.
	var txn backend.Transaction = ss.getTxnSession(session).transaction
	if txn == nil {
		logger.Error("spanner.execute.multistmt.txn.commit.error.txn.not.begin")
		return &sqltypes.Result{}, errors.Errorf("unsupported: commit.without.txn.begin")
	}

	// Record the current statement on the session; the nil transaction
	// argument is passed exactly as the original code did.
	ss.MultiStmtTxnBinding(session, nil, node, query)
	err := txn.CommitScatter()
	if err != nil {
		logger.Error("spanner.execute.multistmt.txn.commit.scattr.error:[%v]", err)
		return nil, err
	}

	ss.MultiStmtTxnUnBinding(session, true)
	txn.Finish()
	return &sqltypes.Result{}, nil
}