Skip to content

Commit 8e13e3c

Browse files
authored
Add a quick start which contain some examples. (#68)
* a runnable window example reading data from rocketmq. * add quick start * modify quick_start * modify example. * modify and add another pageclick example. * remove private path * fix spelling mistakes * use mysql as remote checkpoint storage * modify code style of import class * modify annotation Co-authored-by: nize <[email protected]>
1 parent 79fd2e4 commit 8e13e3c

File tree

38 files changed

+883
-254
lines changed

38 files changed

+883
-254
lines changed

QUICKSTART.md

Lines changed: 0 additions & 81 deletions
This file was deleted.

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
[![Twitter Follow](https://img.shields.io/twitter/follow/ApacheRocketMQ?style=social)](https://twitter.com/intent/follow?screen_name=ApacheRocketMQ)
88

99
## [中文文档](./README-Chinese.md)
10-
## [Quick Start](./QUICKSTART.md)
10+
## [Quick Start](./quick_start.md)
1111

1212

1313
## Features

pom.xml

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@
6969
<commons-logging.version>1.1</commons-logging.version>
7070
<spring.version>3.2.13.RELEASE</spring.version>
7171
<auto-service.version>1.0-rc5</auto-service.version>
72-
<mysql-connector.version>5.1.40</mysql-connector.version>
72+
<mysql-connector.version>8.0.26</mysql-connector.version>
7373
<fastjson.version>1.2.78</fastjson.version>
7474
<quartz.version>2.2.1</quartz.version>
7575
<httpclient.version>4.5.2</httpclient.version>
@@ -112,13 +112,13 @@
112112
<exclude>.asf.yaml</exclude>
113113
<exclude>README.md</exclude>
114114
<exclude>README-Chinese.md</exclude>
115-
<exclude>QUICKSTART.md</exclude>
115+
<exclude>quick_start.md</exclude>
116116
<exclude>.github/**</exclude>
117117
<exclude>*/target/**</exclude>
118118
<exclude>*/*.iml</exclude>
119119
<exclude>**/*.txt</exclude>
120120
<exclude>**/*.cs</exclude>
121-
<exclude>src/test/resources/window_msg_*</exclude>
121+
<exclude>**/*.sql</exclude>
122122
</excludes>
123123
</configuration>
124124
</plugin>
@@ -283,6 +283,12 @@
283283
<artifactId>rocketmq-streams-channel-rocketmq</artifactId>
284284
<version>${project.version}</version>
285285
</dependency>
286+
<dependency>
287+
<groupId>org.apache.rocketmq</groupId>
288+
<artifactId>rocketmq-streams-dbinit</artifactId>
289+
<version>${project.version}</version>
290+
</dependency>
291+
286292

287293
<!-- ================================================= -->
288294
<!-- rocketmq library -->

quick_start.md

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
## rocketmq-streams 快速搭建
2+
---
3+
### 前言
4+
本文档主要介绍如何基于rocketmq-streams快速搭建流处理任务,搭建过程中某些例子会用到rocketmq,可以参考[rocketmq搭建文档](https://rocketmq.apache.org/docs/quick-start/)
5+
6+
7+
### 1、源码构建
8+
9+
#### 1.1、构建环境
10+
- JDK 1.8 and above
11+
- Maven 3.2 and above
12+
13+
#### 1.2、构建Rocketmq-streams
14+
15+
`git clone https://github.com/apache/rocketmq-streams.git`
16+
`cd rocketmq-streams`
17+
`mvn clean -DskipTests install -U`
18+
19+
20+
### 2、基于rocketmq-streams创建应用
21+
22+
#### 2.1、pom依赖
23+
```xml
24+
<dependency>
25+
<groupId>org.apache.rocketmq</groupId>
26+
<artifactId>rocketmq-streams-clients</artifactId>
27+
</dependency>
28+
```
29+
#### 2.2、shade clients依赖包
30+
```xml
31+
<build>
32+
<plugins>
33+
<plugin>
34+
<groupId>org.apache.maven.plugins</groupId>
35+
<artifactId>maven-shade-plugin</artifactId>
36+
<version>3.2.1</version>
37+
<executions>
38+
<execution>
39+
<phase>package</phase>
40+
<goals>
41+
<goal>shade</goal>
42+
</goals>
43+
<configuration>
44+
<minimizeJar>false</minimizeJar>
45+
<shadedArtifactAttached>true</shadedArtifactAttached>
46+
<artifactSet>
47+
<includes>
48+
<include>org.apache.rocketmq:rocketmq-streams-clients</include>
49+
</includes>
50+
</artifactSet>
51+
</configuration>
52+
</execution>
53+
</executions>
54+
</plugin>
55+
</plugins>
56+
</build>
57+
```
58+
59+
#### 2.3、编写业务代码
60+
Please see the [rocketmq-streams-examples](rocketmq-streams-examples/README.md)
61+
#### 2.4、运行
62+
- 前提:在从rocketmq中读取数据做流处理时,需要运行topic在rocketmq中自动创建,因为做groupBy操作时,需要用到rocketmq作为shuffle数据的读写目的地。
63+
- 命令:
64+
```
65+
java -jar XXXX-shade.jar \
66+
-Dlog4j.level=ERROR \
67+
-Dlog4j.home=/logs \
68+
-Xms1024m \
69+
-Xmx1024m
70+
```
71+

rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/sink/RocketMQSink.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
4141
import org.apache.rocketmq.tools.command.CommandUtil;
4242

43+
4344
public class RocketMQSink extends AbstractSupportShuffleSink {
4445

4546
private static final Log LOG = LogFactory.getLog(RocketMQSink.class);

rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/source/RocketMQSource.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -41,12 +41,7 @@
4141
import org.apache.rocketmq.streams.common.channel.source.AbstractSupportOffsetResetSource;
4242
import org.apache.rocketmq.streams.common.channel.split.ISplit;
4343
import org.apache.rocketmq.streams.common.configurable.annotation.ENVDependence;
44-
import org.apache.rocketmq.streams.common.context.AbstractContext;
45-
import org.apache.rocketmq.streams.common.context.IMessage;
46-
import org.apache.rocketmq.streams.common.interfaces.IStreamOperator;
47-
import org.apache.rocketmq.streams.common.utils.DateUtil;
4844
import org.apache.rocketmq.streams.common.utils.ReflectUtil;
49-
import org.apache.rocketmq.streams.common.utils.RuntimeUtil;
5045
import org.apache.rocketmq.streams.debug.DebugWriter;
5146
import org.apache.rocketmq.streams.queue.RocketMQMessageQueue;
5247
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
@@ -60,7 +55,6 @@
6055
import java.util.Set;
6156
import java.util.UUID;
6257
import java.util.concurrent.ConcurrentMap;
63-
import java.util.concurrent.atomic.AtomicBoolean;
6458
import java.util.concurrent.atomic.AtomicLong;
6559

6660
public class RocketMQSource extends AbstractSupportOffsetResetSource {
@@ -134,6 +128,8 @@ protected DefaultMQPushConsumer startConsumer() {
134128
for (MessageExt msg : msgs) {
135129
String data = new String(msg.getBody(), CHARSET);
136130
JSONObject jsonObject = create(data);
131+
132+
137133
String queueId = RocketMQMessageQueue.getQueueId(context.getMessageQueue());
138134
String offset = msg.getQueueOffset() + "";
139135
org.apache.rocketmq.streams.common.context.Message message = createMessage(jsonObject, queueId, offset, false);

rocketmq-streams-clients/pom.xml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,10 @@
5252
<groupId>org.apache.rocketmq</groupId>
5353
<artifactId>rocketmq-streams-window</artifactId>
5454
</dependency>
55+
<dependency>
56+
<groupId>org.apache.rocketmq</groupId>
57+
<artifactId>rocketmq-streams-dbinit</artifactId>
58+
</dependency>
5559
<dependency>
5660
<groupId>org.slf4j</groupId>
5761
<artifactId>slf4j-api</artifactId>

rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/DataStreamAction.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,6 @@
1818
package org.apache.rocketmq.streams.client;
1919

2020
import com.google.common.collect.Maps;
21-
22-
import java.util.Map;
23-
import java.util.Properties;
24-
import java.util.Set;
25-
2621
import org.apache.rocketmq.streams.client.strategy.Strategy;
2722
import org.apache.rocketmq.streams.client.transform.DataStream;
2823
import org.apache.rocketmq.streams.common.component.ComponentCreator;
@@ -32,6 +27,10 @@
3227
import org.apache.rocketmq.streams.common.topology.builder.PipelineBuilder;
3328
import org.apache.rocketmq.streams.configurable.ConfigurableComponent;
3429

30+
import java.util.Map;
31+
import java.util.Properties;
32+
import java.util.Set;
33+
3534
public class DataStreamAction extends DataStream {
3635

3736
private final Map<String, Object> properties = Maps.newHashMap();

rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/DataStreamTest.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,11 @@
2828
import org.junit.Test;
2929

3030
import java.io.Serializable;
31-
import java.sql.*;
31+
import java.sql.Connection;
32+
import java.sql.DatabaseMetaData;
33+
import java.sql.DriverManager;
34+
import java.sql.ResultSet;
35+
import java.sql.SQLException;
3236

3337
public class DataStreamTest implements Serializable {
3438

rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/windows/AbstractWindowTest.java

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,16 @@
1818
package org.apache.rocketmq.streams.client.windows;
1919

2020
import com.alibaba.fastjson.JSONObject;
21+
import org.apache.rocketmq.streams.client.transform.DataStream;
22+
import org.apache.rocketmq.streams.client.transform.window.Time;
23+
import org.apache.rocketmq.streams.client.transform.window.TumblingWindow;
24+
import org.apache.rocketmq.streams.common.functions.ForEachFunction;
25+
import org.apache.rocketmq.streams.common.functions.MapFunction;
26+
import org.apache.rocketmq.streams.common.utils.DateUtil;
27+
import org.apache.rocketmq.streams.common.utils.FileUtil;
28+
import org.apache.rocketmq.streams.common.utils.MapKeyUtil;
29+
import org.apache.rocketmq.streams.common.utils.StringUtil;
30+
2131
import java.io.Serializable;
2232
import java.util.ArrayList;
2333
import java.util.Collections;
@@ -28,15 +38,6 @@
2838
import java.util.Map;
2939
import java.util.concurrent.atomic.AtomicInteger;
3040
import java.util.concurrent.atomic.AtomicLong;
31-
import org.apache.rocketmq.streams.client.transform.DataStream;
32-
import org.apache.rocketmq.streams.client.transform.window.Time;
33-
import org.apache.rocketmq.streams.client.transform.window.TumblingWindow;
34-
import org.apache.rocketmq.streams.common.functions.ForEachFunction;
35-
import org.apache.rocketmq.streams.common.functions.MapFunction;
36-
import org.apache.rocketmq.streams.common.utils.DateUtil;
37-
import org.apache.rocketmq.streams.common.utils.FileUtil;
38-
import org.apache.rocketmq.streams.common.utils.MapKeyUtil;
39-
import org.apache.rocketmq.streams.common.utils.StringUtil;
4041

4142
import static junit.framework.TestCase.assertTrue;
4243

rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/OutputPrintChannel.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,10 @@
1616
*/
1717
package org.apache.rocketmq.streams.common.channel.impl;
1818

19-
import java.util.List;
20-
2119
import org.apache.rocketmq.streams.common.channel.sink.AbstractSink;
2220
import org.apache.rocketmq.streams.common.context.IMessage;
23-
import org.apache.rocketmq.streams.common.utils.PrintUtil;
21+
22+
import java.util.List;
2423

2524
/**
2625
* 测试使用,输出就是把消息打印出来
@@ -30,7 +29,7 @@ public class OutputPrintChannel extends AbstractSink {
3029
@Override
3130
protected boolean batchInsert(List<IMessage> messages) {
3231
for (IMessage msg : messages) {
33-
//System.out.println(msg.getMessageValue());
32+
System.out.println(msg.getMessageValue());
3433
}
3534
return false;
3635
}

rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/file/FileSource.java

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.io.FileNotFoundException;
2222
import java.io.FileReader;
2323
import java.io.IOException;
24+
import java.net.URL;
2425
import java.util.Iterator;
2526
import java.util.concurrent.CountDownLatch;
2627
import java.util.concurrent.ExecutorService;
@@ -55,7 +56,7 @@ public FileSource(String filePath) {
5556
@Override
5657
protected boolean initConfigurable() {
5758
super.initConfigurable();
58-
File file = new File(filePath);
59+
File file = getFile(filePath);
5960
if (file.exists() && file.isDirectory()) {
6061
executorService = new ThreadPoolExecutor(maxThread, maxThread,
6162
0L, TimeUnit.MILLISECONDS,
@@ -64,6 +65,23 @@ protected boolean initConfigurable() {
6465
return true;
6566
}
6667

68+
private File getFile(String filePath) {
69+
File file = new File(filePath);
70+
if (!file.exists()) {
71+
ClassLoader loader = getClass().getClassLoader();
72+
URL url = loader.getResource(filePath);
73+
74+
if (url != null) {
75+
String path = url.getFile();
76+
file = new File(path);
77+
this.filePath = path;
78+
}
79+
}
80+
return file;
81+
82+
83+
}
84+
6785
@Override
6886
protected boolean startSource() {
6987

@@ -97,7 +115,7 @@ protected boolean startSource() {
97115
*/
98116
protected LinkedBlockingQueue<FileIterator> createIteratorList() {
99117
LinkedBlockingQueue<FileIterator> iterators = new LinkedBlockingQueue<>(1000);
100-
File file = new File(filePath);
118+
File file = getFile(filePath);
101119
if (file.exists() == false) {
102120
return null;
103121
}

0 commit comments

Comments
 (0)