Commit d1e031e

cyril68vv and vv authored
add checkpoint storage (apache#69)

* add channel-db module
* add channel-configurable
* add chinese
* fixed DataStream flatMap
* U
* stash & pre merge
* add license for chackpoint/pom.xml
* add license for checkpoint/pom.xml
* fix license

Co-authored-by: vv <[email protected]>
1 parent d16c6c8 commit d1e031e

22 files changed, +973 -192 lines changed

README-chinese.md

Lines changed: 106 additions & 0 deletions
@@ -0,0 +1,106 @@
# RocketMQ Streams
## Features

* Lightweight deployment: can be deployed standalone, and cluster deployment is also supported
* Multiple types of data input and output: the source supports RocketMQ; the sink supports databases, RocketMQ, and others

## DataStream Example

```java
import org.apache.rocketmq.streams.client.transform.DataStream;

DataStreamSource source=StreamBuilder.dataStream("namespace","pipeline");

source
    .fromFile("~/admin/data/text.txt",false)
    .map(message->message)
    .toPrint(1)
    .start();
```

## Maven Repository

```xml

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-streams-clients</artifactId>
    <version>1.0.0-SNAPSHOT</version>
</dependency>
```

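# Core API

rocketmq-streams provides a set of high-level APIs that let users easily write stream-computing programs to meet their own business needs.

## StreamBuilder

StreamBuilder builds the source of a stream task. It contains two methods, `dataStream()` and `tableStream()`, which return a DataStreamSource and a TableStreamSource respectively.

+ [dataStream(nameSpaceName,pipelineName)]() returns a DataStreamSource instance, used to implement stream-computing tasks with segmented (step-by-step) programming
+ [tableStream(nameSpaceName,pipelineName)]() returns a TableStreamSource instance, used to implement stream-computing tasks with script programming

## DataStream API

### Source

DataStreamSource is the source class for segmented programming. It connects to various data sources and reads data from the major message queues.

+ `fromFile` reads data from a file; the method takes two parameters
    + `filePath` file path, required
    + `isJsonData` whether the data is JSON, optional, defaults to `true`


+ `fromRocketmq` reads data from RocketMQ; the method takes four parameters
    + `topic` name of the RocketMQ topic, required
    + `groupName` name of the consumer group, required
    + `isJson` whether the data is in JSON format, optional
    + `tags` the RocketMQ tags to consume, used to filter messages, optional

+ `from` a custom data source; implement the ISource interface to provide your own
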
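### transform

transform lets the data from the input source be modified during stream computation before it is passed to the next step. The DataStream API includes several transform classes, such as `DataStream`, `JoinStream`, `SplitStream`, and `WindowStream`.

#### DataStream

DataStream implements a set of common stream-computing operators

+ `map` returns a new DataStream by passing each record of the source to the function func
+ `flatMap` similar to map, but each input item maps to zero or more output items
+ `filter` returns a new DataStream containing only the records of the source DataStream for which func returns true
+ `forEach` executes the function func once for each record and returns a new DataStream
+ `selectFields` returns the selected field values of each record as a new DataStream
+ `operate` executes a user-defined function once for each record and returns a new DataStream
+ `script` runs a script against the fields of each record, returns the new fields, and produces a new DataStream
+ `toPrint` prints the result to the console and produces a new DataStreamAction instance
+ `toFile` saves the result to a file and produces a new DataStreamAction instance
+ `toDB` saves the result to a database
+ `toRocketmq` writes the result to RocketMQ
+ `to` writes the result to the specified storage through a custom ISink implementation
+ `window` performs statistical analysis within a window and is usually combined with `groupBy`; `window()` defines the window size and `groupBy()` defines the grouping key for the statistics (multiple keys can be specified)
+ `count` counts records within the window
+ `min` returns the minimum of the aggregated value within the window
+ `max` returns the maximum of the aggregated value within the window
+ `avg` returns the average of the aggregated value within the window
+ `sum` returns the sum of the aggregated value within the window
+ `reduce` performs a custom aggregation within the window
+ `join` joins two streams according to a condition and merges them into one larger stream for further computation
+ `union` merges two streams
+ `split` splits one data stream by tag into separate streams for downstream analysis
+ `with` specifies the strategies used during computation, including the checkpoint storage strategy, the state storage strategy, and so on
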
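As a rough analogy for the record-level operators listed above, the sketch below uses the JDK's `java.util.stream` instead of the RocketMQ Streams API. The class `OperatorAnalogy` and its method names are made up for illustration; only the semantics (one output per input for `map`, zero or more for `flatMap`, predicate-based selection for `filter`, and a grouped count over one window's records) mirror the descriptions in the list.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Analogy only: JDK streams standing in for map / flatMap / filter and a grouped count.
// None of these names belong to RocketMQ Streams.
final class OperatorAnalogy {

    // map: exactly one output record per input record.
    static List<String> mapUpper(List<String> records) {
        return records.stream()
            .map(String::toUpperCase)
            .collect(Collectors.toList());
    }

    // flatMap: zero or more output records per input record (one per word here).
    // Empty tokens produced by blank input are dropped.
    static List<String> flatMapWords(List<String> records) {
        return records.stream()
            .flatMap(line -> Arrays.stream(line.split("\\s+")))
            .filter(word -> !word.isEmpty())
            .collect(Collectors.toList());
    }

    // filter: keep only the records for which the predicate returns true.
    static List<String> filterLongerThan(List<String> records, int minLength) {
        return records.stream()
            .filter(record -> record.length() > minLength)
            .collect(Collectors.toList());
    }

    // groupBy + count over the records of one already-collected window.
    static Map<String, Long> countByKey(List<String> windowRecords) {
        return windowRecords.stream()
            .collect(Collectors.groupingBy(key -> key, TreeMap::new, Collectors.counting()));
    }
}
```

A real window operator also decides which records belong to which window; this sketch assumes that step has already happened and only shows the per-window aggregation.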
# Strategy

The strategy mechanism controls the underlying logic of the compute engine at runtime, such as how checkpoints and state are stored; controls for windows, two-stream joins, and similar features will be added later. All strategies are passed in through the `with` operator, and several strategy types can be passed at once.

```java
// specify the checkpoint storage strategy
source
    .fromRocketmq("TSG_META_INFO","")
    .map(message->message+"--")
    .toPrint(1)
    .with(CheckpointStrategy.db("jdbc:mysql://XXXXX:3306/XXXXX","","",0L))
    .start();
```
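
To illustrate how a single `with` call can accept several strategy types, here is a minimal JDK-only sketch. Every name in it (`Strategy`, `DbCheckpointStrategy`, `MemoryStateStrategy`, `PipelineSketch`) is hypothetical and is not taken from the RocketMQ Streams code base; it only shows the varargs pattern that the description above implies.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical marker for anything that can be handed to with(...).
interface Strategy {
    String kind();
}

// Hypothetical checkpoint strategy carrying only a JDBC URL.
final class DbCheckpointStrategy implements Strategy {
    final String jdbcUrl;

    DbCheckpointStrategy(String jdbcUrl) {
        this.jdbcUrl = jdbcUrl;
    }

    @Override
    public String kind() {
        return "checkpoint";
    }
}

// Hypothetical state strategy with no configuration.
final class MemoryStateStrategy implements Strategy {
    @Override
    public String kind() {
        return "state";
    }
}

final class PipelineSketch {
    private final List<Strategy> strategies = new ArrayList<>();

    // Varargs lets the caller pass one strategy or several in a single call.
    PipelineSketch with(Strategy... toApply) {
        Collections.addAll(strategies, toApply);
        return this;
    }

    // Read-only view of the strategies registered so far, in call order.
    List<Strategy> strategies() {
        return Collections.unmodifiableList(strategies);
    }
}
```

With these assumed types, `new PipelineSketch().with(new DbCheckpointStrategy("jdbc:mysql://host/db"), new MemoryStateStrategy())` registers both strategies in one call.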

pom.xml

Lines changed: 1 addition & 0 deletions
@@ -52,6 +52,7 @@
         <module>rocketmq-streams-channel-http</module>
         <module>rocketmq-streams-state</module>
         <module>rocketmq-streams-examples</module>
+        <module>rocketmq-streams-checkpoint</module>

     </modules>

rocketmq-streams-checkpoint/pom.xml

Lines changed: 66 additions & 0 deletions
@@ -0,0 +1,66 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Licensed to the Apache Software Foundation (ASF) under one or more
  contributor license agreements. See the NOTICE file distributed with
  this work for additional information regarding copyright ownership.
  The ASF licenses this file to You under the Apache License, Version 2.0
  (the "License"); you may not use this file except in compliance with
  the License. You may obtain a copy of the License at

  http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License.
  -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>rocketmq-streams</artifactId>
        <groupId>org.apache.rocketmq</groupId>
        <version>1.0.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>rocketmq-streams-checkpoint</artifactId>
    <name>ROCKETMQ STREAMS :: checkpoint</name>
    <packaging>jar</packaging>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-streams-commons</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>com.google.auto.service</groupId>
                    <artifactId>auto-service</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-streams-db-operator</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>com.google.auto.service</groupId>
                    <artifactId>auto-service</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>com.google.auto.service</groupId>
            <artifactId>auto-service</artifactId>
            <optional>true</optional>
        </dependency>
    </dependencies>

</project>

DBCheckPointStorage.java

Lines changed: 65 additions & 0 deletions
@@ -0,0 +1,65 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.rocketmq.streams.checkpoint.db;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.rocketmq.streams.common.channel.source.ISource;
import org.apache.rocketmq.streams.common.checkpoint.AbstractCheckPointStorage;
import org.apache.rocketmq.streams.common.checkpoint.CheckPoint;
import org.apache.rocketmq.streams.common.checkpoint.CheckPointManager;
import org.apache.rocketmq.streams.common.checkpoint.SourceSnapShot;
import org.apache.rocketmq.streams.db.driver.orm.ORMUtil;

import java.util.List;

/**
 * @description
 */
public class DBCheckPointStorage extends AbstractCheckPointStorage {

    static final Log logger = LogFactory.getLog(DBCheckPointStorage.class);

    static final String STORAGE_NAME = "DB";

    public DBCheckPointStorage(){

    }

    @Override
    public String getStorageName() {
        return STORAGE_NAME;
    }

    @Override
    public <T> void save(List<T> checkPointState) {
        logger.info(String.format("save checkpoint size %d", checkPointState.size()));
        ORMUtil.batchReplaceInto(checkPointState);
    }

    @Override
    //todo
    public CheckPoint recover(ISource iSource, String queueId) {
        String sourceName = CheckPointManager.createSourceName(iSource, null);
        String key = CheckPointManager.createCheckPointKey(sourceName, queueId);
        String sql = "select * from source_snap_shot where `key` = " + "'" + key + "';";
        SourceSnapShot snapShot = ORMUtil.queryForObject(sql, null, SourceSnapShot.class);

        logger.info(String.format("checkpoint recover key is %s, sql is %s, recover sourceSnapShot : %s", key, sql, snapShot == null ? "null snapShot" : snapShot.toString()));
        return new CheckPoint().fromSnapShot(snapShot);
    }
}
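
The storage class above has two responsibilities: `save` writes a batch of snapshot rows (the method name `batchReplaceInto` suggests overwrite-by-key behaviour), and `recover` looks up one row by its checkpoint key. The following is a minimal in-memory sketch of that contract, not the module's DB-backed implementation. `SnapshotRow` and `InMemoryCheckpointStore` are hypothetical names, and the overwrite-by-key semantics are an assumption drawn from the method name only.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for a stored snapshot row: a key plus an opaque offset value.
final class SnapshotRow {
    final String key;
    final String value;

    SnapshotRow(String key, String value) {
        this.key = key;
        this.value = value;
    }
}

// In-memory sketch of the save/recover contract; the real class delegates to a database.
final class InMemoryCheckpointStore {
    private final Map<String, SnapshotRow> rows = new HashMap<>();

    // Assumed replace-into semantics: a later row with the same key overwrites the earlier one.
    void save(List<SnapshotRow> batch) {
        for (SnapshotRow row : batch) {
            rows.put(row.key, row);
        }
    }

    // Lookup by exact key; an empty Optional makes the "nothing stored yet" case explicit.
    Optional<SnapshotRow> recover(String key) {
        return Optional.ofNullable(rows.get(key));
    }

    // Number of distinct keys currently stored.
    int size() {
        return rows.size();
    }
}
```

Returning an `Optional` is one way to make a missing snapshot visible to the caller; the committed `recover` instead passes a possibly null snapshot to `fromSnapShot`. In a JDBC-backed version, the key would normally be bound as a statement parameter rather than concatenated into the SQL string.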

rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/AbstractChannel.java

Lines changed: 10 additions & 0 deletions
@@ -227,4 +227,14 @@ public void setJsonData(Boolean isJsonData) {
         create();
         ((AbstractSource)source).setJsonData(isJsonData);
     }
+
+    @Override
+    public String getTopic(){
+        return source.getTopic();
+    }
+
+    @Override
+    public void setTopic(String topic){
+        source.setTopic(topic);
+    }
 }

rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/memory/MemoryChannel.java

Lines changed: 5 additions & 0 deletions
@@ -74,4 +74,9 @@ protected boolean startSource() {
             }
         };
     }
+
+    @Override
+    public String createCheckPointName() {
+        return "memory-source";
+    }
 }

rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/sink/AbstractSink.java

Lines changed: 1 addition & 1 deletion
@@ -32,8 +32,8 @@
 import org.apache.rocketmq.streams.common.channel.source.ISource;
 import org.apache.rocketmq.streams.common.channel.split.ISplit;
 import org.apache.rocketmq.streams.common.checkpoint.CheckPointManager;
-import org.apache.rocketmq.streams.common.checkpoint.CheckPointManager.SourceState;
 import org.apache.rocketmq.streams.common.checkpoint.CheckPointMessage;
+import org.apache.rocketmq.streams.common.checkpoint.SourceState;
 import org.apache.rocketmq.streams.common.configurable.BasedConfigurable;
 import org.apache.rocketmq.streams.common.configurable.IConfigurableIdentification;
 import org.apache.rocketmq.streams.common.context.IMessage;

rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/source/AbstractSource.java

Lines changed: 46 additions & 7 deletions
@@ -42,6 +42,7 @@
 import org.apache.rocketmq.streams.common.context.UserDefinedMessage;
 import org.apache.rocketmq.streams.common.interfaces.IStreamOperator;
 import org.apache.rocketmq.streams.common.topology.builder.PipelineBuilder;
+import org.apache.rocketmq.streams.common.utils.MapKeyUtil;
 import org.apache.rocketmq.streams.common.utils.StringUtil;

 /**
@@ -75,6 +76,7 @@ public abstract class AbstractSource extends BasedConfigurable implements ISourc

     protected List<String> logFingerprintFields;//log fingerprint to filter msg quickly

+
     /**
      * Operator to which the source delivers messages; it receives the source's data and processes it
      */
@@ -223,18 +225,22 @@ public JSONObject create(String message) {
         return createJson(message);
     }

-
+    /**
+     * Hand the message to the receiver to run the downstream logic
+     *
+     * @param channelMessage
+     * @return
+     */
     public AbstractContext executeMessage(Message channelMessage) {
         AbstractContext context = new Context(channelMessage);
-        if (!channelMessage.getHeader().isSystemMessage()) {
-            messageQueueChangedCheck(channelMessage.getHeader());
-        }
-
         if (isSplitInRemoving(channelMessage)) {
             return context;
         }
+        if (!channelMessage.getHeader().isSystemMessage()) {
+            messageQueueChangedCheck(channelMessage.getHeader());
+        }

-        boolean needFlush = !channelMessage.getHeader().isSystemMessage() && channelMessage.getHeader().isNeedFlush();
+        boolean needFlush = channelMessage.getHeader().isSystemMessage() == false && channelMessage.getHeader().isNeedFlush();

         if (receiver != null) {
             receiver.doMessage(channelMessage, context);
@@ -277,6 +283,9 @@ protected boolean isSplitInRemoving(Message channelMessage) {
      * @param header
      */
     protected void messageQueueChangedCheck(MessageHeader header) {
+        if (supportNewSplitFind() && supportRemoveSplitFind()) {
+            return;
+        }
         Set<String> queueIds = new HashSet<>();
         String msgQueueId = header.getQueueId();
         if (StringUtil.isNotEmpty(msgQueueId)) {
@@ -287,7 +296,7 @@ protected void messageQueueChangedCheck(MessageHeader header) {
             queueIds.addAll(checkpointQueueIds);
         }
         Set<String> newQueueIds = new HashSet<>();
-
+        Set<String> removeQueueIds = new HashSet<>();
         for (String queueId : queueIds) {
             if (isNotDataSplit(queueId)) {
                 continue;
@@ -536,4 +545,34 @@ public boolean isBatchMessage() {
         return isBatchMessage;
     }

+    @Override
+    public String createCheckPointName(){
+
+        ISource source = this;
+
+        String namespace = source.getNameSpace();
+        String name = source.getConfigureName();
+        String groupName = source.getGroupName();
+
+
+        if(StringUtil.isEmpty(namespace)){
+            namespace = "default_namespace";
+        }
+
+        if(StringUtil.isEmpty(name)){
+            name = "default_name";
+        }
+
+        if(StringUtil.isEmpty(groupName)){
+            groupName = "default_groupName";
+        }
+        String topic = source.getTopic();
+        if(topic == null || topic.trim().length() == 0){
+            topic = "default_topic";
+        }
+        return MapKeyUtil.createKey(namespace, groupName, topic, name);
+
+    }
+
+
 }
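
The new `createCheckPointName` builds a checkpoint name from four parts (namespace, group name, topic, configure name), substituting a default for any part that is empty. The sketch below restates that logic with JDK types only. `CheckpointNames`, `orDefault`, and the `;` separator are assumptions for illustration: the real separator is whatever `MapKeyUtil.createKey` uses, and this sketch applies the same blank check (null or whitespace-only) to all four parts, whereas the committed code uses `StringUtil.isEmpty` for three of them.

```java
// Hypothetical helper restating the defaulting logic of createCheckPointName.
final class CheckpointNames {

    // Assumed separator; the real one is defined by MapKeyUtil.createKey.
    static final String SEPARATOR = ";";

    // Returns the fallback when the value is null or contains only whitespace.
    static String orDefault(String value, String fallback) {
        return (value == null || value.trim().isEmpty()) ? fallback : value;
    }

    // Joins the four parts in the same order as the committed method:
    // namespace, groupName, topic, name.
    static String createCheckPointName(String namespace, String groupName, String topic, String name) {
        return String.join(SEPARATOR,
            orDefault(namespace, "default_namespace"),
            orDefault(groupName, "default_groupName"),
            orDefault(topic, "default_topic"),
            orDefault(name, "default_name"));
    }
}
```

Under these assumptions, a source with only a topic set would get a name made of three defaults plus the topic, so two such sources reading the same topic would share a checkpoint name.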
