Skip to content

Commit d056c40

Browse files
author
wanghuaiyuan
committed
Merge branch 'develop' into adaptive_lock
# Conflicts: # store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
2 parents 4b7fa65 + 2956f6d commit d056c40

File tree

80 files changed

+1521
-882
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

80 files changed

+1521
-882
lines changed

.gitignore

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,4 +17,5 @@ bazel-out
1717
bazel-bin
1818
bazel-rocketmq
1919
bazel-testlogs
20-
.vscode
20+
.vscode
21+
MODULE.bazel.lock

MODULE.bazel

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
###############################################################################
18+
# Bazel now uses Bzlmod by default to manage external dependencies.
19+
# Please consider migrating your external dependencies from WORKSPACE to MODULE.bazel.
20+
#
21+
# For more details, please check https://github.com/bazelbuild/bazel/issues/18958
22+
###############################################################################

WORKSPACE

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,8 @@ maven_install(
112112
"com.alipay.sofa:hessian:3.3.6",
113113
"io.netty:netty-tcnative-boringssl-static:2.0.48.Final",
114114
"org.mockito:mockito-junit-jupiter:4.11.0",
115+
"com.alibaba.fastjson2:fastjson2:2.0.43",
116+
"org.junit.jupiter:junit-jupiter-api:5.9.1",
115117
],
116118
fetch_sources = True,
117119
repositories = [

broker/BUILD.bazel

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,9 @@ java_library(
9191
"@maven//:io_github_aliyunmq_rocketmq_slf4j_api",
9292
"@maven//:org_powermock_powermock_core",
9393
"@maven//:io_opentelemetry_opentelemetry_api",
94+
"@maven//:com_googlecode_concurrentlinkedhashmap_concurrentlinkedhashmap_lru",
95+
"@maven//:org_apache_rocketmq_rocketmq_rocksdb",
96+
"@maven//:commons_collections_commons_collections",
9497
],
9598
)
9699

broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1010,7 +1010,7 @@ private void initialTransaction() {
10101010

10111011
private void initialAcl() {
10121012
if (!this.brokerConfig.isAclEnable()) {
1013-
LOG.info("The broker dose not enable acl");
1013+
LOG.info("The broker does not enable acl");
10141014
return;
10151015
}
10161016

broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java

Lines changed: 42 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import java.util.Map;
2626
import java.util.Map.Entry;
2727
import java.util.concurrent.ConcurrentHashMap;
28+
import java.util.concurrent.ConcurrentMap;
2829
import java.util.concurrent.CopyOnWriteArrayList;
2930
import org.apache.rocketmq.broker.util.PositiveAtomicCounter;
3031
import org.apache.rocketmq.common.constant.LoggerName;
@@ -39,11 +40,11 @@ public class ProducerManager {
3940
private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
4041
private static final long CHANNEL_EXPIRED_TIMEOUT = 1000 * 120;
4142
private static final int GET_AVAILABLE_CHANNEL_RETRY_COUNT = 3;
42-
private final ConcurrentHashMap<String /* group name */, ConcurrentHashMap<Channel, ClientChannelInfo>> groupChannelTable =
43+
private final ConcurrentMap<String /* group name */, ConcurrentMap<Channel, ClientChannelInfo>> groupChannelTable =
4344
new ConcurrentHashMap<>();
44-
private final ConcurrentHashMap<String, Channel> clientChannelTable = new ConcurrentHashMap<>();
45+
private final ConcurrentMap<String, Channel> clientChannelTable = new ConcurrentHashMap<>();
4546
protected final BrokerStatsManager brokerStatsManager;
46-
private PositiveAtomicCounter positiveAtomicCounter = new PositiveAtomicCounter();
47+
private final PositiveAtomicCounter positiveAtomicCounter = new PositiveAtomicCounter();
4748
private final List<ProducerChangeListener> producerChangeListenerList = new CopyOnWriteArrayList<>();
4849

4950
public ProducerManager() {
@@ -63,22 +64,22 @@ public boolean groupOnline(String group) {
6364
return channels != null && !channels.isEmpty();
6465
}
6566

66-
public ConcurrentHashMap<String, ConcurrentHashMap<Channel, ClientChannelInfo>> getGroupChannelTable() {
67+
public ConcurrentMap<String, ConcurrentMap<Channel, ClientChannelInfo>> getGroupChannelTable() {
6768
return groupChannelTable;
6869
}
6970

7071
public ProducerTableInfo getProducerTable() {
7172
Map<String, List<ProducerInfo>> map = new HashMap<>();
7273
for (String group : this.groupChannelTable.keySet()) {
73-
for (Entry<Channel, ClientChannelInfo> entry: this.groupChannelTable.get(group).entrySet()) {
74+
for (Entry<Channel, ClientChannelInfo> entry : this.groupChannelTable.get(group).entrySet()) {
7475
ClientChannelInfo clientChannelInfo = entry.getValue();
7576
if (map.containsKey(group)) {
7677
map.get(group).add(new ProducerInfo(
77-
clientChannelInfo.getClientId(),
78-
clientChannelInfo.getChannel().remoteAddress().toString(),
79-
clientChannelInfo.getLanguage(),
80-
clientChannelInfo.getVersion(),
81-
clientChannelInfo.getLastUpdateTimestamp()
78+
clientChannelInfo.getClientId(),
79+
clientChannelInfo.getChannel().remoteAddress().toString(),
80+
clientChannelInfo.getLanguage(),
81+
clientChannelInfo.getVersion(),
82+
clientChannelInfo.getLastUpdateTimestamp()
8283
));
8384
} else {
8485
map.put(group, new ArrayList<>(Collections.singleton(new ProducerInfo(
@@ -95,13 +96,13 @@ public ProducerTableInfo getProducerTable() {
9596
}
9697

9798
public void scanNotActiveChannel() {
98-
Iterator<Map.Entry<String, ConcurrentHashMap<Channel, ClientChannelInfo>>> iterator = this.groupChannelTable.entrySet().iterator();
99+
Iterator<Map.Entry<String, ConcurrentMap<Channel, ClientChannelInfo>>> iterator = this.groupChannelTable.entrySet().iterator();
99100

100101
while (iterator.hasNext()) {
101-
Map.Entry<String, ConcurrentHashMap<Channel, ClientChannelInfo>> entry = iterator.next();
102+
Map.Entry<String, ConcurrentMap<Channel, ClientChannelInfo>> entry = iterator.next();
102103

103104
final String group = entry.getKey();
104-
final ConcurrentHashMap<Channel, ClientChannelInfo> chlMap = entry.getValue();
105+
final ConcurrentMap<Channel, ClientChannelInfo> chlMap = entry.getValue();
105106

106107
Iterator<Entry<Channel, ClientChannelInfo>> it = chlMap.entrySet().iterator();
107108
while (it.hasNext()) {
@@ -117,8 +118,8 @@ public void scanNotActiveChannel() {
117118
clientChannelTable.remove(info.getClientId());
118119
}
119120
log.warn(
120-
"ProducerManager#scanNotActiveChannel: remove expired channel[{}] from ProducerManager groupChannelTable, producer group name: {}",
121-
RemotingHelper.parseChannelRemoteAddr(info.getChannel()), group);
121+
"ProducerManager#scanNotActiveChannel: remove expired channel[{}] from ProducerManager groupChannelTable, producer group name: {}",
122+
RemotingHelper.parseChannelRemoteAddr(info.getChannel()), group);
122123
callProducerChangeListener(ProducerGroupEvent.CLIENT_UNREGISTER, group, info);
123124
RemotingHelper.closeChannel(info.getChannel());
124125
}
@@ -132,25 +133,22 @@ public void scanNotActiveChannel() {
132133
}
133134
}
134135

135-
public synchronized boolean doChannelCloseEvent(final String remoteAddr, final Channel channel) {
136+
public boolean doChannelCloseEvent(final String remoteAddr, final Channel channel) {
136137
boolean removed = false;
137138
if (channel != null) {
138-
for (final Map.Entry<String, ConcurrentHashMap<Channel, ClientChannelInfo>> entry : this.groupChannelTable
139-
.entrySet()) {
139+
for (final Map.Entry<String, ConcurrentMap<Channel, ClientChannelInfo>> entry : this.groupChannelTable.entrySet()) {
140140
final String group = entry.getKey();
141-
final ConcurrentHashMap<Channel, ClientChannelInfo> clientChannelInfoTable =
142-
entry.getValue();
143-
final ClientChannelInfo clientChannelInfo =
144-
clientChannelInfoTable.remove(channel);
141+
final ConcurrentMap<Channel, ClientChannelInfo> clientChannelInfoTable = entry.getValue();
142+
final ClientChannelInfo clientChannelInfo = clientChannelInfoTable.remove(channel);
145143
if (clientChannelInfo != null) {
146144
clientChannelTable.remove(clientChannelInfo.getClientId());
147145
removed = true;
148146
log.info(
149-
"NETTY EVENT: remove channel[{}][{}] from ProducerManager groupChannelTable, producer group: {}",
150-
clientChannelInfo.toString(), remoteAddr, group);
147+
"NETTY EVENT: remove channel[{}][{}] from ProducerManager groupChannelTable, producer group: {}",
148+
clientChannelInfo.toString(), remoteAddr, group);
151149
callProducerChangeListener(ProducerGroupEvent.CLIENT_UNREGISTER, group, clientChannelInfo);
152150
if (clientChannelInfoTable.isEmpty()) {
153-
ConcurrentHashMap<Channel, ClientChannelInfo> oldGroupTable = this.groupChannelTable.remove(group);
151+
ConcurrentMap<Channel, ClientChannelInfo> oldGroupTable = this.groupChannelTable.remove(group);
154152
if (oldGroupTable != null) {
155153
log.info("unregister a producer group[{}] from groupChannelTable", group);
156154
callProducerChangeListener(ProducerGroupEvent.GROUP_UNREGISTER, group, null);
@@ -163,37 +161,44 @@ public synchronized boolean doChannelCloseEvent(final String remoteAddr, final C
163161
return removed;
164162
}
165163

166-
public synchronized void registerProducer(final String group, final ClientChannelInfo clientChannelInfo) {
167-
ClientChannelInfo clientChannelInfoFound = null;
164+
public void registerProducer(final String group, final ClientChannelInfo clientChannelInfo) {
165+
ClientChannelInfo clientChannelInfoFound;
168166

169-
ConcurrentHashMap<Channel, ClientChannelInfo> channelTable = this.groupChannelTable.get(group);
167+
ConcurrentMap<Channel, ClientChannelInfo> channelTable = this.groupChannelTable.get(group);
170168
if (null == channelTable) {
171169
channelTable = new ConcurrentHashMap<>();
172-
this.groupChannelTable.put(group, channelTable);
170+
// Make sure channelTable will NOT be cleaned by #scanNotActiveChannel
171+
channelTable.put(clientChannelInfo.getChannel(), clientChannelInfo);
172+
ConcurrentMap<Channel, ClientChannelInfo> prev = this.groupChannelTable.putIfAbsent(group, channelTable);
173+
if (null == prev) {
174+
// Add client-id to channel mapping for new producer group
175+
clientChannelTable.put(clientChannelInfo.getClientId(), clientChannelInfo.getChannel());
176+
} else {
177+
channelTable = prev;
178+
}
173179
}
174180

175181
clientChannelInfoFound = channelTable.get(clientChannelInfo.getChannel());
182+
// Add client-channel info to existing producer group
176183
if (null == clientChannelInfoFound) {
177184
channelTable.put(clientChannelInfo.getChannel(), clientChannelInfo);
178185
clientChannelTable.put(clientChannelInfo.getClientId(), clientChannelInfo.getChannel());
179-
log.info("new producer connected, group: {} channel: {}", group,
180-
clientChannelInfo.toString());
186+
log.info("new producer connected, group: {} channel: {}", group, clientChannelInfo.toString());
181187
}
182188

183-
189+
// Refresh existing client-channel-info update-timestamp
184190
if (clientChannelInfoFound != null) {
185191
clientChannelInfoFound.setLastUpdateTimestamp(System.currentTimeMillis());
186192
}
187193
}
188194

189-
public synchronized void unregisterProducer(final String group, final ClientChannelInfo clientChannelInfo) {
190-
ConcurrentHashMap<Channel, ClientChannelInfo> channelTable = this.groupChannelTable.get(group);
195+
public void unregisterProducer(final String group, final ClientChannelInfo clientChannelInfo) {
196+
ConcurrentMap<Channel, ClientChannelInfo> channelTable = this.groupChannelTable.get(group);
191197
if (null != channelTable && !channelTable.isEmpty()) {
192198
ClientChannelInfo old = channelTable.remove(clientChannelInfo.getChannel());
193199
clientChannelTable.remove(clientChannelInfo.getClientId());
194200
if (old != null) {
195-
log.info("unregister a producer[{}] from groupChannelTable {}", group,
196-
clientChannelInfo.toString());
201+
log.info("unregister a producer[{}] from groupChannelTable {}", group, clientChannelInfo.toString());
197202
callProducerChangeListener(ProducerGroupEvent.CLIENT_UNREGISTER, group, clientChannelInfo);
198203
}
199204

@@ -210,7 +215,7 @@ public Channel getAvailableChannel(String groupId) {
210215
return null;
211216
}
212217
List<Channel> channelList;
213-
ConcurrentHashMap<Channel, ClientChannelInfo> channelClientChannelInfoHashMap = groupChannelTable.get(groupId);
218+
ConcurrentMap<Channel, ClientChannelInfo> channelClientChannelInfoHashMap = groupChannelTable.get(groupId);
214219
if (channelClientChannelInfoHashMap != null) {
215220
channelList = new ArrayList<>(channelClientChannelInfoHashMap.keySet());
216221
} else {

broker/src/main/java/org/apache/rocketmq/broker/client/net/Broker2Client.java

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.apache.rocketmq.logging.org.slf4j.Logger;
3838
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
3939
import org.apache.rocketmq.remoting.common.RemotingHelper;
40+
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
4041
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
4142
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
4243
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
@@ -49,6 +50,7 @@
4950
import org.apache.rocketmq.remoting.protocol.header.GetConsumerStatusRequestHeader;
5051
import org.apache.rocketmq.remoting.protocol.header.NotifyConsumerIdsChangedRequestHeader;
5152
import org.apache.rocketmq.remoting.protocol.header.ResetOffsetRequestHeader;
53+
import org.apache.rocketmq.store.exception.ConsumeQueueException;
5254

5355
public class Broker2Client {
5456
private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
@@ -100,13 +102,12 @@ public void notifyConsumerIdsChanged(
100102
}
101103
}
102104

103-
104-
public RemotingCommand resetOffset(String topic, String group, long timeStamp, boolean isForce) {
105+
public RemotingCommand resetOffset(String topic, String group, long timeStamp, boolean isForce) throws RemotingCommandException {
105106
return resetOffset(topic, group, timeStamp, isForce, false);
106107
}
107108

108109
public RemotingCommand resetOffset(String topic, String group, long timeStamp, boolean isForce,
109-
boolean isC) {
110+
boolean isC) throws RemotingCommandException {
110111
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
111112

112113
TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(topic);
@@ -135,8 +136,11 @@ public RemotingCommand resetOffset(String topic, String group, long timeStamp, b
135136

136137
long timeStampOffset;
137138
if (timeStamp == -1) {
138-
139-
timeStampOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, i);
139+
try {
140+
timeStampOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, i);
141+
} catch (ConsumeQueueException e) {
142+
throw new RemotingCommandException("Failed to get max offset in queue", e);
143+
}
140144
} else {
141145
timeStampOffset = this.brokerController.getMessageStore().getOffsetInQueueByTime(topic, i, timeStamp);
142146
}

broker/src/main/java/org/apache/rocketmq/broker/longpolling/LmqPullRequestHoldService.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import org.apache.rocketmq.logging.org.slf4j.Logger;
2323
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
2424

25-
2625
public class LmqPullRequestHoldService extends PullRequestHoldService {
2726
private static final Logger LOGGER = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
2827

@@ -48,8 +47,8 @@ public void checkHoldRequest() {
4847
}
4948
String topic = key.substring(0, idx);
5049
int queueId = Integer.parseInt(key.substring(idx + 1));
51-
final long offset = brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);
5250
try {
51+
final long offset = brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);
5352
this.notifyMessageArriving(topic, queueId, offset);
5453
} catch (Throwable e) {
5554
LOGGER.error("check hold request failed. topic={}, queueId={}", topic, queueId, e);

broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.apache.rocketmq.logging.org.slf4j.Logger;
2929
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
3030
import org.apache.rocketmq.store.ConsumeQueueExt;
31+
import org.apache.rocketmq.store.exception.ConsumeQueueException;
3132

3233
public class PullRequestHoldService extends ServiceThread {
3334
private static final Logger log = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
@@ -103,8 +104,8 @@ protected void checkHoldRequest() {
103104
if (2 == kArray.length) {
104105
String topic = kArray[0];
105106
int queueId = Integer.parseInt(kArray[1]);
106-
final long offset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);
107107
try {
108+
final long offset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);
108109
this.notifyMessageArriving(topic, queueId, offset);
109110
} catch (Throwable e) {
110111
log.error(
@@ -131,7 +132,12 @@ public void notifyMessageArriving(final String topic, final int queueId, final l
131132
for (PullRequest request : requestList) {
132133
long newestOffset = maxOffset;
133134
if (newestOffset <= request.getPullFromThisOffset()) {
134-
newestOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);
135+
try {
136+
newestOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);
137+
} catch (ConsumeQueueException e) {
138+
log.error("Failed tp get max offset in queue", e);
139+
continue;
140+
}
135141
}
136142

137143
if (newestOffset > request.getPullFromThisOffset()) {

0 commit comments

Comments
 (0)