Skip to content

Commit b1a0541

Browse files
authored
Merge pull request apache#35 from francisoliverlee/main
fix RocketMQ Source has no namesrv set error
2 parents ed62017 + 05c7baf commit b1a0541

File tree

13 files changed

+234
-52
lines changed

13 files changed

+234
-52
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ target/
22
.DS_Store
33
!.mvn/wrapper/maven-wrapper.jar
44
*.versionsBackup
5+
.gradle/
56

67
### STS ###
78
.apt_generated

build_without_test.sh

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
mvn clean package -Dmaven.test.skip=true

pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
<module>rocketmq-streams-channel-rocketmq</module>
5151
<module>rocketmq-streams-channel-db</module>
5252
<module>rocketmq-streams-channel-http</module>
53+
<module>rocketmq-streams-examples</module>
5354
</modules>
5455

5556
<properties>
@@ -173,6 +174,11 @@
173174
<artifactId>rocketmq-streams-commons</artifactId>
174175
<version>${project.version}</version>
175176
</dependency>
177+
<dependency>
178+
<groupId>org.apache.rocketmq</groupId>
179+
<artifactId>rocketmq-streams-clients</artifactId>
180+
<version>${project.version}</version>
181+
</dependency>
176182
<dependency>
177183
<groupId>org.apache.rocketmq</groupId>
178184
<artifactId>rocketmq-streams-configurable</artifactId>

rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/source/RocketMQSource.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,6 @@ protected DefaultMQPushConsumer startConsumer() {
150150
});
151151

152152
setOffsetStore(consumer);
153-
addRebalanceCallback(consumer);
154153
consumer.start();
155154

156155
return consumer;

rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/source/DataStreamSource.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@
1818
package org.apache.rocketmq.streams.client.source;
1919

2020
import com.google.common.collect.Sets;
21+
2122
import java.util.Set;
23+
2224
import org.apache.rocketmq.streams.client.transform.DataStream;
2325
import org.apache.rocketmq.streams.common.channel.impl.file.FileSource;
2426
import org.apache.rocketmq.streams.common.channel.source.ISource;
@@ -49,20 +51,21 @@ public DataStream fromFile(String filePath, Boolean isJsonData) {
4951
return new DataStream(this.mainPipelineBuilder, this.otherPipelineBuilders, null);
5052
}
5153

52-
public DataStream fromRocketmq(String topic, String groupName) {
53-
return fromRocketmq(topic, groupName, null, false);
54+
public DataStream fromRocketmq(String topic, String groupName, String namesrvAddress) {
55+
return fromRocketmq(topic, groupName, false, namesrvAddress);
5456
}
5557

56-
public DataStream fromRocketmq(String topic, String groupName, boolean isJson) {
57-
return fromRocketmq(topic, groupName, null, isJson);
58+
public DataStream fromRocketmq(String topic, String groupName, boolean isJson, String namesrvAddress) {
59+
return fromRocketmq(topic, groupName, "*", isJson, namesrvAddress);
5860
}
5961

60-
public DataStream fromRocketmq(String topic, String groupName, String tags, boolean isJson) {
62+
public DataStream fromRocketmq(String topic, String groupName, String tags, boolean isJson, String namesrvAddress) {
6163
RocketMQSource rocketMQSource = new RocketMQSource();
6264
rocketMQSource.setTopic(topic);
6365
rocketMQSource.setTags(tags);
6466
rocketMQSource.setGroupName(groupName);
6567
rocketMQSource.setJsonData(isJson);
68+
rocketMQSource.setNamesrvAddr(namesrvAddress);
6669
this.mainPipelineBuilder.setSource(rocketMQSource);
6770
return new DataStream(this.mainPipelineBuilder, this.otherPipelineBuilders, null);
6871
}

rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/DataStreamTest.java

Lines changed: 40 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,8 @@
1818
package org.apache.rocketmq.streams.client;
1919

2020
import com.alibaba.fastjson.JSONObject;
21-
import netscape.javascript.JSObject;
2221
import org.apache.rocketmq.streams.client.source.DataStreamSource;
2322
import org.apache.rocketmq.streams.client.strategy.CheckpointStrategy;
24-
import org.apache.rocketmq.streams.client.strategy.StateStrategy;
25-
import org.apache.rocketmq.streams.client.strategy.Strategy;
2623
import org.apache.rocketmq.streams.client.transform.window.Time;
2724
import org.apache.rocketmq.streams.client.transform.window.TumblingWindow;
2825
import org.apache.rocketmq.streams.common.functions.MapFunction;
@@ -45,76 +42,74 @@ public void init() {
4542
@Test
4643
public void testFromFile() {
4744
dataStream
48-
.fromFile("/Users/junjie.cheng/text.txt", false)
49-
.map(message -> message + "--")
50-
.toPrint(1)
51-
.start();
45+
.fromFile("/Users/junjie.cheng/text.txt", false)
46+
.map(message -> message + "--")
47+
.toPrint(1)
48+
.start();
5249
}
5350

5451
@Test
5552
public void testRocketmq() {
56-
57-
5853
DataStreamSource dataStream = StreamBuilder.dataStream("test_namespace", "graph_pipeline");
5954
dataStream
60-
.fromRocketmq("TOPIC_EVENT_SAS_SECURITY_EVENT", "111")
61-
.map(message -> message + "--")
62-
.toPrint(1)
63-
.start();
55+
.fromRocketmq("topic_xxxx01", "consumer_xxxx01", "127.0.0.1:9876")
56+
.map(message -> message + "--")
57+
.toPrint(1)
58+
.start();
6459
}
6560

6661
@Test
6762
public void testDBCheckPoint() {
6863
dataStream
69-
.fromRocketmq("TSG_META_INFO", "")
70-
.map(message -> message + "--")
71-
.toPrint(1)
72-
.with(CheckpointStrategy.db("", "", "", 0L))
73-
.start();
64+
.fromRocketmq("topic_xxxx02", "consumer_xxxx02", "127.0.0.1:9876")
65+
.map(message -> message + "--")
66+
.toPrint(1)
67+
.with(CheckpointStrategy.db("", "", "", 0L))
68+
.start();
7469
}
7570

7671
@Test
7772
public void testFileCheckPoint() {
7873
dataStream
79-
.fromRocketmq("TSG_META_INFO", "")
80-
.map(message -> message + "--")
81-
.toPrint(1)
82-
.with(CheckpointStrategy.mem(0L))
83-
.start();
74+
.fromFile("/Users/junjie.cheng/text.txt", false)
75+
.map(message -> message + "--")
76+
.toPrint(1)
77+
.with(CheckpointStrategy.mem(0L))
78+
.start();
8479
}
8580

8681

8782
@Test
8883
public void testWindow() {
8984
DataStreamSource dataStream = StreamBuilder.dataStream("test_namespace", "graph_pipeline");
9085
dataStream
91-
.fromRocketmq("TSG_META_INFO", "")
92-
.map(new MapFunction<JSONObject, String>() {
93-
94-
@Override
95-
public JSONObject map(String message) throws Exception {
96-
JSONObject msg=JSONObject.parseObject(message);
97-
return msg;
98-
}
99-
})
100-
.window(TumblingWindow.of(Time.seconds(5)))
101-
.groupBy("name","age")
102-
.count("c")
103-
.sum("score","scoreValue")
104-
.toDataSteam()
105-
.toPrint(1)
106-
.with(CheckpointStrategy.db("","","",1000L))
107-
.start();
86+
.fromRocketmq("topic_xxxx03", "consumer_xxxx03", "127.0.0.1:9876")
87+
.map(new MapFunction<JSONObject, String>() {
88+
89+
@Override
90+
public JSONObject map(String message) throws Exception {
91+
JSONObject msg = JSONObject.parseObject(message);
92+
return msg;
93+
}
94+
})
95+
.window(TumblingWindow.of(Time.seconds(5)))
96+
.groupBy("name", "age")
97+
.count("c")
98+
.sum("score", "scoreValue")
99+
.toDataSteam()
100+
.toPrint(1)
101+
.with(CheckpointStrategy.db("", "", "", 1000L))
102+
.start();
108103
}
109104

110105
@Test
111106
public void testBothStrategy() {
112107
dataStream
113-
.fromRocketmq("TSG_META_INFO", "")
114-
.map(message -> message + "--")
115-
.toPrint(1)
116-
.with()
117-
.start();
108+
.fromRocketmq("topic_xxxx04", "consumer_xxxx04", "127.0.0.1:9876")
109+
.map(message -> message + "--")
110+
.toPrint(1)
111+
.with()
112+
.start();
118113
}
119114

120115
@Test

rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/windows/WindowFromMetaq.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ public void testWindowToMetaq() throws InterruptedException {
5656

5757
protected DataStream createSourceDataStream(){
5858
return StreamBuilder.dataStream("namespace", "name1")
59-
.fromRocketmq(topic,"chris1",true);
59+
.fromRocketmq(topic,"chris1","");
6060
}
6161

6262

rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/AbstractChannel.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,16 @@ protected void setJsonObject(JSONObject jsonObject) {
9393
jsonObject.put("source", Base64Utils.encode(InstantiationUtil.serializeObject(source)));
9494
}
9595

96+
@Override
97+
public void removeSplit(Set<String> splitIds) {
98+
source.removeSplit(splitIds);
99+
}
100+
101+
@Override
102+
public void addNewSplit(Set<String> splitIds) {
103+
source.addNewSplit(splitIds);
104+
}
105+
96106
@Override
97107
public Map<String, MessageOffset> getFinishedQueueIdAndOffsets(CheckPointMessage checkPointMessage) {
98108
return sink.getFinishedQueueIdAndOffsets(checkPointMessage);

rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/source/ISource.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,4 +74,8 @@ public interface ISource<T extends ISource> extends IConfigurable, IStageBuilder
7474
*/
7575
long getCheckpointTime();
7676

77+
void removeSplit(Set<String> splitIds);
78+
79+
void addNewSplit(Set<String> splitIds);
80+
7781
}

rocketmq-streams-examples/pom.xml

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<!--
3+
Licensed to the Apache Software Foundation (ASF) under one or more
4+
contributor license agreements. See the NOTICE file distributed with
5+
this work for additional information regarding copyright ownership.
6+
The ASF licenses this file to You under the Apache License, Version 2.0
7+
(the "License"); you may not use this file except in compliance with
8+
the License. You may obtain a copy of the License at
9+
10+
http://www.apache.org/licenses/LICENSE-2.0
11+
12+
Unless required by applicable law or agreed to in writing, software
13+
distributed under the License is distributed on an "AS IS" BASIS,
14+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
See the License for the specific language governing permissions and
16+
limitations under the License.
17+
-->
18+
<project xmlns="http://maven.apache.org/POM/4.0.0"
19+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
20+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
21+
<parent>
22+
<artifactId>rocketmq-streams</artifactId>
23+
<groupId>org.apache.rocketmq</groupId>
24+
<version>1.0.0-SNAPSHOT</version>
25+
</parent>
26+
<modelVersion>4.0.0</modelVersion>
27+
28+
<artifactId>rocketmq-streams-examples</artifactId>
29+
<name>ROCKETMQ STREAMS :: examples</name>
30+
<dependencies>
31+
<dependency>
32+
<groupId>org.apache.rocketmq</groupId>
33+
<artifactId>rocketmq-streams-clients</artifactId>
34+
<exclusions>
35+
<exclusion>
36+
<groupId>ch.qos.logback</groupId>
37+
<artifactId>logback-classic</artifactId>
38+
</exclusion>
39+
</exclusions>
40+
</dependency>
41+
</dependencies>
42+
<packaging>jar</packaging>
43+
44+
<properties>
45+
<file_encoding>UTF-8</file_encoding>
46+
<project.build.sourceEncoding>${file_encoding}</project.build.sourceEncoding>
47+
<maven.compiler.source>8</maven.compiler.source>
48+
<maven.compiler.target>8</maven.compiler.target>
49+
</properties>
50+
</project>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.rocketmq.streams.examples.filesource;
18+
19+
import org.apache.rocketmq.streams.client.StreamBuilder;
20+
import org.apache.rocketmq.streams.client.source.DataStreamSource;
21+
22+
public class FileSourceExample {
23+
public static void main(String[] args) {
24+
DataStreamSource source = StreamBuilder.dataStream("namespace", "pipeline");
25+
source.fromFile("/your/file/path", false)
26+
.map(message -> message)
27+
.toPrint(1)
28+
.start();
29+
}
30+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.rocketmq.streams.examples.rocketmqsource;
18+
19+
import org.apache.rocketmq.streams.client.StreamBuilder;
20+
import org.apache.rocketmq.streams.client.source.DataStreamSource;
21+
22+
public class RocketMQSourceExample1 {
23+
public static void main(String[] args) {
24+
DataStreamSource source = StreamBuilder.dataStream("namespace", "pipeline");
25+
26+
source.fromRocketmq(
27+
RocketMQSourceExample2.RMQ_TOPIC,
28+
RocketMQSourceExample2.RMQ_CONSUMER_GROUP_NAME,
29+
RocketMQSourceExample2.NAMESRV_ADDRESS
30+
)
31+
.map(message -> message)
32+
.toPrint(1)
33+
.start();
34+
35+
}
36+
}

0 commit comments

Comments
 (0)