Skip to content

Commit 32516c6

Browse files
authored
add multi rocketmq source example (apache#106)
* add multi rocketmq source example * code style
1 parent 0c9a8ce commit 32516c6

File tree

4 files changed

+123
-7
lines changed

4 files changed

+123
-7
lines changed

rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/mutilconsumer/MutilStreamsClientTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ public static void main(String[] args) {
4444
producerPool.submit(new Runnable() {
4545
@Override
4646
public void run() {
47-
Producer.produceInLoop("data.txt");
47+
Producer.produceInLoop(RMQ_TOPIC, "data.txt");
4848
}
4949
});
5050

rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/mutilconsumer/Producer.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,7 @@
2626
import org.apache.rocketmq.remoting.common.RemotingHelper;
2727
import org.apache.rocketmq.streams.examples.rocketmqsource.ProducerFromFile;
2828

29-
import static org.apache.rocketmq.streams.examples.rocketmqsource.Constant.NAMESRV_ADDRESS;
30-
import static org.apache.rocketmq.streams.examples.rocketmqsource.Constant.RMQ_TOPIC;
29+
import static org.apache.rocketmq.streams.examples.rocketmqsource.Constant.*;
3130

3231
public class Producer {
3332
private static final AtomicInteger count = new AtomicInteger(0);
@@ -36,7 +35,7 @@ public class Producer {
3635
* total produce 1000 data.
3736
* @param fileName
3837
*/
39-
public static void produceInLoop(String fileName) {
38+
public static void produceInLoop(String topic, String fileName) {
4039
DefaultMQProducer producer = new DefaultMQProducer("test-group");
4140

4241
try {
@@ -51,11 +50,10 @@ public static void produceInLoop(String fileName) {
5150
}
5251

5352
for (String str : result) {
54-
Message msg = new Message(RMQ_TOPIC, "", str.getBytes(RemotingHelper.DEFAULT_CHARSET));
53+
Message msg = new Message(topic, "", str.getBytes(RemotingHelper.DEFAULT_CHARSET));
5554
producer.send(msg);
5655
count.getAndIncrement();
5756
}
58-
5957
Thread.sleep(100);
6058
}
6159

rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/rocketmqsource/Constant.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
public class Constant {
2323
public static final String NAMESRV_ADDRESS = "127.0.0.1:9876";
2424
public static final String RMQ_TOPIC = "NormalTestTopic";
25-
public static final String RMQ_CONSUMER_GROUP_NAME = "test-group-02";
25+
public static final String RMQ_TOPIC_OTHER = "NormalTestTopic1";
26+
public static final String RMQ_CONSUMER_GROUP_NAME = "test-group-01";
27+
public static final String RMQ_CONSUMER_GROUP_NAME_OTHER = "test-group-02";
2628
public static final String TAGS = "*";
2729
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
/*
2+
*
3+
* * Licensed to the Apache Software Foundation (ASF) under one or more
4+
* * contributor license agreements. See the NOTICE file distributed with
5+
* * this work for additional information regarding copyright ownership.
6+
* * The ASF licenses this file to You under the Apache License, Version 2.0
7+
* * (the "License"); you may not use this file except in compliance with
8+
* * the License. You may obtain a copy of the License at
9+
* *
10+
* * http://www.apache.org/licenses/LICENSE-2.0
11+
* *
12+
* * Unless required by applicable law or agreed to in writing, software
13+
* * distributed under the License is distributed on an "AS IS" BASIS,
14+
* * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* * See the License for the specific language governing permissions and
16+
* * limitations under the License.
17+
*
18+
*/
19+
20+
package org.apache.rocketmq.streams.examples.rocketmqsource;
21+
22+
import org.apache.rocketmq.streams.client.StreamBuilder;
23+
import org.apache.rocketmq.streams.client.source.DataStreamSource;
24+
import org.apache.rocketmq.streams.client.strategy.WindowStrategy;
25+
import org.apache.rocketmq.streams.client.transform.DataStream;
26+
import org.apache.rocketmq.streams.client.transform.JoinStream;
27+
import org.apache.rocketmq.streams.client.transform.window.Time;
28+
import org.apache.rocketmq.streams.examples.mutilconsumer.Producer;
29+
30+
import java.util.Random;
31+
import java.util.concurrent.ExecutorService;
32+
import java.util.concurrent.Executors;
33+
34+
import static org.apache.rocketmq.streams.examples.rocketmqsource.Constant.RMQ_TOPIC;
35+
import static org.apache.rocketmq.streams.examples.rocketmqsource.Constant.RMQ_TOPIC_OTHER;
36+
import static org.apache.rocketmq.streams.examples.rocketmqsource.Constant.NAMESRV_ADDRESS;
37+
import static org.apache.rocketmq.streams.examples.rocketmqsource.Constant.RMQ_CONSUMER_GROUP_NAME;
38+
import static org.apache.rocketmq.streams.examples.rocketmqsource.Constant.RMQ_CONSUMER_GROUP_NAME_OTHER;
39+
40+
public class MultiRocketMQSourceStreamsExample {
41+
private static ExecutorService producerPool = Executors.newFixedThreadPool(2);
42+
private static ExecutorService consumerPool = Executors.newCachedThreadPool();
43+
private static Random random = new Random();
44+
45+
46+
public static void main(String[] args) {
47+
//producer
48+
producerPool.submit(new Runnable() {
49+
@Override
50+
public void run() {
51+
Producer.produceInLoop(RMQ_TOPIC, "data.txt");
52+
}
53+
});
54+
55+
//producer
56+
producerPool.submit(new Runnable() {
57+
@Override
58+
public void run() {
59+
Producer.produceInLoop(RMQ_TOPIC_OTHER, "data.txt");
60+
}
61+
});
62+
63+
64+
//consumer
65+
for (int i = 0; i < 1; i++) {
66+
consumerPool.submit(new Runnable() {
67+
@Override
68+
public void run() {
69+
try {
70+
runOneStreamsClient(77);
71+
} catch (Exception e) {
72+
e.printStackTrace();
73+
}
74+
}
75+
});
76+
}
77+
78+
}
79+
80+
private static void runOneStreamsClient(int index) {
81+
int namespaceIndex = index;
82+
int pipelineIndex = index;
83+
DataStreamSource leftSource = StreamBuilder.dataStream("namespace" + namespaceIndex, "pipeline" + pipelineIndex);
84+
DataStream left = leftSource.fromRocketmq(
85+
RMQ_TOPIC,
86+
RMQ_CONSUMER_GROUP_NAME,
87+
true,
88+
NAMESRV_ADDRESS);
89+
90+
int otherPipelineIndex = index + 1;
91+
DataStreamSource rightSource = StreamBuilder.dataStream("namespace" + namespaceIndex, "pipeline" + otherPipelineIndex);
92+
DataStream right = rightSource.fromRocketmq(
93+
RMQ_TOPIC_OTHER,
94+
RMQ_CONSUMER_GROUP_NAME_OTHER,
95+
true,
96+
NAMESRV_ADDRESS);
97+
98+
left.join(right)
99+
.setJoinType(JoinStream.JoinType.LEFT_JOIN)
100+
.setCondition("(InFlow,==,InFlow)")
101+
.window(Time.minutes(1))
102+
.toDataSteam()
103+
.map(message -> {
104+
System.out.println(message);
105+
return message + "===";
106+
})
107+
.toPrint(1)
108+
.with(WindowStrategy.highPerformance())
109+
.start();
110+
try {
111+
Thread.sleep(10000);
112+
} catch (InterruptedException e) {
113+
e.printStackTrace();
114+
}
115+
}
116+
}

0 commit comments

Comments
 (0)