Skip to content

Commit f499b19

Browse files
authored
Merge pull request #254 from ni-ze/develop
[ISSUE #255] keep state topic consistent with source topic
2 parents 5607482 + 89272f9 commit f499b19

File tree

4 files changed

+84
-19
lines changed

4 files changed

+84
-19
lines changed

core/src/main/java/org/apache/rocketmq/streams/core/common/Constant.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,4 +50,6 @@ public class Constant {
5050

5151
public static final String WORKER_THREAD_NAME = "worker_thread";
5252

53+
public static final String STATIC_TOPIC_BROKER_NAME = "__syslo__global__";
54+
5355
}

core/src/main/java/org/apache/rocketmq/streams/core/running/WorkerThread.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,13 +56,17 @@ public class WorkerThread extends Thread {
5656
private final TopologyBuilder topologyBuilder;
5757
private final PlanetaryEngine<?, ?> planetaryEngine;
5858
private final Properties properties;
59+
private final String jobId;
5960

6061

6162
public WorkerThread(String threadName, TopologyBuilder topologyBuilder, Properties properties) throws MQClientException {
6263
super(threadName);
64+
6365
this.topologyBuilder = topologyBuilder;
6466
this.properties = properties;
65-
String groupName = topologyBuilder.getJobId() + "_" + ROCKETMQ_STREAMS_CONSUMER_GROUP;
67+
jobId = topologyBuilder.getJobId();
68+
69+
String groupName = String.join("_", jobId, ROCKETMQ_STREAMS_CONSUMER_GROUP);
6670

6771
RocketMQClient rocketMQClient = new RocketMQClient(properties.getProperty(MixAll.NAMESRV_ADDR_PROPERTY));
6872

@@ -88,8 +92,9 @@ public WorkerThread(String threadName, TopologyBuilder topologyBuilder, Properti
8892
public void run() {
8993
try {
9094
this.planetaryEngine.start();
91-
9295
this.planetaryEngine.runInLoop();
96+
logger.info("worker thread=[{}], start task success, jobId:{}", this.getName(), jobId);
97+
9398
} catch (Throwable e) {
9499
logger.error("worker thread=[{}], error:{}.", this.getName(), e);
95100
throw new RStreamsException(e);
@@ -101,6 +106,7 @@ public void run() {
101106

102107
public void shutdown() {
103108
this.planetaryEngine.stop();
109+
logger.info("worker thread=[{}], shutdown task success, jobId:{}", this.getName(), jobId);
104110
}
105111

106112

core/src/main/java/org/apache/rocketmq/streams/core/state/RocketMQStore.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,8 @@ public void persist(Set<MessageQueue> messageQueues) throws Throwable {
185185
}
186186

187187
String stateTopic = stateTopicQueue.getTopic();
188-
createStateTopic(stateTopic);
188+
boolean isStaticTopic = stateTopicQueue.getBrokerName().equals(Constant.STATIC_TOPIC_BROKER_NAME);
189+
createStateTopic(stateTopic, isStaticTopic);
189190

190191
for (byte[] key : keySet) {
191192

@@ -224,7 +225,7 @@ public void loadState(Set<MessageQueue> addQueues) throws Throwable {
224225

225226
Set<MessageQueue> stateTopicQueue = convertSourceTopicQueue2StateTopicQueue(addQueues);
226227
for (MessageQueue messageQueue : stateTopicQueue) {
227-
createStateTopic(messageQueue.getTopic());
228+
createStateTopic(messageQueue.getTopic(), messageQueue.getBrokerName().equals(Constant.STATIC_TOPIC_BROKER_NAME));
228229
}
229230

230231
consumer.assign(stateTopicQueue);
@@ -389,15 +390,19 @@ private List<MessageExt> sortByQueueOffset(List<MessageExt> target) {
389390
return target;
390391
}
391392

392-
private void createStateTopic(String stateTopic) throws Exception {
393+
private void createStateTopic(String stateTopic, boolean sourceTopicIsStaticTopic) throws Exception {
393394
if (RocketMQUtil.checkWhetherExist(stateTopic)) {
394395
return;
395396
}
396397

397398
String sourceTopic = stateTopic2SourceTopic(stateTopic);
398399
Pair<Integer, Set<String>> clustersPair = getTotalQueueNumAndClusters(sourceTopic);
399400

400-
RocketMQUtil.createNormalTopic(mqAdmin, stateTopic, clustersPair.getKey(), clustersPair.getValue());
401+
if (sourceTopicIsStaticTopic) {
402+
RocketMQUtil.createStaticCompactTopic(mqAdmin, stateTopic, clustersPair.getKey(), clustersPair.getValue());
403+
} else {
404+
RocketMQUtil.createNormalTopic(mqAdmin, sourceTopic, stateTopic);
405+
}
401406
}
402407

403408
private Pair<Integer, Set<String>> getTotalQueueNumAndClusters(String sourceTopic) throws Exception {

core/src/main/java/org/apache/rocketmq/streams/core/util/RocketMQUtil.java

Lines changed: 65 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,9 @@
2525
import org.apache.rocketmq.common.constant.PermName;
2626
import org.apache.rocketmq.common.protocol.ResponseCode;
2727
import org.apache.rocketmq.common.protocol.body.ClusterInfo;
28+
import org.apache.rocketmq.common.protocol.route.BrokerData;
29+
import org.apache.rocketmq.common.protocol.route.QueueData;
30+
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
2831
import org.apache.rocketmq.remoting.exception.RemotingException;
2932
import org.apache.rocketmq.srvutil.ServerUtil;
3033
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
@@ -35,6 +38,7 @@
3538
import org.slf4j.LoggerFactory;
3639

3740
import java.util.ArrayList;
41+
import java.util.HashMap;
3842
import java.util.HashSet;
3943
import java.util.List;
4044
import java.util.Set;
@@ -45,7 +49,7 @@ public class RocketMQUtil {
4549
private static final List<String> existTopic = new ArrayList<>();
4650

4751
//neither static topic nor compact topic. expansion with source topic.
48-
public static void createNormalTopic(DefaultMQAdminExt mqAdmin, String topicName, int queueNum, Set<String> clusters) throws Exception {
52+
public static void createNormalTopic(DefaultMQAdminExt mqAdmin, String topicName, int totalQueueNum, Set<String> clusters) throws Exception {
4953
if (check(mqAdmin, topicName)) {
5054
logger.info("topic[{}] already exist.", topicName);
5155
return;
@@ -55,20 +59,68 @@ public static void createNormalTopic(DefaultMQAdminExt mqAdmin, String topicName
5559
clusters = getCluster(mqAdmin);
5660
}
5761

58-
TopicConfig topicConfig = new TopicConfig(topicName, queueNum, queueNum, PermName.PERM_READ | PermName.PERM_WRITE);
5962

6063
for (String cluster : clusters) {
6164
Set<String> masterSet = CommandUtil.fetchMasterAddrByClusterName(mqAdmin, cluster);
6265

63-
for (String addr : masterSet) {
64-
mqAdmin.createAndUpdateTopicConfig(addr, topicConfig);
65-
logger.info("create topic to broker:{} cluster:{}, success.", addr, cluster);
66+
int remainder = totalQueueNum % masterSet.size();
67+
if (remainder != 0) {
68+
String temp = String.format("can not create topic:%s, total num=%s, master num=%s", topicName, totalQueueNum, masterSet.size());
69+
logger.warn(temp);
6670
}
71+
72+
int queueNumInEachBroker = totalQueueNum / masterSet.size();
73+
TopicConfig topicConfig = new TopicConfig(topicName, queueNumInEachBroker, queueNumInEachBroker, PermName.PERM_READ | PermName.PERM_WRITE);
74+
75+
if (remainder == 0) {
76+
for (String addr : masterSet) {
77+
mqAdmin.createAndUpdateTopicConfig(addr, topicConfig);
78+
logger.info("create topic to broker:{} cluster:{}, success.", addr, cluster);
79+
}
80+
} else {
81+
String[] masterArray = masterSet.toArray(new String[]{});
82+
83+
topicConfig = new TopicConfig(topicName, queueNumInEachBroker + remainder,
84+
queueNumInEachBroker + remainder, PermName.PERM_READ | PermName.PERM_WRITE);
85+
mqAdmin.createAndUpdateTopicConfig(masterArray[0], topicConfig);
86+
87+
for (int i = 1; i < masterArray.length; i++) {
88+
topicConfig = new TopicConfig(topicName, queueNumInEachBroker, queueNumInEachBroker, PermName.PERM_READ | PermName.PERM_WRITE);
89+
mqAdmin.createAndUpdateTopicConfig(masterArray[0], topicConfig);
90+
}
91+
}
92+
93+
}
94+
}
95+
96+
public static void createNormalTopic(DefaultMQAdminExt mqAdmin, String sourceTopic, String stateTopic) throws Exception {
97+
//找到brokerAddr
98+
TopicRouteData topicRouteData = mqAdmin.examineTopicRouteInfo(sourceTopic);
99+
List<QueueData> queueData = topicRouteData.getQueueDatas();
100+
List<BrokerData> brokerData = topicRouteData.getBrokerDatas();
101+
102+
103+
HashMap<String, String> brokerName2MaterBrokerAddr = new HashMap<>();
104+
for (BrokerData broker : brokerData) {
105+
String masterBrokerAddr = broker.getBrokerAddrs().get(0L);
106+
brokerName2MaterBrokerAddr.put(broker.getBrokerName(), masterBrokerAddr);
67107
}
108+
109+
for (QueueData queue : queueData) {
110+
int readQueueNums = queue.getReadQueueNums();
111+
int writeQueueNums = queue.getWriteQueueNums();
112+
String brokerName = queue.getBrokerName();
113+
114+
TopicConfig topicConfig = new TopicConfig(stateTopic, readQueueNums, writeQueueNums);
115+
116+
mqAdmin.createAndUpdateTopicConfig(brokerName2MaterBrokerAddr.get(brokerName), topicConfig);
117+
}
118+
119+
existTopic.add(stateTopic);
68120
}
69121

70122
//used in RSQLDB,maybe.
71-
public static void createStaticCompactTopic(DefaultMQAdminExt mqAdmin, String topicName, int queueNum, Set<String> clusters) throws Exception {
123+
public static void createStaticCompactTopic(DefaultMQAdminExt mqAdmin, String topicName, int totalQueueNum, Set<String> clusters) throws Exception {
72124
if (check(mqAdmin, topicName)) {
73125
logger.info("topic[{}] already exist.", topicName);
74126
return;
@@ -80,15 +132,15 @@ public static void createStaticCompactTopic(DefaultMQAdminExt mqAdmin, String to
80132

81133

82134
for (String cluster : clusters) {
83-
createStaticTopicWithCommand(topicName, queueNum, new HashSet<>(), cluster, mqAdmin.getNamesrvAddr());
84-
logger.info("【step 1】create static topic:[{}] in cluster:[{}] success, logic queue num:[{}].", topicName, cluster, queueNum);
135+
createStaticTopicWithCommand(topicName, totalQueueNum, new HashSet<>(), cluster, mqAdmin.getNamesrvAddr());
136+
logger.info("【step 1】create static topic:[{}] in cluster:[{}] success, logic queue num:[{}].", topicName, cluster, totalQueueNum);
85137

86-
update2CompactTopicWithCommand(topicName, queueNum, cluster, mqAdmin.getNamesrvAddr());
138+
update2CompactTopicWithCommand(topicName, totalQueueNum, cluster, mqAdmin.getNamesrvAddr());
87139
logger.info("【step 2】update static topic to compact topic success. topic:[{}], cluster:[{}]", topicName, cluster);
88140
}
89141

90142
existTopic.add(topicName);
91-
logger.info("create static-compact topic [{}] success, queue num [{}]", topicName, queueNum);
143+
logger.info("create static-compact topic [{}] success, queue num [{}]", topicName, totalQueueNum);
92144
}
93145

94146
public static void createStaticTopic(DefaultMQAdminExt mqAdmin, String topicName, int queueNum) throws Exception {
@@ -106,23 +158,23 @@ public static void createStaticTopic(DefaultMQAdminExt mqAdmin, String topicName
106158
existTopic.add(topicName);
107159
}
108160

109-
private static void createStaticTopicWithCommand(String topic, int queueNum, Set<String> brokers, String cluster, String nameservers) throws Exception {
161+
private static void createStaticTopicWithCommand(String topic, int totalQueueNum, Set<String> brokers, String cluster, String nameservers) throws Exception {
110162
UpdateStaticTopicSubCommand cmd = new UpdateStaticTopicSubCommand();
111163
Options options = ServerUtil.buildCommandlineOptions(new Options());
112164
String[] args;
113165
if (cluster != null) {
114166
args = new String[]{
115167
"-c", cluster,
116168
"-t", topic,
117-
"-qn", String.valueOf(queueNum),
169+
"-qn", String.valueOf(totalQueueNum),
118170
"-n", nameservers
119171
};
120172
} else {
121173
String brokerStr = String.join(",", brokers);
122174
args = new String[]{
123175
"-b", brokerStr,
124176
"-t", topic,
125-
"-qn", String.valueOf(queueNum),
177+
"-qn", String.valueOf(totalQueueNum),
126178
"-n", nameservers
127179
};
128180
}

0 commit comments

Comments
 (0)