Skip to content

Commit 79fd2e4

Browse files
authored
Merge pull request #64 from duhenglucky/main_sink
[ISSUE #65]Polish RocketMQ topic create process and sink settings
2 parents d8e62d1 + e825206 commit 79fd2e4

File tree

11 files changed

+172
-215
lines changed

11 files changed

+172
-215
lines changed

pom.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,7 @@
118118
<exclude>*/*.iml</exclude>
119119
<exclude>**/*.txt</exclude>
120120
<exclude>**/*.cs</exclude>
121+
<exclude>src/test/resources/window_msg_*</exclude>
121122
</excludes>
122123
</configuration>
123124
</plugin>

rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/sink/RocketMQSink.java

Lines changed: 54 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -24,24 +24,21 @@
2424
import java.util.List;
2525
import java.util.Map;
2626
import java.util.Set;
27-
import java.util.concurrent.atomic.AtomicBoolean;
28-
import java.util.concurrent.atomic.AtomicInteger;
29-
3027
import org.apache.commons.logging.Log;
3128
import org.apache.commons.logging.LogFactory;
3229
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
3330
import org.apache.rocketmq.client.producer.DefaultMQProducer;
34-
import org.apache.rocketmq.common.MixAll;
31+
import org.apache.rocketmq.common.TopicConfig;
3532
import org.apache.rocketmq.common.message.Message;
3633
import org.apache.rocketmq.common.message.MessageQueue;
37-
import org.apache.rocketmq.common.protocol.body.TopicList;
3834
import org.apache.rocketmq.streams.common.channel.sink.AbstractSupportShuffleSink;
3935
import org.apache.rocketmq.streams.common.channel.split.ISplit;
4036
import org.apache.rocketmq.streams.common.configurable.annotation.ENVDependence;
4137
import org.apache.rocketmq.streams.common.context.IMessage;
4238
import org.apache.rocketmq.streams.common.utils.StringUtil;
4339
import org.apache.rocketmq.streams.queue.RocketMQMessageQueue;
4440
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
41+
import org.apache.rocketmq.tools.command.CommandUtil;
4542

4643
public class RocketMQSink extends AbstractSupportShuffleSink {
4744

@@ -51,6 +48,8 @@ public class RocketMQSink extends AbstractSupportShuffleSink {
5148

5249
private String topic;
5350
private String groupName;
51+
private String clusterName = "DefaultCluster";
52+
private boolean order = false;
5453

5554
private transient List<DefaultMQPushConsumer> consumers = new ArrayList<>();
5655
private transient DefaultMQProducer producer;
@@ -67,7 +66,11 @@ public RocketMQSink(String namesrvAddr, String topic, String groupName) {
6766
this.groupName = groupName;
6867
}
6968

70-
69+
public RocketMQSink(String namesrvAddr, String topic, String groupName, String clusterName, boolean order) {
70+
this.namesrvAddr = namesrvAddr;
71+
this.topic = topic;
72+
this.groupName = groupName;
73+
}
7174
@Override
7275
protected boolean initConfigurable() {
7376
super.initConfigurable();
@@ -132,7 +135,6 @@ protected boolean batchInsert(List<IMessage> messages) {
132135
return true;
133136
}
134137

135-
136138
protected void initProducer() {
137139
if (producer == null) {
138140
synchronized (this) {
@@ -169,7 +171,6 @@ public void destroyProduce() {
169171
}
170172
}
171173

172-
173174
@Override
174175
public void destroy() {
175176
super.destroy();
@@ -183,30 +184,50 @@ public String getShuffleTopicFieldName() {
183184

184185
@Override
185186
protected void createTopicIfNotExist(int splitNum) {
187+
if (StringUtil.isEmpty(topic)) {
188+
LOG.error("Topic should be empty");
189+
throw new RuntimeException("Topic should be empty");
190+
}
186191
DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt();
187192
defaultMQAdminExt.setVipChannelEnabled(false);
188193
defaultMQAdminExt.setNamesrvAddr(this.getNamesrvAddr());
189194
defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
195+
TopicConfig topicConfig = new TopicConfig();
196+
topicConfig.setReadQueueNums(splitNum);
197+
topicConfig.setWriteQueueNums(splitNum);
198+
topicConfig.setTopicName(topic.trim());
199+
190200
try {
191201
defaultMQAdminExt.start();
192-
TopicList topicList = defaultMQAdminExt.fetchAllTopicList();
193-
for (String topic : topicList.getTopicList()) {
194-
if (topic.equals(this.topic)) {
195-
return;
196-
}
202+
Set<String> masterSet =
203+
CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName);
204+
for (String master : masterSet) {
205+
defaultMQAdminExt.createAndUpdateTopicConfig(master, topicConfig);
206+
LOG.info("Create topic to success: " + master);
197207
}
198208

199-
200-
defaultMQAdminExt.createTopic(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC, topic, splitNum, 1);
209+
if (this.order) {
210+
Set<String> brokerNameSet =
211+
CommandUtil.fetchBrokerNameByClusterName(defaultMQAdminExt, clusterName);
212+
StringBuilder orderConf = new StringBuilder();
213+
String splitor = "";
214+
for (String s : brokerNameSet) {
215+
orderConf.append(splitor).append(s).append(":")
216+
.append(topicConfig.getWriteQueueNums());
217+
splitor = ";";
218+
}
219+
defaultMQAdminExt.createOrUpdateOrderConf(topicConfig.getTopicName(),
220+
orderConf.toString(), true);
221+
System.out.printf("set cluster orderConf. isOrder=%s, orderConf=[%s]", order, orderConf);
222+
}
201223
} catch (Exception e) {
202-
e.printStackTrace();
203-
throw new RuntimeException("create topic error " + topic, e);
224+
LOG.error("Create topic error", e);
225+
throw new RuntimeException("Create topic error " + topic, e);
204226
} finally {
205227
defaultMQAdminExt.shutdown();
206228
}
207229
}
208230

209-
210231
@Override
211232
public List<ISplit> getSplitList() {
212233
initProducer();
@@ -245,7 +266,6 @@ public int getSplitNum() {
245266
return splitNames.size();
246267
}
247268

248-
249269
public String getTags() {
250270
return tags;
251271
}
@@ -302,4 +322,19 @@ public void setNamesrvAddr(String namesrvAddr) {
302322
this.namesrvAddr = namesrvAddr;
303323
}
304324

325+
public String getClusterName() {
326+
return clusterName;
327+
}
328+
329+
public void setClusterName(String clusterName) {
330+
this.clusterName = clusterName;
331+
}
332+
333+
public boolean isOrder() {
334+
return order;
335+
}
336+
337+
public void setOrder(boolean order) {
338+
this.order = order;
339+
}
305340
}

rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/transform/DataStream.java

Lines changed: 33 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,11 @@
1919

2020
import com.alibaba.fastjson.JSONObject;
2121
import com.google.common.collect.Sets;
22-
23-
import java.nio.charset.StandardCharsets;
22+
import java.io.Serializable;
2423
import java.util.ArrayList;
2524
import java.util.List;
25+
import java.util.Set;
26+
import org.apache.commons.lang3.StringUtils;
2627
import org.apache.rocketmq.streams.client.DataStreamAction;
2728
import org.apache.rocketmq.streams.client.transform.window.WindowInfo;
2829
import org.apache.rocketmq.streams.common.channel.impl.OutputPrintChannel;
@@ -35,7 +36,12 @@
3536
import org.apache.rocketmq.streams.common.context.Message;
3637
import org.apache.rocketmq.streams.common.context.MessageHeader;
3738
import org.apache.rocketmq.streams.common.context.UserDefinedMessage;
38-
import org.apache.rocketmq.streams.common.functions.*;
39+
import org.apache.rocketmq.streams.common.functions.FilterFunction;
40+
import org.apache.rocketmq.streams.common.functions.FlatMapFunction;
41+
import org.apache.rocketmq.streams.common.functions.ForEachFunction;
42+
import org.apache.rocketmq.streams.common.functions.ForEachMessageFunction;
43+
import org.apache.rocketmq.streams.common.functions.MapFunction;
44+
import org.apache.rocketmq.streams.common.functions.SplitFunction;
3945
import org.apache.rocketmq.streams.common.topology.ChainPipeline;
4046
import org.apache.rocketmq.streams.common.topology.ChainStage;
4147
import org.apache.rocketmq.streams.common.topology.builder.IStageBuilder;
@@ -54,9 +60,6 @@
5460
import org.apache.rocketmq.streams.window.operator.AbstractWindow;
5561
import org.apache.rocketmq.streams.window.operator.join.JoinWindow;
5662

57-
import java.io.Serializable;
58-
import java.util.Set;
59-
6063
public class DataStream implements Serializable {
6164

6265
protected PipelineBuilder mainPipelineBuilder;
@@ -450,28 +453,38 @@ public DataStreamAction toDB(String url, String userName, String password, Strin
450453
return new DataStreamAction(this.mainPipelineBuilder, this.otherPipelineBuilders, output);
451454
}
452455

453-
public DataStreamAction toRocketmq(String topic) {
454-
return toRocketmq(topic, "*", null, -1, null);
456+
public DataStreamAction toRocketmq(String topic, String groupName, String nameServerAddress) {
457+
return toRocketmq(topic, "*", groupName, -1, nameServerAddress, null, false);
455458
}
456459

457-
public DataStreamAction toRocketmq(String topic, String namesrvAddr) {
458-
return toRocketmq(topic, "*", null, -1, namesrvAddr);
460+
public DataStreamAction toRocketmq(String topic, String tags, String groupName, String nameServerAddress,
461+
String clusterName,
462+
boolean order) {
463+
return toRocketmq(topic, tags, groupName, -1, nameServerAddress, clusterName, order);
459464
}
460465

461-
public DataStreamAction toRocketmq(String topic, String tags,
462-
String namesrvAddr) {
463-
return toRocketmq(topic, tags, null, -1, namesrvAddr);
464-
}
465-
466-
public DataStreamAction toRocketmq(String topic, String tags, String groupName, int batchSize, String namesrvAddr) {
466+
public DataStreamAction toRocketmq(String topic, String tags, String groupName, int batchSize, String nameServerAddress,
467+
String clusterName, boolean order) {
467468
RocketMQSink rocketMQSink = new RocketMQSink();
468-
rocketMQSink.setTopic(topic);
469-
rocketMQSink.setTags(tags);
470-
rocketMQSink.setGroupName(groupName);
471-
rocketMQSink.setNamesrvAddr(namesrvAddr);
469+
if (StringUtils.isNotBlank(topic)) {
470+
rocketMQSink.setTopic(topic);
471+
}
472+
if (StringUtils.isNotBlank(tags)) {
473+
rocketMQSink.setTags(tags);
474+
}
475+
if (StringUtils.isNotBlank(groupName)) {
476+
rocketMQSink.setGroupName(groupName);
477+
}
478+
if (StringUtils.isNotBlank(nameServerAddress)) {
479+
rocketMQSink.setNamesrvAddr(nameServerAddress);
480+
}
481+
if (StringUtils.isNotBlank(clusterName)) {
482+
rocketMQSink.setClusterName(clusterName);
483+
}
472484
if (batchSize > 0) {
473485
rocketMQSink.setBatchSize(batchSize);
474486
}
487+
rocketMQSink.setOrder(order);
475488
ChainStage<?> output = this.mainPipelineBuilder.createStage(rocketMQSink);
476489
this.mainPipelineBuilder.setTopologyStages(currentChainStage, output);
477490
return new DataStreamAction(this.mainPipelineBuilder, this.otherPipelineBuilders, output);

0 commit comments

Comments
 (0)