Skip to content

Commit 95b88ff

Browse files
3424672656wanghuaiyuan
andauthored
[ISSUE #8442][RIP-70-3] Extract adaptive lock mechanism (#8663)
* extract the adaptive lock * extract the adaptive lock * feat(): perfect the adaptive lock * feat(): perfect the adaptive lock * Optimized code type * Optimized code type * Optimized code type * fix fail test * Optimize the adaptive locking mechanism logic * Optimize the adaptive locking mechanism logic * feat:Adaptive locking mechanism adjustment * feat:Adaptive locking mechanism adjustment * feat:Adaptive locking mechanism adjustment * Optimize the adaptive locking mechanism logic * Optimize the adaptive locking mechanism logic * Optimize the adaptive locking mechanism logic * feat:Supports the hot activation of ABS locks * feat:Supports the hot activation of ABS locks * feat:Supports the hot activation of ABS locks * feat:Supports the hot activation of ABS locks * Optimize code style * Optimize code style * Optimize code style * Optimize code style * Optimize code style * Optimize code style * Updated the locking mechanism name * Optimize the logic of switching to spin locks * Optimize the logic of switching to spin locks * Optimize the logic of switching to spin locks * Optimize the logic of switching to spin locks * Optimize the logic of switching to spin locks * Optimize the logic of switching to spin locks * Optimize the logic of switching to spin locks * Optimize the logic of switching to spin locks * delete unused import * Optimize the logic of switching to spin locks * Revert "Optimize the logic of switching to spin locks" This reverts commit 1d7bac5. * Optimize the logic of switching to spin locks * Optimize the logic of switching to spin locks * Optimize the logic of switching to spin locks * Optimize the logic of switching to spin locks * Optimize the logic of switching to spin locks * Optimize the logic of switching to spin locks * Optimize the logic of switching to spin locks * Optimized locking logic * Optimized locking logic * Optimized locking logic * fix test * fix test * fix test * fix test * Optimize code style * Optimize code style * fix test * fix test * optimize client rebalancing logic --------- Co-authored-by: wanghuaiyuan <[email protected]>
1 parent 738c9f3 commit 95b88ff

File tree

7 files changed

+504
-1
lines changed

7 files changed

+504
-1
lines changed

store/src/main/java/org/apache/rocketmq/store/CommitLog.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@
6262
import org.apache.rocketmq.store.exception.StoreException;
6363
import org.apache.rocketmq.store.ha.HAService;
6464
import org.apache.rocketmq.store.ha.autoswitch.AutoSwitchHAService;
65+
import org.apache.rocketmq.store.lock.AdaptiveBackOffSpinLockImpl;
6566
import org.apache.rocketmq.store.logfile.MappedFile;
6667
import org.apache.rocketmq.store.util.LibC;
6768
import org.rocksdb.RocksDBException;
@@ -130,7 +131,11 @@ protected PutMessageThreadLocal initialValue() {
130131
return new PutMessageThreadLocal(defaultMessageStore.getMessageStoreConfig());
131132
}
132133
};
133-
this.putMessageLock = messageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage() ? new PutMessageReentrantLock() : new PutMessageSpinLock();
134+
135+
PutMessageLock adaptiveBackOffSpinLock = new AdaptiveBackOffSpinLockImpl();
136+
137+
this.putMessageLock = messageStore.getMessageStoreConfig().getUseABSLock() ? adaptiveBackOffSpinLock :
138+
messageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage() ? new PutMessageReentrantLock() : new PutMessageSpinLock();
134139

135140
this.flushDiskWatcher = new FlushDiskWatcher();
136141

store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -445,6 +445,17 @@ public class MessageStoreConfig {
445445
*/
446446
private String bottomMostCompressionTypeForConsumeQueueStore = "zstd";
447447

448+
/**
449+
* Spin number in the retreat strategy of spin lock
450+
* Default is 1000
451+
*/
452+
private int spinLockCollisionRetreatOptimalDegree = 1000;
453+
454+
/**
455+
* Use AdaptiveBackOffLock
456+
**/
457+
private boolean useABSLock = false;
458+
448459
public boolean isRocksdbCQDoubleWriteEnable() {
449460
return rocksdbCQDoubleWriteEnable;
450461
}
@@ -1898,4 +1909,20 @@ public String getBottomMostCompressionTypeForConsumeQueueStore() {
18981909
public void setBottomMostCompressionTypeForConsumeQueueStore(String bottomMostCompressionTypeForConsumeQueueStore) {
18991910
this.bottomMostCompressionTypeForConsumeQueueStore = bottomMostCompressionTypeForConsumeQueueStore;
19001911
}
1912+
1913+
public int getSpinLockCollisionRetreatOptimalDegree() {
1914+
return spinLockCollisionRetreatOptimalDegree;
1915+
}
1916+
1917+
public void setSpinLockCollisionRetreatOptimalDegree(int spinLockCollisionRetreatOptimalDegree) {
1918+
this.spinLockCollisionRetreatOptimalDegree = spinLockCollisionRetreatOptimalDegree;
1919+
}
1920+
1921+
public void setUseABSLock(boolean useABSLock) {
1922+
this.useABSLock = useABSLock;
1923+
}
1924+
1925+
public boolean getUseABSLock() {
1926+
return useABSLock;
1927+
}
19011928
}
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.rocketmq.store.lock;
18+
19+
import org.apache.rocketmq.store.PutMessageLock;
20+
import org.apache.rocketmq.store.config.MessageStoreConfig;
21+
22+
public interface AdaptiveBackOffSpinLock extends PutMessageLock {
23+
/**
24+
* Configuration update
25+
* @param messageStoreConfig
26+
*/
27+
default void update(MessageStoreConfig messageStoreConfig) {
28+
}
29+
30+
/**
31+
* Locking mechanism switching
32+
*/
33+
default void swap() {
34+
}
35+
}
Lines changed: 207 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,207 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.rocketmq.store.lock;
18+
19+
import org.apache.rocketmq.store.config.MessageStoreConfig;
20+
21+
import java.time.LocalTime;
22+
import java.util.ArrayList;
23+
import java.util.HashMap;
24+
import java.util.List;
25+
import java.util.Map;
26+
import java.util.concurrent.ConcurrentHashMap;
27+
import java.util.concurrent.atomic.AtomicBoolean;
28+
import java.util.concurrent.atomic.AtomicInteger;
29+
30+
public class AdaptiveBackOffSpinLockImpl implements AdaptiveBackOffSpinLock {
31+
private AdaptiveBackOffSpinLock adaptiveLock;
32+
//state
33+
private AtomicBoolean state = new AtomicBoolean(true);
34+
35+
// Used to determine the switchover between a mutex lock and a spin lock
36+
private final static float SWAP_SPIN_LOCK_RATIO = 0.8f;
37+
38+
// It is used to adjust the spin number K of the escape spin lock
39+
// When (retreat number / TPS) <= (1 / BASE_SWAP_ADAPTIVE_RATIO * SPIN_LOCK_ADAPTIVE_RATIO), K is decreased
40+
private final static int SPIN_LOCK_ADAPTIVE_RATIO = 4;
41+
42+
// It is used to adjust the spin number K of the escape spin lock
43+
// When (retreat number / TPS) >= (1 / BASE_SWAP_ADAPTIVE_RATIO), K is increased
44+
private final static int BASE_SWAP_LOCK_RATIO = 320;
45+
46+
private final static String BACK_OFF_SPIN_LOCK = "SpinLock";
47+
48+
private final static String REENTRANT_LOCK = "ReentrantLock";
49+
50+
private Map<String, AdaptiveBackOffSpinLock> locks;
51+
52+
private final List<AtomicInteger> tpsTable;
53+
54+
private final List<Map<Thread, Byte>> threadTable;
55+
56+
private int swapCriticalPoint;
57+
58+
private AtomicInteger currentThreadNum = new AtomicInteger(0);
59+
60+
private AtomicBoolean isOpen = new AtomicBoolean(true);
61+
62+
public AdaptiveBackOffSpinLockImpl() {
63+
this.locks = new HashMap<>();
64+
this.locks.put(REENTRANT_LOCK, new BackOffReentrantLock());
65+
this.locks.put(BACK_OFF_SPIN_LOCK, new BackOffSpinLock());
66+
67+
this.threadTable = new ArrayList<>(2);
68+
this.threadTable.add(new ConcurrentHashMap<>());
69+
this.threadTable.add(new ConcurrentHashMap<>());
70+
71+
this.tpsTable = new ArrayList<>(2);
72+
this.tpsTable.add(new AtomicInteger(0));
73+
this.tpsTable.add(new AtomicInteger(0));
74+
75+
adaptiveLock = this.locks.get(BACK_OFF_SPIN_LOCK);
76+
}
77+
78+
@Override
79+
public void lock() {
80+
int slot = LocalTime.now().getSecond() % 2;
81+
this.threadTable.get(slot).putIfAbsent(Thread.currentThread(), Byte.MAX_VALUE);
82+
this.tpsTable.get(slot).getAndIncrement();
83+
boolean state;
84+
do {
85+
state = this.state.get();
86+
} while (!state);
87+
88+
currentThreadNum.incrementAndGet();
89+
this.adaptiveLock.lock();
90+
}
91+
92+
@Override
93+
public void unlock() {
94+
this.adaptiveLock.unlock();
95+
currentThreadNum.decrementAndGet();
96+
if (isOpen.get()) {
97+
swap();
98+
}
99+
}
100+
101+
@Override
102+
public void update(MessageStoreConfig messageStoreConfig) {
103+
this.adaptiveLock.update(messageStoreConfig);
104+
}
105+
106+
@Override
107+
public void swap() {
108+
if (!this.state.get()) {
109+
return;
110+
}
111+
boolean needSwap = false;
112+
int slot = 1 - LocalTime.now().getSecond() % 2;
113+
int tps = this.tpsTable.get(slot).get() + 1;
114+
int threadNum = this.threadTable.get(slot).size();
115+
this.tpsTable.get(slot).set(-1);
116+
this.threadTable.get(slot).clear();
117+
if (tps == 0) {
118+
return;
119+
}
120+
121+
if (this.adaptiveLock instanceof BackOffSpinLock) {
122+
BackOffSpinLock lock = (BackOffSpinLock) this.adaptiveLock;
123+
// Avoid frequent adjustment of K, and make a reasonable range through experiments
124+
// reasonable range : (retreat number / TPS) > (1 / BASE_SWAP_ADAPTIVE_RATIO * SPIN_LOCK_ADAPTIVE_RATIO) &&
125+
// (retreat number / TPS) < (1 / BASE_SWAP_ADAPTIVE_RATIO)
126+
if (lock.getNumberOfRetreat(slot) * BASE_SWAP_LOCK_RATIO >= tps) {
127+
if (lock.isAdapt()) {
128+
lock.adapt(true);
129+
} else {
130+
// It is used to switch between mutex lock and spin lock
131+
this.swapCriticalPoint = tps * threadNum;
132+
needSwap = true;
133+
}
134+
} else if (lock.getNumberOfRetreat(slot) * BASE_SWAP_LOCK_RATIO * SPIN_LOCK_ADAPTIVE_RATIO <= tps) {
135+
lock.adapt(false);
136+
}
137+
lock.setNumberOfRetreat(slot, 0);
138+
} else {
139+
if (tps * threadNum <= this.swapCriticalPoint * SWAP_SPIN_LOCK_RATIO) {
140+
needSwap = true;
141+
}
142+
}
143+
144+
if (needSwap) {
145+
if (this.state.compareAndSet(true, false)) {
146+
// Ensures that no threads are in contention locks as well as in critical zones
147+
int currentThreadNum;
148+
do {
149+
currentThreadNum = this.currentThreadNum.get();
150+
} while (currentThreadNum != 0);
151+
152+
try {
153+
if (this.adaptiveLock instanceof BackOffSpinLock) {
154+
this.adaptiveLock = this.locks.get(REENTRANT_LOCK);
155+
} else {
156+
this.adaptiveLock = this.locks.get(BACK_OFF_SPIN_LOCK);
157+
((BackOffSpinLock) this.adaptiveLock).adapt(false);
158+
}
159+
} catch (Exception e) {
160+
//ignore
161+
} finally {
162+
this.state.compareAndSet(false, true);
163+
}
164+
}
165+
}
166+
}
167+
168+
public List<AdaptiveBackOffSpinLock> getLocks() {
169+
return (List<AdaptiveBackOffSpinLock>) this.locks.values();
170+
}
171+
172+
public void setLocks(Map<String, AdaptiveBackOffSpinLock> locks) {
173+
this.locks = locks;
174+
}
175+
176+
public boolean getState() {
177+
return this.state.get();
178+
}
179+
180+
public void setState(boolean state) {
181+
this.state.set(state);
182+
}
183+
184+
public AdaptiveBackOffSpinLock getAdaptiveLock() {
185+
return adaptiveLock;
186+
}
187+
188+
public List<AtomicInteger> getTpsTable() {
189+
return tpsTable;
190+
}
191+
192+
public void setSwapCriticalPoint(int swapCriticalPoint) {
193+
this.swapCriticalPoint = swapCriticalPoint;
194+
}
195+
196+
public int getSwapCriticalPoint() {
197+
return swapCriticalPoint;
198+
}
199+
200+
public boolean isOpen() {
201+
return this.isOpen.get();
202+
}
203+
204+
public void setOpen(boolean open) {
205+
this.isOpen.set(open);
206+
}
207+
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.rocketmq.store.lock;
18+
19+
import java.util.concurrent.locks.ReentrantLock;
20+
21+
public class BackOffReentrantLock implements AdaptiveBackOffSpinLock {
22+
private ReentrantLock putMessageNormalLock = new ReentrantLock(); // NonfairSync
23+
24+
@Override
25+
public void lock() {
26+
putMessageNormalLock.lock();
27+
}
28+
29+
@Override
30+
public void unlock() {
31+
putMessageNormalLock.unlock();
32+
}
33+
}

0 commit comments

Comments
 (0)