-
Notifications
You must be signed in to change notification settings - Fork 219
Expand file tree
/
Copy pathtxnmgr.go
More file actions
106 lines (89 loc) · 2.05 KB
/
txnmgr.go
File metadata and controls
106 lines (89 loc) · 2.05 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
/*
* Radon
*
* Copyright 2018 The Radon Authors.
* Code is licensed under the GPLv3.
*
*/
package backend
import (
"errors"
"sync"
"sync/atomic"
"config"
"github.com/xelabs/go-mysqlstack/xlog"
)
// TxnManager hands out transaction ids, keeps a transaction counter that is
// adjusted by Add and Remove, and exposes a shared commit read/write lock.
// It also holds the xaCheck worker that is set up by Init and released by
// Close.
//
// The struct embeds a sync.RWMutex and counters that are updated through
// sync/atomic, so it must be shared by pointer and never copied.
//
// NOTE(review): txnid and txnNums are 64-bit values accessed with the
// sync/atomic functions; on 32-bit platforms such fields must stay 64-bit
// aligned, so take care when inserting new fields ahead of them.
type TxnManager struct {
// log is the logger given to NewTxnManager; CreateTxn forwards it to NewTxn.
log *xlog.Log
// xaCheck is nil until Init succeeds and is reset to nil by Close.
xaCheck *XaCheck
// txnid is the last id returned by GetID; it is only touched via sync/atomic.
txnid uint64
// txnNums is incremented by Add and decremented by Remove via sync/atomic.
txnNums int64
// commitLock backs CommitLock/CommitUnlock and CommitRLock/CommitRUnlock.
commitLock sync.RWMutex
}
// NewTxnManager returns a TxnManager that logs through log.
//
// Every other field starts at its zero value: the id counter is 0 (so the
// first GetID call yields 1), the txn counter is 0, and no xaCheck worker is
// attached until Init is called.
func NewTxnManager(log *xlog.Log) *TxnManager {
	mgr := new(TxnManager)
	mgr.log = log
	return mgr
}
// Init builds the xaCheck worker from scatter and ScatterConf and initializes
// it.
//
// On success the worker is stored on the manager and nil is returned. If the
// worker's own Init fails, that error is returned unchanged and the manager's
// xaCheck field is left as it was.
func (mgr *TxnManager) Init(scatter *Scatter, ScatterConf *config.ScatterConfig) error {
	checker := NewXaCheck(scatter, ScatterConf)
	err := checker.Init()
	if err == nil {
		// Only publish the worker once it has initialized cleanly.
		mgr.xaCheck = checker
	}
	return err
}
// Close shuts down the xaCheck worker, if one is attached, and clears the
// reference so that a repeated Close is a no-op.
func (mgr *TxnManager) Close() {
	if mgr.xaCheck == nil {
		// Init was never called (or Close already ran); nothing to release.
		return
	}
	mgr.xaCheck.Close()
	mgr.xaCheck = nil
}
// GetID atomically increments the manager's id counter and returns the new
// value. Because the counter starts at 0, the first id handed out by a fresh
// manager is 1.
func (mgr *TxnManager) GetID() uint64 {
return atomic.AddUint64(&mgr.txnid, 1)
}
// Add atomically increments the manager's txn counter (txnNums) by one.
// CreateTxn calls it after a Txn has been constructed successfully.
// The returned error is always nil.
func (mgr *TxnManager) Add() error {
atomic.AddInt64(&mgr.txnNums, 1)
return nil
}
// Remove atomically decrements the manager's txn counter (txnNums) by one.
// It does not check that the counter is positive, so each call is presumably
// expected to balance an earlier Add — verify against callers.
// The returned error is always nil.
func (mgr *TxnManager) Remove() error {
atomic.AddInt64(&mgr.txnNums, -1)
return nil
}
// CreateTxn builds a new Txn bound to the given backends.
//
// A nil or empty backends map is rejected with the error "backends.is.NULL"
// before an id is allocated, so a rejected call does not consume a txnid.
// Otherwise the next id is drawn from GetID and the Txn is constructed through
// NewTxn. An error from NewTxn is returned unchanged together with a nil Txn
// (the id that was drawn is not reused). Only after successful construction is
// the txn counter bumped via Add; Add's error result is ignored because it is
// always nil.
func (mgr *TxnManager) CreateTxn(backends map[string]*Poolz) (*Txn, error) {
	if len(backends) < 1 {
		return nil, errors.New("backends.is.NULL")
	}

	created, err := NewTxn(mgr.log, mgr.GetID(), mgr, backends)
	if err != nil {
		return nil, err
	}

	mgr.Add()
	return created, nil
}
// CommitLock acquires the manager's commit lock for writing (exclusive
// access), blocking until it is available. Each call must be paired with a
// CommitUnlock.
func (mgr *TxnManager) CommitLock() {
mgr.commitLock.Lock()
}
// CommitUnlock releases the write (exclusive) hold on the commit lock taken by
// CommitLock. Per sync.RWMutex, calling it when the lock is not held for
// writing is a run-time error.
func (mgr *TxnManager) CommitUnlock() {
mgr.commitLock.Unlock()
}
// CommitRLock acquires the manager's commit lock for reading (shared access).
// Multiple readers may hold it at once; it blocks while a writer holds the
// lock. Each call must be paired with a CommitRUnlock.
func (mgr *TxnManager) CommitRLock() {
mgr.commitLock.RLock()
}
// CommitRUnlock releases one read (shared) hold on the commit lock taken by
// CommitRLock. Per sync.RWMutex, calling it when the lock is not held for
// reading is a run-time error.
func (mgr *TxnManager) CommitRUnlock() {
mgr.commitLock.RUnlock()
}