-
Notifications
You must be signed in to change notification settings - Fork 219
Expand file tree
/
Copy pathshiftmanager.go
More file actions
296 lines (261 loc) · 9.63 KB
/
shiftmanager.go
File metadata and controls
296 lines (261 loc) · 9.63 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
/*
* Radon
*
* Copyright 2018-2020 The Radon Authors.
* Code is licensed under the GPLv3.
*
*/
package shiftmanager
import (
"fmt"
"runtime"
"sync"
"github.com/radondb/shift/build"
"github.com/radondb/shift/shift"
sxlog "github.com/radondb/shift/xlog"
"github.com/xelabs/go-mysqlstack/sqlparser/depends/sqltypes"
"github.com/xelabs/go-mysqlstack/xlog"
)
// maxShiftInstancesAlived caps how many shift instances may be registered as
// alive in the manager at the same time.
const (
	maxShiftInstancesAlived = 10
)
// ShiftManager keeps track of shift instances on behalf of reshard and
// rebalance. Instances that are still registered as alive are kept apart from
// the recorded outcome of the instances that have finished.
type ShiftManager struct {
log *xlog.Log
// mu is held while instances are added to or removed from the maps below.
mu sync.Mutex
// wg counts the background goroutines started by WaitInstanceFinishThread;
// Close waits on it.
wg sync.WaitGroup
// instancesAlived holds the instances that were started and have not been
// recorded as finished yet, limited to maxShiftInstancesAlived (10) entries.
// key: for reshard, key is `db`.`table`, for rebalance, key is `db`.`table`_backend
instancesAlived map[string]*shiftInstancesAlived
// instancesFinished stores the result of each finished shift, no matter
// whether it succeeded or failed.
// key: for reshard, key is `db`.`table`, for rebalance, key is `db`.`table`_backend
instancesFinished map[string]*shiftInstancesFinished
}
// shiftInstancesAlived is the bookkeeping record of one shift instance that is
// still registered as alive.
type shiftInstancesAlived struct {
// shift is the handler the manager uses to wait for or stop the instance.
shift shift.ShiftHandler
// status is set to ShiftStatusMigrating when the record is created.
status ShiftStatus
// progress is created as ""; no visible code updates it afterwards.
progress string
// shiftType is the type the instance was started with.
shiftType ShiftType
}
// shiftInstancesFinished is the record kept for a shift instance once it has
// finished, no matter whether it succeeded or failed.
type shiftInstancesFinished struct {
// status is the final status handed to updateFinishedInstance.
status ShiftStatus
// progress is created as ""; no visible code updates it afterwards.
progress string
// shiftType is the type of the instance that finished.
shiftType ShiftType
}
// NewShiftManager returns a shift manager that logs through log.
// The instance maps are not allocated here; call Init before using the manager.
func NewShiftManager(log *xlog.Log) ShiftMgrHandler {
	mgr := new(ShiftManager)
	mgr.log = log
	return mgr
}
// Init allocates empty maps for the alive and the finished instances,
// replacing whatever the manager tracked before. It always returns nil.
func (shiftMgr *ShiftManager) Init() error {
	alived := make(map[string]*shiftInstancesAlived)
	finished := make(map[string]*shiftInstancesFinished)

	shiftMgr.instancesAlived = alived
	shiftMgr.instancesFinished = finished
	return nil
}
// NewShiftInstance builds a shift handler from shiftInfo without starting it.
//
// typ selects the destination flavor: ShiftTypeReshard uses
// shift.ToRadonDBFlavor and ShiftTypeRebalance uses shift.ToMySQLFlavor.
// A nil shiftInfo or any other typ yields a nil handler and an error.
func (shiftMgr *ShiftManager) NewShiftInstance(shiftInfo *ShiftInfo, typ ShiftType) (shift.ShiftHandler, error) {
	// Reject a missing description up front instead of panicking on the first
	// field access below.
	if shiftInfo == nil {
		shiftMgr.log.Error("shift.info.should.not.be.nil")
		return nil, fmt.Errorf("shift.info.should.not.be.nil")
	}

	runtime.GOMAXPROCS(runtime.NumCPU())

	// Named buildInfo so the imported build package is not shadowed.
	buildInfo := build.GetInfo()
	shiftMgr.log.Warning("shift:[%+v]\n", buildInfo)
	shiftMgr.log.Warning(`
IMPORTANT: Please check that the shift run completes successfully.
At the end of a successful shift run prints "shift.completed.OK!".`)

	cfg := &shift.Config{
		From:                   shiftInfo.From,
		FromUser:               shiftInfo.FromUser,
		FromPassword:           shiftInfo.FromPassword,
		FromDatabase:           shiftInfo.FromDatabase,
		FromTable:              shiftInfo.FromTable,
		To:                     shiftInfo.To,
		ToUser:                 shiftInfo.ToUser,
		ToPassword:             shiftInfo.ToPassword,
		ToDatabase:             shiftInfo.ToDatabase,
		ToTable:                shiftInfo.ToTable,
		Rebalance:              shiftInfo.Rebalance,
		Cleanup:                shiftInfo.Cleanup,
		MySQLDump:              shiftInfo.MysqlDump,
		Threads:                shiftInfo.Threads,
		Behinds:                shiftInfo.PosBehinds,
		RadonURL:               shiftInfo.RadonURL,
		Checksum:               shiftInfo.Checksum,
		WaitTimeBeforeChecksum: shiftInfo.WaitTimeBeforeChecksum,
	}

	switch typ {
	case ShiftTypeReshard:
		cfg.ToFlavor = shift.ToRadonDBFlavor
	case ShiftTypeRebalance:
		cfg.ToFlavor = shift.ToMySQLFlavor
	default:
		shiftMgr.log.Error(`shift.wrong.shifttype: [%+v].`, typ)
		return nil, fmt.Errorf("shift.wrong.shifttype: [%+v].", typ)
	}

	// Log a redacted copy of the config so that the source and destination
	// passwords never end up in the log output.
	redacted := *cfg
	redacted.FromPassword = "***"
	redacted.ToPassword = "***"
	shiftMgr.log.Info("shift.cfg:%+v", &redacted)

	slog := sxlog.NewStdLog(sxlog.Level(sxlog.WARNING))
	return shift.NewShift(slog, cfg), nil
}
// StartShiftInstance starts shift and registers it under key as an alive
// instance of type typ.
//
// Nothing is started when key is already registered, when typ is
// ShiftTypeNone, or when the manager already tracks the maximum number of
// alive instances. The capacity check runs before shift.Start() so that a
// rejected instance is never left running without being tracked.
// An error from shift.Start() is logged and returned as is.
func (shiftMgr *ShiftManager) StartShiftInstance(key string, shift shift.ShiftHandler, typ ShiftType) error {
	// The lock is held for the whole call so the checks below stay valid until
	// the instance has been registered.
	shiftMgr.mu.Lock()
	defer shiftMgr.mu.Unlock()

	// check if the instance specified by key has been already in instancesAlived
	if _, ok := shiftMgr.instancesAlived[key]; ok {
		return fmt.Errorf("shift.instance[%v].is.already.running", key)
	}
	if typ == ShiftTypeNone {
		return fmt.Errorf("shift.instance.type.should.not.be.none")
	}
	// Refuse before starting: once Start() has succeeded there is no way to
	// track the instance if the manager is already full.
	if len(shiftMgr.instancesAlived) >= maxShiftInstancesAlived {
		err := fmt.Errorf("shift.instances.num.exceeding.10.limits")
		shiftMgr.log.Error("shiftMgr.add.instance.error:%+v", err)
		return err
	}

	if err := shift.Start(); err != nil {
		shiftMgr.log.Error("shift.instance.start.error:%+v", err)
		return err
	}
	if err := shiftMgr.addAnAlivedInstance(key, typ, shift); err != nil {
		shiftMgr.log.Error("shiftMgr.add.instance.error:%+v", err)
		return err
	}
	return nil
}
// addAnAlivedInstance registers shift under key as an alive instance in the
// ShiftStatusMigrating state, or returns an error when the manager already
// holds maxShiftInstancesAlived instances.
// It does not take the manager lock itself.
// key: for reshard, key is `db`.`table`, for rebalance, key is `db`.`table`_backend
func (shiftMgr *ShiftManager) addAnAlivedInstance(key string, typ ShiftType, shift shift.ShiftHandler) error {
	if len(shiftMgr.instancesAlived) >= maxShiftInstancesAlived {
		return fmt.Errorf("shift.instances.num.exceeding.10.limits")
	}

	entry := &shiftInstancesAlived{
		shift:     shift,
		status:    ShiftStatusMigrating,
		progress:  "",
		shiftType: typ,
	}
	shiftMgr.instancesAlived[key] = entry
	return nil
}
// updateFinishedInstance moves the instance stored under key from the alive
// set to the finished set, recording its final status and its shift type.
// Both maps are updated while holding the manager lock.
func (shiftMgr *ShiftManager) updateFinishedInstance(key string, status ShiftStatus, typ ShiftType) {
	result := new(shiftInstancesFinished)
	result.status = status
	result.progress = ""
	result.shiftType = typ

	shiftMgr.mu.Lock()
	defer shiftMgr.mu.Unlock()

	// record the outcome first, then drop the entry from the alive instances
	shiftMgr.instancesFinished[key] = result
	delete(shiftMgr.instancesAlived, key)
}
// WaitInstanceFinishThread waits in the background for the alive instance
// stored under key to finish.
//
// It returns an error when no alive instance is registered under key.
// Otherwise it starts a goroutine, counted by the manager's WaitGroup, that
// blocks in WaitFinish and then records the instance as failed or succeeded.
func (shiftMgr *ShiftManager) WaitInstanceFinishThread(key string) error {
	shiftMgr.mu.Lock()
	defer shiftMgr.mu.Unlock()

	// look up the instance specified by key
	alive, found := shiftMgr.instancesAlived[key]
	if !found {
		shiftMgr.log.Error("shift.manager.wait.thread.start.error:instance[%v].not.found", key)
		return fmt.Errorf("shift.manager.wait.thread.start.error:instance[%v].not.found", key)
	}

	shiftMgr.wg.Add(1)
	go func() {
		defer shiftMgr.wg.Done()

		if waitErr := alive.shift.WaitFinish(); waitErr != nil {
			shiftMgr.log.Error("shift.manager.shift.instance[%v].wait.thread.finish.error:%+v", key, waitErr)
			shiftMgr.updateFinishedInstance(key, ShiftStatusFail, alive.shiftType)
			return
		}
		shiftMgr.log.Info("shift.manager.shift.instance[%v].wait.thread.finish.success.", key)
		shiftMgr.updateFinishedInstance(key, ShiftStatusSuccess, alive.shiftType)
	}()
	return nil
}
// WaitInstanceFinish blocks until the alive instance stored under key has
// finished and records the outcome.
//
// It returns an error when no alive instance is registered under key;
// otherwise it returns whatever WaitFinish returned, after the instance has
// been recorded as ShiftStatusFail or ShiftStatusSuccess.
func (shiftMgr *ShiftManager) WaitInstanceFinish(key string) error {
	// Only the map lookup needs the lock. Release it on every path, including
	// the not-found one, so that the manager is never left locked, and so that
	// the instance can do its finish work alone.
	shiftMgr.mu.Lock()
	instance, ok := shiftMgr.instancesAlived[key]
	shiftMgr.mu.Unlock()

	if !ok {
		shiftMgr.log.Error("shift.manager.wait.thread.start.error:instance[%v].not.found", key)
		return fmt.Errorf("shift.manager.wait.thread.start.error:instance[%v].not.found", key)
	}

	err := instance.shift.WaitFinish()
	if err != nil {
		shiftMgr.log.Error("shift.manager.shift.instance[%v].wait.thread.finish.error:%+v", key, err)
		shiftMgr.updateFinishedInstance(key, ShiftStatusFail, instance.shiftType)
	} else {
		shiftMgr.log.Info("shift.manager.shift.instance[%v].wait.thread.finish.success.", key)
		shiftMgr.updateFinishedInstance(key, ShiftStatusSuccess, instance.shiftType)
	}
	return err
}
// getInstancesAlivedNums returns how many instances are currently registered
// as alive. The count is read while holding the manager lock.
func (shiftMgr *ShiftManager) getInstancesAlivedNums() int {
	shiftMgr.mu.Lock()
	count := len(shiftMgr.instancesAlived)
	shiftMgr.mu.Unlock()
	return count
}
// getInstancesFinishedNums returns how many finished instances have been
// recorded. The count is read while holding the manager lock.
func (shiftMgr *ShiftManager) getInstancesFinishedNums() int {
	shiftMgr.mu.Lock()
	count := len(shiftMgr.instancesFinished)
	shiftMgr.mu.Unlock()
	return count
}
// GetStatus returns the status of the shift specified by key, looking at the
// alive instances first and at the finished ones second. When key is unknown
// it logs an error and returns ShiftStatusNone.
// key: for reshard, key is `db`.`table`, for rebalance, key is `db`.`table`_backend
func (shiftMgr *ShiftManager) GetStatus(key string) ShiftStatus {
	// Both maps are modified by updateFinishedInstance from background
	// goroutines, so they must not be read without the lock.
	shiftMgr.mu.Lock()
	defer shiftMgr.mu.Unlock()

	if v, ok := shiftMgr.instancesAlived[key]; ok {
		return v.status
	}
	if v, ok := shiftMgr.instancesFinished[key]; ok {
		return v.status
	}
	shiftMgr.log.Error("shift.manager.get.status.error:instance[%v].not.found", key)
	return ShiftStatusNone
}
// GetProgress is the accessor for the progress of the shift specified by key.
// It is a placeholder: key is ignored and the result is always nil.
// key: for reshard, key is `db`.`table`, for rebalance, key is `db`.`table`_backend
func (shiftMgr *ShiftManager) GetProgress(key string) *sqltypes.Result {
	// nothing is reported yet
	var progress *sqltypes.Result
	return progress
}
// GetShiftType returns the type of the shift specified by key, looking at the
// alive instances first and at the finished ones second. When key is unknown
// it logs an error and returns ShiftTypeNone.
func (shiftMgr *ShiftManager) GetShiftType(key string) ShiftType {
	// Both maps are modified by updateFinishedInstance from background
	// goroutines, so they must not be read without the lock.
	shiftMgr.mu.Lock()
	defer shiftMgr.mu.Unlock()

	if v, ok := shiftMgr.instancesAlived[key]; ok {
		return v.shiftType
	}
	if v, ok := shiftMgr.instancesFinished[key]; ok {
		return v.shiftType
	}
	shiftMgr.log.Error("shift.manager.get.shift.type.error:instance[%v].not.found", key)
	return ShiftTypeNone
}
// StopOneInstance sends the stop signal to the alive shift instance specified
// by key. It logs and returns an error when no such instance is registered.
// key: for reshard, key is `db`.`table`, for rebalance, key is `db`.`table`_backend
func (shiftMgr *ShiftManager) StopOneInstance(key string) error {
	// The map is modified from background goroutines, so look the instance up
	// under the lock; the signal itself is sent after the lock is released.
	shiftMgr.mu.Lock()
	instance, ok := shiftMgr.instancesAlived[key]
	shiftMgr.mu.Unlock()

	if !ok {
		shiftMgr.log.Error("shift.manager.instance[%v].not.found", key)
		return fmt.Errorf("shift.manager.instance[%v].not.found", key)
	}
	instance.shift.SetStopSignal()
	return nil
}
// StopAllInstance sends the stop signal to every shift instance that is
// registered as alive at the time of the call. It always returns nil.
func (shiftMgr *ShiftManager) StopAllInstance() error {
	// Finished instances are deleted from the map by background goroutines, so
	// the map must not be iterated without the lock. Take a snapshot of the
	// handlers under the lock and signal them once it has been released.
	shiftMgr.mu.Lock()
	handlers := make([]shift.ShiftHandler, 0, len(shiftMgr.instancesAlived))
	for _, instance := range shiftMgr.instancesAlived {
		handlers = append(handlers, instance.shift)
	}
	shiftMgr.mu.Unlock()

	for _, handler := range handlers {
		handler.SetStopSignal()
	}
	return nil
}
// Close signals all the shift instances that are still working to stop and
// then blocks until every goroutine started by WaitInstanceFinishThread has
// returned. An error from StopAllInstance is returned without waiting.
func (shiftMgr *ShiftManager) Close() error {
	if stopErr := shiftMgr.StopAllInstance(); stopErr != nil {
		return stopErr
	}

	// wait for all the background waiters to finish
	shiftMgr.wg.Wait()
	return nil
}