Skip to content

Commit f7f8854

Browse files
authored
Merge pull request apache#157 from j-j-cheng/snapshot-1.0.3
Snapshot 1.0.3
2 parents e36ac82 + f8b55a0 commit f7f8854

File tree

98 files changed

+4447
-765
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

98 files changed

+4447
-765
lines changed

pom.xml

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@
5858
<module>rocketmq-streams-channel-es</module>
5959
<module>rocketmq-streams-channel-kafka</module>
6060
<module>rocketmq-streams-channel-mqtt</module>
61+
<module>rocketmq-streams-cep</module>
6162
</modules>
6263

6364
<properties>
@@ -75,16 +76,16 @@
7576
<spring.version>5.1.14.RELEASE</spring.version>
7677
<auto-service.version>1.0-rc5</auto-service.version>
7778
<mysql-connector.version>5.1.40</mysql-connector.version>
78-
<fastjson.version>1.2.9</fastjson.version>
79+
<fastjson.version>1.2.25</fastjson.version>
7980
<quartz.version>2.2.1</quartz.version>
80-
<httpclient.version>4.5.2</httpclient.version>
81-
<commons-io.version>2.5</commons-io.version>
82-
<junit.version>4.12</junit.version>
81+
<httpclient.version>4.5.13</httpclient.version>
82+
<commons-io.version>2.7</commons-io.version>
83+
<junit.version>4.13.1</junit.version>
8384
<guava.version>25.1-jre</guava.version>
8485
<groovy.version>2.1.8</groovy.version>
8586
<disruptor.version>3.2.0</disruptor.version>
8687
<rocksdbjni.version>6.6.4</rocksdbjni.version>
87-
<rocketmq.version>4.9.2</rocketmq.version>
88+
<rocketmq.version>4.9.3</rocketmq.version>
8889
<hyperscan.version>5.4.0-2.0.0</hyperscan.version>
8990
<platform.version>3.5.2</platform.version>
9091
<gson.version>2.8.5</gson.version>
@@ -99,6 +100,8 @@
99100
<elasticsearch.version>7.4.0</elasticsearch.version>
100101
<kafka.version>1.1.0</kafka.version>
101102
<paho.version>1.2.2</paho.version>
103+
<kryo.version>5.3.0</kryo.version>
104+
<fst.version>2.56</fst.version>
102105
<slf4j-log4j12.version>1.7.36</slf4j-log4j12.version>
103106
</properties>
104107

@@ -227,6 +230,11 @@
227230
<artifactId>rocketmq-streams-state</artifactId>
228231
<version>${project.version}</version>
229232
</dependency>
233+
<dependency>
234+
<groupId>org.apache.rocketmq</groupId>
235+
<artifactId>rocketmq-streams-cep</artifactId>
236+
<version>${project.version}</version>
237+
</dependency>
230238
<dependency>
231239
<groupId>org.apache.rocketmq</groupId>
232240
<artifactId>rocketmq-streams-channel-syslog</artifactId>
@@ -569,6 +577,17 @@
569577
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
570578
<version>${paho.version}</version>
571579
</dependency>
580+
581+
<dependency>
582+
<groupId>de.ruedigermoeller</groupId>
583+
<artifactId>fst</artifactId>
584+
<version>${fst.version}</version>
585+
</dependency>
586+
<dependency>
587+
<groupId>com.esotericsoftware</groupId>
588+
<artifactId>kryo</artifactId>
589+
<version>${kryo.version}</version>
590+
</dependency>
572591
</dependencies>
573592
</dependencyManagement>
574593

rocketmq-streams-cep/pom.xml

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
<?xml version="1.0" encoding="utf-8"?>
2+
<!--
3+
Licensed to the Apache Software Foundation (ASF) under one or more
4+
contributor license agreements. See the NOTICE file distributed with
5+
this work for additional information regarding copyright ownership.
6+
The ASF licenses this file to You under the Apache License, Version 2.0
7+
(the "License"); you may not use this file except in compliance with
8+
the License. You may obtain a copy of the License at
9+
10+
http://www.apache.org/licenses/LICENSE-2.0
11+
12+
Unless required by applicable law or agreed to in writing, software
13+
distributed under the License is distributed on an "AS IS" BASIS,
14+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
See the License for the specific language governing permissions and
16+
limitations under the License.
17+
-->
18+
<project xmlns="http://maven.apache.org/POM/4.0.0"
19+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
20+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
21+
<modelVersion>4.0.0</modelVersion>
22+
<parent>
23+
<groupId>org.apache.rocketmq</groupId>
24+
<artifactId>rocketmq-streams</artifactId>
25+
<version>1.0.2-SNAPSHOT</version>
26+
</parent>
27+
<artifactId>rocketmq-streams-ce0</artifactId>
28+
<name>ROCKETMQ STREAMS :: cep</name>
29+
<packaging>jar</packaging>
30+
<dependencies>
31+
<dependency>
32+
<groupId>org.apache.rocketmq</groupId>
33+
<artifactId>rocketmq-streams-commons</artifactId>
34+
</dependency>
35+
<dependency>
36+
<groupId>org.apache.flink</groupId>
37+
<artifactId>flink-cep_${scala.binary.version}</artifactId>
38+
<version>${flink.version}</version>
39+
</dependency>
40+
<dependency>
41+
<groupId>org.apache.flink</groupId>
42+
<artifactId>flink-java</artifactId>
43+
<version>${flink.version}</version>
44+
</dependency>
45+
<dependency>
46+
<groupId>org.apache.flink</groupId>
47+
<artifactId>flink-core</artifactId>
48+
<version>${flink.version}</version>
49+
</dependency>
50+
<dependency>
51+
<groupId>org.apache.flink</groupId>
52+
<artifactId>flink-clients_${scala.binary.version}</artifactId>
53+
<version>${flink.version}</version>
54+
</dependency>
55+
56+
<dependency>
57+
<groupId>org.apache.flink</groupId>
58+
<artifactId>flink-runtime_${scala.binary.version}</artifactId>
59+
<version>${flink.version}</version>
60+
</dependency>
61+
62+
63+
</dependencies>
64+
</project>
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
<?xml version="1.0" encoding="UTF-8" ?>
2+
<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
3+
<!--
4+
Licensed to the Apache Software Foundation (ASF) under one or more
5+
contributor license agreements. See the NOTICE file distributed with
6+
this work for additional information regarding copyright ownership.
7+
The ASF licenses this file to You under the Apache License, Version 2.0
8+
(the "License"); you may not use this file except in compliance with
9+
the License. You may obtain a copy of the License at
10+
11+
http://www.apache.org/licenses/LICENSE-2.0
12+
13+
Unless required by applicable law or agreed to in writing, software
14+
distributed under the License is distributed on an "AS IS" BASIS,
15+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
See the License for the specific language governing permissions and
17+
limitations under the License.
18+
-->
19+
<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
20+
21+
<appender name="Console" class="org.apache.log4j.ConsoleAppender">
22+
<layout class="org.apache.log4j.PatternLayout">
23+
<param name="ConversionPattern" value="%d{ISO8601} %l [%t] %-5p - %m%n%n"/>
24+
</layout>
25+
<filter class="org.apache.log4j.varia.LevelRangeFilter">
26+
<param name="LevelMin" value="INFO"/>
27+
<param name="LevelMax" value="ERROR"/>
28+
</filter>
29+
</appender>
30+
31+
<root>
32+
<priority value="INFO"/>
33+
<appender-ref ref="Console"/>
34+
</root>
35+
36+
</log4j:configuration>
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.rocketmq.streams.es.sink;
18+
19+
import com.alibaba.fastjson.JSONObject;
20+
import com.google.auto.service.AutoService;
21+
import java.util.Properties;
22+
import org.apache.rocketmq.streams.common.channel.builder.IChannelBuilder;
23+
import org.apache.rocketmq.streams.common.channel.builder.IShuffleChannelBuilder;
24+
import org.apache.rocketmq.streams.common.channel.sink.ISink;
25+
import org.apache.rocketmq.streams.common.channel.source.ISource;
26+
import org.apache.rocketmq.streams.common.metadata.MetaData;
27+
import org.apache.rocketmq.streams.common.model.ServiceName;
28+
import org.apache.rocketmq.streams.common.utils.ConfigurableUtil;
29+
30+
@AutoService(IChannelBuilder.class)
31+
@ServiceName(value = ESChannelBuilder.TYPE, aliasName = "ES")
32+
public class ESChannelBuilder implements IChannelBuilder {
33+
public static final String TYPE = "es";
34+
35+
protected JSONObject createFormatProperty(Properties properties) {
36+
JSONObject formatProperties = new JSONObject();
37+
for (Object object : properties.keySet()) {
38+
String key = (String) object;
39+
if ("type".equals(key)) {
40+
continue;
41+
}
42+
formatProperties.put(key, properties.get(key));
43+
}
44+
IChannelBuilder.formatPropertiesName(formatProperties, properties, "host", "endPoint");
45+
IChannelBuilder.formatPropertiesName(formatProperties, properties, "esIndex", "index");
46+
IChannelBuilder.formatPropertiesName(formatProperties, properties, "esIndexType", "typeName");;
47+
IChannelBuilder.formatPropertiesName(formatProperties, properties, "esMsgId", "es_msg_id");;
48+
return formatProperties;
49+
}
50+
51+
@Override public ISource createSource(String namespace, String name, Properties properties, MetaData metaData) {
52+
throw new RuntimeException("can not support source for ES");
53+
}
54+
55+
@Override
56+
public String getType() {
57+
return TYPE;
58+
}
59+
60+
@Override
61+
public ISink createSink(String namespace, String name, Properties properties, MetaData metaData) {
62+
return (ISink) ConfigurableUtil.create(ESSinkOnlyChannel.class.getName(), namespace, name, createFormatProperty(properties), null);
63+
}
64+
65+
}

rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/sink/RocketMQSink.java

Lines changed: 34 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -20,16 +20,27 @@
2020
import java.nio.charset.StandardCharsets;
2121
import java.util.ArrayList;
2222
import java.util.Comparator;
23+
import java.util.Comparator;
2324
import java.util.HashMap;
2425
import java.util.HashSet;
2526
import java.util.List;
2627
import java.util.Map;
2728
import java.util.Set;
29+
import org.apache.commons.logging.Log;
30+
import org.apache.commons.logging.LogFactory;
2831
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
32+
import org.apache.rocketmq.client.exception.MQBrokerException;
33+
import org.apache.rocketmq.client.exception.MQClientException;
2934
import org.apache.rocketmq.client.producer.DefaultMQProducer;
35+
import org.apache.rocketmq.common.MixAll;
3036
import org.apache.rocketmq.common.TopicConfig;
3137
import org.apache.rocketmq.common.message.Message;
3238
import org.apache.rocketmq.common.message.MessageQueue;
39+
import org.apache.rocketmq.common.protocol.RequestCode;
40+
import org.apache.rocketmq.common.protocol.ResponseCode;
41+
import org.apache.rocketmq.common.protocol.header.CreateTopicRequestHeader;
42+
import org.apache.rocketmq.remoting.exception.RemotingException;
43+
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
3344
import org.apache.rocketmq.streams.common.channel.sink.AbstractSupportShuffleSink;
3445
import org.apache.rocketmq.streams.common.channel.split.ISplit;
3546
import org.apache.rocketmq.streams.common.configurable.annotation.ENVDependence;
@@ -38,12 +49,10 @@
3849
import org.apache.rocketmq.streams.queue.RocketMQMessageQueue;
3950
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
4051
import org.apache.rocketmq.tools.command.CommandUtil;
41-
import org.slf4j.Logger;
42-
import org.slf4j.LoggerFactory;
4352

4453
public class RocketMQSink extends AbstractSupportShuffleSink {
4554

46-
private static final Logger LOGGER = LoggerFactory.getLogger(RocketMQSink.class);
55+
private static final Log LOG = LogFactory.getLog(RocketMQSink.class);
4756
@ENVDependence
4857
private String tags = "*";
4958

@@ -90,8 +99,8 @@ protected boolean batchInsert(List<IMessage> messages) {
9099
return true;
91100
}
92101
if (StringUtil.isEmpty(topic)) {
93-
if (LOGGER.isErrorEnabled()) {
94-
LOGGER.error("topic is blank");
102+
if (LOG.isErrorEnabled()) {
103+
LOG.error("topic is blank");
95104
}
96105
return false;
97106
}
@@ -170,8 +179,8 @@ public void destroyProduce() {
170179
producer.shutdown();
171180
producer = null;
172181
} catch (Throwable t) {
173-
if (LOGGER.isWarnEnabled()) {
174-
LOGGER.warn(t.getMessage(), t);
182+
if (LOG.isWarnEnabled()) {
183+
LOG.warn(t.getMessage(), t);
175184
}
176185
}
177186
}
@@ -191,7 +200,7 @@ public String getShuffleTopicFieldName() {
191200
@Override
192201
protected void createTopicIfNotExist(int splitNum) {
193202
if (StringUtil.isEmpty(topic)) {
194-
LOGGER.error("Topic should be empty");
203+
LOG.error("Topic should be empty");
195204
throw new RuntimeException("Topic should be empty");
196205
}
197206
DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt();
@@ -209,7 +218,7 @@ protected void createTopicIfNotExist(int splitNum) {
209218
CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName);
210219
for (String master : masterSet) {
211220
defaultMQAdminExt.createAndUpdateTopicConfig(master, topicConfig);
212-
LOGGER.info("Create topic to success: " + master);
221+
LOG.info("Create topic to success: " + master);
213222
}
214223

215224
if (this.order) {
@@ -224,10 +233,10 @@ protected void createTopicIfNotExist(int splitNum) {
224233
}
225234
defaultMQAdminExt.createOrUpdateOrderConf(topicConfig.getTopicName(),
226235
orderConf.toString(), true);
227-
LOGGER.info("set cluster orderConf. isOrder={}, orderConf=[{}]", order, orderConf);
236+
System.out.printf("set cluster orderConf. isOrder=%s, orderConf=[%s]", order, orderConf);
228237
}
229238
} catch (Exception e) {
230-
LOGGER.error("Create topic error", e);
239+
LOG.error("Create topic error", e);
231240
throw new RuntimeException("Create topic error " + topic, e);
232241
} finally {
233242
defaultMQAdminExt.shutdown();
@@ -241,20 +250,22 @@ protected void createTopicIfNotExist(int splitNum) {
241250
List<MessageQueue> metaqQueueSet = new ArrayList<>();
242251
try {
243252

244-
try {
245-
metaqQueueSet = producer.fetchPublishMessageQueues(topic);
246-
} catch (Exception e) {
247-
producer.send(new Message(topic, "test", "test".getBytes(StandardCharsets.UTF_8)));
248-
metaqQueueSet = producer.fetchPublishMessageQueues(topic);
249-
}
250-
List<ISplit<?, ?>> queueList = new ArrayList<>();
251-
for (MessageQueue queue : metaqQueueSet) {
252-
RocketMQMessageQueue rocketMQMessageQueue = new RocketMQMessageQueue(queue);
253-
queueList.add(rocketMQMessageQueue);
253+
if (messageQueues == null || messageQueues.size() == 0) {
254+
try {
255+
metaqQueueSet = producer.fetchPublishMessageQueues(topic);
256+
}catch (Exception e) {
257+
producer.send(new Message(topic, "test", "test".getBytes(StandardCharsets.UTF_8)));
258+
metaqQueueSet = producer.fetchPublishMessageQueues(topic);
259+
}
260+
List<ISplit<?, ?>> queueList = new ArrayList<>();
261+
for (MessageQueue queue : metaqQueueSet) {
262+
RocketMQMessageQueue rocketMQMessageQueue = new RocketMQMessageQueue(queue);
263+
queueList.add(rocketMQMessageQueue);
254264

265+
}
266+
queueList.sort((Comparator<ISplit>) Comparable::compareTo);
267+
messageQueues = queueList;
255268
}
256-
queueList.sort((Comparator<ISplit>) Comparable::compareTo);
257-
messageQueues = queueList;
258269
} catch (Exception e) {
259270
throw new RuntimeException(e);
260271
}

rocketmq-streams-clients/pom.xml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,10 @@
7373
<groupId>org.apache.rocketmq</groupId>
7474
<artifactId>rocketmq-streams-window</artifactId>
7575
</dependency>
76+
<dependency>
77+
<groupId>org.apache.rocketmq</groupId>
78+
<artifactId>rocketmq-streams-channel-kafka</artifactId>
79+
</dependency>
7680
<dependency>
7781
<groupId>org.apache.rocketmq</groupId>
7882
<artifactId>rocketmq-streams-connectors</artifactId>

0 commit comments

Comments
 (0)