Skip to content

Commit bb64b41

Browse files
authored
Merge pull request apache#112 from programer-0/release-1.0.0
Fix join and Window issues, add the mqtt source and sink
2 parents 32516c6 + 50048b2 commit bb64b41

File tree

302 files changed

+12647
-6085
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

302 files changed

+12647
-6085
lines changed

README-chinese.md

Lines changed: 42 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,15 @@
1-
# RocketMQ Streams
2-
## Features
1+
[![GitHub release](https://img.shields.io/badge/release-download-orange.svg)](https://github.com/apache/rocketmq-streams/releases)
2+
[![License](https://img.shields.io/badge/license-Apache%202-4EB1BA.svg)](https://www.apache.org/licenses/LICENSE-2.0.html)
3+
[![Average time to resolve an issue](http://isitmaintained.com/badge/resolution/apache/rocketmq-streams.svg)](http://isitmaintained.com/project/apache/rocketmq-streams "Average time to resolve an issue")
4+
[![Percentage of issues still open](http://isitmaintained.com/badge/open/apache/rocketmq-streams.svg)](http://isitmaintained.com/project/apache/rocketmq-streams "Percentage of issues still open")
5+
[![Twitter Follow](https://img.shields.io/twitter/follow/ApacheRocketMQ?style=social)](https://twitter.com/intent/follow?screen_name=ApacheRocketMQ)
6+
7+
# Features
38

49
* 轻量级部署:可以单独部署,也支持集群部署
510
* 多种类型的数据输入以及输出,source 支持 rocketmq , sink 支持db, rocketmq 等
611

7-
## DataStream Example
12+
# DataStream Example
813

914
```java
1015
import org.apache.rocketmq.streams.client.transform.DataStream;
@@ -18,14 +23,14 @@ DataStreamSource source=StreamBuilder.dataStream("namespace","pipeline");
1823
.start();
1924
```
2025

21-
## Maven Repository
26+
# Maven Repository
2227

2328
```xml
2429

2530
<dependency>
2631
<groupId>org.apache.rocketmq</groupId>
2732
<artifactId>rocketmq-streams-clients</artifactId>
28-
<version>1.0.0-Preview-SNAPSHOT</version>
33+
<version>1.0.0-SNAPSHOT</version>
2934
</dependency>
3035
```
3136

@@ -35,10 +40,9 @@ rocketmq-stream 实现了一系列高级的API,可以让用户很方便的编
3540

3641
## StreamBuilder
3742

38-
StreamBuilder 用于构建流任务的源; 内部包含```dataStream()``````tableStream()```俩个方法,分别返回DataStreamSource和TableStreamSource俩个源;
43+
StreamBuilder 用于构建流任务的源;
3944

4045
+ [dataStream(nameSpaceName,pipelineName)]() 返回DataStreamSource实例,用于分段编程实现流计算任务;
41-
+ [tableStream(nameSpaceName,pipelineName)]()返回TableStreamSource实例, 用于脚本编程实现流计算任务;
4246

4347
## DataStream API
4448

@@ -57,14 +61,24 @@ DataStreamSource 是分段式编程的源头类,用于对接各种数据源,
5761
+ ```isJson``` 是否json格式,非必填参数
5862
+ ```tags``` rocketmq消费的tags值,用于过滤消息,非必填参数
5963

64+
+ ```fromMqtt``` 从满足MQTT协议的终端读取数据, 满足边缘计算的场景,其中包含9个参数
65+
+ ```url``` mqtt broker的地址,必填参数
66+
+ ```clientId``` 客户端ID, 必填参数,相同的clientId有负载的作用
67+
+ ```topic``` topic信息, 必填参数
68+
+ ```username``` 用户名, 非必填,在mqtt端添加鉴权机制时使用
69+
+ ```password``` 密码,非必填参数,在mqtt端添加鉴权机制时使用
70+
+ ```cleanSession``` 是否清理session信息, 非必填,默认为true
71+
+ ```connectionTimeout``` 连接超时信息, 非必填,默认是10s
72+
+ ```aliveInterval``` 判断连接是否活跃的间隔时间,非必填,默认是60s
73+
+ ```automaticReconnect``` 连接断开后自动重连机制,非必填,默认是true
74+
75+
6076
+ ```from``` 自定义的数据源, 通过实现ISource接口实现自己的数据源
6177

6278
### transform
6379

6480
transform 允许在流计算过程中对输入源的数据进行修改,进行下一步的操作;DataStream API中包括```DataStream```,```JoinStream```, ```SplitStream```,```WindowStream```等多个transform类;
6581

66-
#### DataStream
67-
6882
DataStream实现了一系列常见的流计算算子
6983

7084
+ ```map``` 通过将源的每个记录传递给函数func来返回一个新的DataStream
@@ -74,8 +88,9 @@ DataStream实现了一系列常见的流计算算子
7488
+ ```selectFields``` 对每个记录返回对应的字段值,返回一个新的DataStream
7589
+ ```operate``` 对每个记录执行一次自定义的函数,返回一个新的DataStream
7690
+ ```script``` 针对每个记录的字段执行一段脚本,返回新的字段,生成一个新的DataStream
77-
+ ```toPrint``` 将结果在控制台打印,生成新的DataStreamAction实例
78-
+ ```toFile``` 将结果保存为文件,生成一个新的DataStreamAction实例
91+
+ ```toPrint``` 将结果在控制台打印,生成新的DataStream实例
92+
+ ```toFile``` 将结果保存为文件,生成一个新的DataStream实例
93+
+ ```toMqtt``` 将结果输出到满足mqtt协议的设备中,生成一个新的DataStream实例
7994
+ ```toDB``` 将结果保存到数据库
8095
+ ```toRocketmq``` 将结果输出到rocketmq
8196
+ ```to``` 将结果经过自定义的ISink接口输出到指定的存储
@@ -86,12 +101,15 @@ DataStream实现了一系列常见的流计算算子
86101
+ ```avg``` 获取窗口内统计值的平均值
87102
+ ```sum``` 获取窗口内统计值的加和值
88103
+ ```reduce``` 在窗口内进行自定义的汇总运算
89-
+ ```join``` 根据条件将将俩个流进行关联, 合并为一个大流进行相关的运算
104+
+ ```join``` 根据条件将俩个流进行内关联
105+
+ ```leftJoin``` 根据条件将俩个流的数据进行左关联
106+
+ ```dimJoin``` 根据条件将流与维表进行内关联,维表的数据可以来自于文件,也可以来自于数据库
107+
+ ```dimLeftJoin``` 根据条件将流与维表进行左关联,维表的数据可以来自于文件,也可以来自于数据库
90108
+ ```union``` 将俩个流进行合并
91109
+ ```split``` 将一个数据流按照标签进行拆分,分为不同的数据流供下游进行分析计算
92110
+ ```with``` with算子用来指定计算过程中的相关策略,包括checkpoint的存储策略,state的存储策略等
93111

94-
# Strategy
112+
#### Strategy
95113

96114
策略机制主要用来控制计算引擎运行过程中的底层逻辑,如checkpoint,state的存储方式等,后续还会增加对窗口、双流join等的控制;所有的控制策略通过```with```算子传入,可以同时传入多个策略类型;
97115

@@ -105,34 +123,20 @@ source
105123
.start();
106124
```
107125

108-
# Deployment
109-
Rocketmq-Streams 的任务提供多种发布方式, 用户可以依照自己的需求自行选择
110-
111-
## 与应用程序集成
112-
Rocketmq-Streams 核心就是一个独立的jar包, 用户可以在自己的应用中引该jar包, 然后进行自己逻辑的开发, 并在自己的应用中启动, 此时流任务和应用共享资源。
126+
# 运行
113127

114-
## 独立部署
115-
1. 通过```mvn clean install``` 构建工程
116-
2.```rocketmq-streams-runner/target/rocket-streams-1.0.0-Preview-SNAPSHOT-distribution.tar.gz``` 中获取tar.gz包, 并解压
117-
3. ```rocketmq-streams```目录架构如下:
118-
+ ```bin``` 指令目录,包括启动和停止指令
119-
+ ```conf``` 配置目录,包括日志配置以及应用的相关配置文件
120-
+ ```jobs``` 任务目录, 独立打包后的rocketmq-streams jar包
121-
+ ```lib``` 依赖包
122-
+ ```log``` 日志目录
128+
Rocketmq-Streams 作为典型的java应用,既可以集成在业务系统里运行,也可以作为一个独立的jar包来运行;
123129

124-
### 发布应用
125-
用户依赖rocketmq-streams,开发流处理程序,独立打包后, 将jar包拷贝到jobs目录, 通过指令即可完成任务的启动和运行;可以通过启动多个独立的应用程序;
130+
首先对应用的源码进行编译
126131

127-
```java
128-
bin/start.sh org.apache.rocketmq.streams.examples.filesource.FileSourceExample(实时任务的入口类)
132+
```shell
133+
mvn -Prelease-all -DskipTests clean install -U
129134
```
130-
多个任务可以同时启动,入口类用逗号分隔
131135

132-
```java
133-
bin/start.sh org.apache.rocketmq.streams.examples.filesource.FileSourceExample,org.apache.rocketmq.streams.examples.checkpoint.RemoteCheckpointTest
134-
```
135-
任务停止
136-
```java
137-
bin/stop.sh org.apache.rocketmq.streams.examples.filesource.FileSourceExample(实时任务的入口类)
136+
然后直接通过java指令来运行
137+
138+
```shell
139+
java -jar jarName mainClass
138140
```
141+
142+
更多详细的案例可以看[这里](docs/SUMMARY.md)

README.md

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,9 @@
66
[![Percentage of issues still open](http://isitmaintained.com/badge/open/apache/rocketmq-streams.svg)](http://isitmaintained.com/project/apache/rocketmq-streams "Percentage of issues still open")
77
[![Twitter Follow](https://img.shields.io/twitter/follow/ApacheRocketMQ?style=social)](https://twitter.com/intent/follow?screen_name=ApacheRocketMQ)
88

9-
## [中文文档](./README-chinese.md)
10-
## [Quick Start](./quick_start.md)
9+
## [中文文档](./README-Chinese.md)
1110

11+
## [Quick Start](./quick_start.md)
1212

1313
## Features
1414

@@ -35,7 +35,7 @@ DataStreamSource source=StreamBuilder.dataStream("namespace","pipeline");
3535
<dependency>
3636
<groupId>org.apache.rocketmq</groupId>
3737
<artifactId>rocketmq-streams-clients</artifactId>
38-
<version>1.0.0-Preview-SNAPSHOT</version>
38+
<version>1.0.0-SNAPSHOT</version>
3939
</dependency>
4040
```
4141

@@ -97,7 +97,11 @@ DataStream implements a series of common stream calculation operators as follows
9797
+ ```avg```: gets the average of the statistical values in the window.
9898
+ ```sum```: gets the sum of the statistical values in the window.
9999
+ ```reduce```: performs custom summary calculations in the window.
100-
+ ```join```: associates the two streams according to the conditions and merges them into a large stream for related calculations.
100+
+ ```join```: associates the two streams or one stream and one physical table according to the conditions and merges them into a large stream for related calculations.
101+
+ ```dimJoin``` associate a stream with a physical table which can be a file or a db table, and all matching records are retained
102+
+ ```dimLeftJoin``` After a flow is associated with a physical table, all data of the flow is reserved and fields that do not match the physical table are left blank
103+
+ ```join```
104+
+ ```leftJoin```
101105
+ ```union```: merges the two streams.
102106
+ ```split```: splits a data stream into different data streams according to tags for downstream analysis and calculation.
103107
+ ```with```: specifies related strategies during the calculation, including Checkpoint and state storage strategies, etc.
@@ -115,4 +119,5 @@ source
115119
.with(CheckpointStrategy.db("jdbc:mysql://XXXXX:3306/XXXXX","","",0L))
116120
.start();
117121
```
122+
118123
——————————————

docs/README.md

Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
[![GitHub release](https://img.shields.io/badge/release-download-orange.svg)](https://github.com/apache/rocketmq-streams/releases)
2+
[![License](https://img.shields.io/badge/license-Apache%202-4EB1BA.svg)](https://www.apache.org/licenses/LICENSE-2.0.html)
3+
[![Average time to resolve an issue](http://isitmaintained.com/badge/resolution/apache/rocketmq-streams.svg)](http://isitmaintained.com/project/apache/rocketmq-streams "Average time to resolve an issue")
4+
[![Percentage of issues still open](http://isitmaintained.com/badge/open/apache/rocketmq-streams.svg)](http://isitmaintained.com/project/apache/rocketmq-streams "Percentage of issues still open")
5+
[![Twitter Follow](https://img.shields.io/twitter/follow/ApacheRocketMQ?style=social)](https://twitter.com/intent/follow?screen_name=ApacheRocketMQ)
6+
7+
# Features
8+
9+
* 轻量级部署:可以单独部署,也支持集群部署
10+
* 多种类型的数据输入以及输出,source 支持 rocketmq , sink 支持db, rocketmq 等
11+
12+
# DataStream Example
13+
14+
```java
15+
import org.apache.rocketmq.streams.client.transform.DataStream;
16+
17+
DataStreamSource source=StreamBuilder.dataStream("namespace","pipeline");
18+
19+
source
20+
.fromFile("~/admin/data/text.txt",false)
21+
.map(message->message)
22+
.toPrint(1)
23+
.start();
24+
```
25+
26+
# Maven Repository
27+
28+
```xml
29+
30+
<dependency>
31+
<groupId>org.apache.rocketmq</groupId>
32+
<artifactId>rocketmq-streams-clients</artifactId>
33+
<version>1.0.0-SNAPSHOT</version>
34+
</dependency>
35+
```
36+
37+
# Core API
38+
39+
rocketmq-stream 实现了一系列高级的API,可以让用户很方便的编写流计算的程序,实现自己的业务需求;
40+
41+
## StreamBuilder
42+
43+
StreamBuilder 用于构建流任务的源;
44+
45+
+ [dataStream(nameSpaceName,pipelineName)]() 返回DataStreamSource实例,用于分段编程实现流计算任务;
46+
47+
## DataStream API
48+
49+
### Source
50+
51+
DataStreamSource 是分段式编程的源头类,用于对接各种数据源, 从各大消息队列中获取数据;
52+
53+
+ ```fromFile``` 从文件中读取数据, 该方法包含俩个参数
54+
+ ```filePath``` 文件路径,必填参数
55+
+ ```isJsonData``` 是否json数据, 非必填参数, 默认为```true```
56+
57+
58+
+ ```fromRocketmq``` 从rocketmq中获取数据,包含四个参数
59+
+ ```topic``` rocketmq消息队列的topic名称,必填参数
60+
+ ```groupName``` 消费者组的名称,必填参数
61+
+ ```isJson``` 是否json格式,非必填参数
62+
+ ```tags``` rocketmq消费的tags值,用于过滤消息,非必填参数
63+
64+
+ ```fromMqtt``` 从满足MQTT协议的终端读取数据, 满足边缘计算的场景,其中包含9个参数
65+
+ ```url``` mqtt broker的地址,必填参数
66+
+ ```clientId``` 客户端ID, 必填参数,相同的clientId有负载的作用
67+
+ ```topic``` topic信息, 必填参数
68+
+ ```username``` 用户名, 非必填,在mqtt端添加鉴权机制时使用
69+
+ ```password``` 密码,非必填参数,在mqtt端添加鉴权机制时使用
70+
+ ```cleanSession``` 是否清理session信息, 非必填,默认为true
71+
+ ```connectionTimeout``` 连接超时信息, 非必填,默认是10s
72+
+ ```aliveInterval``` 判断连接是否活跃的间隔时间,非必填,默认是60s
73+
+ ```automaticReconnect``` 连接断开后自动重连机制,非必填,默认是true
74+
75+
76+
+ ```from``` 自定义的数据源, 通过实现ISource接口实现自己的数据源
77+
78+
### transform
79+
80+
transform 允许在流计算过程中对输入源的数据进行修改,进行下一步的操作;DataStream API中包括```DataStream```,```JoinStream```, ```SplitStream```,```WindowStream```等多个transform类;
81+
82+
DataStream实现了一系列常见的流计算算子
83+
84+
+ ```map``` 通过将源的每个记录传递给函数func来返回一个新的DataStream
85+
+ ```flatmap``` 与map类似,一个输入项对应0个或者多个输出项
86+
+ ```filter``` 只选择func返回true的源DStream的记录来返回一个新的DStream
87+
+ ```forEach``` 对每个记录执行一次函数func, 返回一个新的DataStream
88+
+ ```selectFields``` 对每个记录返回对应的字段值,返回一个新的DataStream
89+
+ ```operate``` 对每个记录执行一次自定义的函数,返回一个新的DataStream
90+
+ ```script``` 针对每个记录的字段执行一段脚本,返回新的字段,生成一个新的DataStream
91+
+ ```toPrint``` 将结果在控制台打印,生成新的DataStream实例
92+
+ ```toFile``` 将结果保存为文件,生成一个新的DataStream实例
93+
+ ```toMqtt``` 将结果输出到满足mqtt协议的设备中,生成一个新的DataStream实例
94+
+ ```toDB``` 将结果保存到数据库
95+
+ ```toRocketmq``` 将结果输出到rocketmq
96+
+ ```to``` 将结果经过自定义的ISink接口输出到指定的存储
97+
+ ```window``` 在窗口内进行相关的统计分析,一般会与```groupBy```连用, ```window()```用来定义窗口的大小, ```groupBy()```用来定义统计分析的主key,可以指定多个
98+
+ ```count``` 在窗口内计数
99+
+ ```min``` 获取窗口内统计值的最小值
100+
+ ```max``` 获取窗口内统计值得最大值
101+
+ ```avg``` 获取窗口内统计值的平均值
102+
+ ```sum``` 获取窗口内统计值的加和值
103+
+ ```reduce``` 在窗口内进行自定义的汇总运算
104+
+ ```join``` 根据条件将俩个流进行内关联
105+
+ ```leftJoin``` 根据条件将俩个流的数据进行左关联
106+
+ ```dimJoin``` 根据条件将流与维表进行内关联,维表的数据可以来自于文件,也可以来自于数据库
107+
+ ```dimLeftJoin``` 根据条件将流与维表进行左关联,维表的数据可以来自于文件,也可以来自于数据库
108+
+ ```union``` 将俩个流进行合并
109+
+ ```split``` 将一个数据流按照标签进行拆分,分为不同的数据流供下游进行分析计算
110+
+ ```with``` with算子用来指定计算过程中的相关策略,包括checkpoint的存储策略,state的存储策略等
111+
112+
#### Strategy
113+
114+
策略机制主要用来控制计算引擎运行过程中的底层逻辑,如checkpoint,state的存储方式等,后续还会增加对窗口、双流join等的控制;所有的控制策略通过```with```算子传入,可以同时传入多个策略类型;
115+
116+
```java
117+
//指定checkpoint的存储策略
118+
source
119+
.fromRocketmq("TSG_META_INFO","")
120+
.map(message->message+"--")
121+
.toPrint(1)
122+
.with(CheckpointStrategy.db("jdbc:mysql://XXXXX:3306/XXXXX","","",0L))
123+
.start();
124+
```
125+
126+
# 运行
127+
128+
Rocketmq-Streams 作为典型的java应用,既可以集成在业务系统里运行,也可以作为一个独立的jar包来运行;
129+
130+
首先对应用的源码进行编译
131+
132+
```shell
133+
mvn -Prelease-all -DskipTests clean install -U
134+
```
135+
136+
然后直接通过java指令来运行
137+
138+
```shell
139+
java -jar jarName mainClass
140+
```
141+
142+
更多详细的案例可以看[这里](docs/SUMMARY.md)

docs/SUMMARY.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
# Summary
2+
3+
* [Introduction](README.md)
4+
* [Quick Start](quick_start/README.md)
5+
* [创建实时任务数据源](stream_source/README.md)
6+
* [创建实时任务数据输出](stream_sink/README.md)
7+
* [数据处理逻辑](stream_transform/README.md)
8+

docs/book.json

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
{
2+
"plugins": [
3+
"collapsible-menu",
4+
"anchor-navigation-ex",
5+
"tbfed-pagefooter",
6+
"disqus",
7+
"expandable-chapters-small",
8+
"-lunr",
9+
"-search",
10+
"search-plus",
11+
"-sharing",
12+
"splitter",
13+
"hide-element",
14+
"insert-logo",
15+
"code"
16+
],
17+
"title": "# Rrocket-Stream(SQL)",
18+
"pluginsConfig": {
19+
"tbfed-pagefooter": {
20+
"copyright": "Copyright &copy 阿里云智能引擎团队 2021",
21+
"modify_label": "文件修订时间:",
22+
"modify_format": "YYYY-MM-DD HH:mm:ss"
23+
},
24+
"disqus": {
25+
"shortName": "gitbookuse"
26+
},
27+
"theme-default": {
28+
"showLevel": false
29+
},
30+
"hide-element": {
31+
"elements": [
32+
".gitbook-link"
33+
]
34+
},
35+
"insert-logo": {
36+
"url": "/images/logo.jpeg",
37+
"style": "background: none; max-height: 120px; min-height: 120px"
38+
},
39+
"anchor-navigation-ex": {
40+
"showLevel": false,
41+
"showGoTop": false
42+
}
43+
}
44+
}

docs/images/logo.jpeg

23 KB
Loading

0 commit comments

Comments
 (0)