Skip to content

Commit 9df9a60

Browse files
authored
Merge pull request apache#49 from yuanxiaodong/window
add cache filter to excute quickly for script
2 parents 2892e92 + 647fa4b commit 9df9a60

File tree

63 files changed

+1440
-471
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

63 files changed

+1440
-471
lines changed

rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/DBSink.java

Lines changed: 48 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,12 @@
2323
import java.sql.ResultSet;
2424
import java.sql.SQLException;
2525
import java.util.List;
26+
import java.util.Set;
2627
import org.apache.rocketmq.streams.common.channel.IChannel;
2728
import org.apache.rocketmq.streams.common.channel.sink.AbstractSink;
29+
import org.apache.rocketmq.streams.common.channel.sinkcache.IMessageCache;
30+
import org.apache.rocketmq.streams.common.channel.sinkcache.IMessageFlushCallBack;
31+
import org.apache.rocketmq.streams.common.channel.sinkcache.impl.MessageCache;
2832
import org.apache.rocketmq.streams.common.component.AbstractComponent;
2933
import org.apache.rocketmq.streams.common.configurable.annotation.ENVDependence;
3034
import org.apache.rocketmq.streams.common.context.IMessage;
@@ -54,6 +58,10 @@ public class DBSink extends AbstractSink {
5458
@ENVDependence
5559
protected String password;
5660

61+
protected boolean openSqlCache=false;
62+
63+
protected transient IMessageCache<String> sqlCache;//cache sql, batch submit sql
64+
5765
/**
5866
* db串多数是名字,可以取个名字前缀,如果值为空,默认为此类的name,name为空,默认为简单类名
5967
*
@@ -103,6 +111,25 @@ protected boolean initConfigurable() {
103111
ResultSet metaResult = metaData.getColumns(connection.getCatalog(), "%", this.tableName, null);
104112
this.metaData = MetaData.createMetaData(metaResult);
105113
this.metaData.setTableName(this.tableName);
114+
sqlCache=new MessageCache<>(new IMessageFlushCallBack<String>() {
115+
@Override public boolean flushMessage(List<String> sqls) {
116+
JDBCDriver dataSource = DriverBuilder.createDriver(jdbcDriver, url, userName, password);
117+
try {
118+
dataSource.executSqls(sqls);
119+
} catch (Exception e) {
120+
e.printStackTrace();
121+
throw new RuntimeException(e);
122+
} finally {
123+
if (dataSource != null) {
124+
dataSource.destroy();
125+
}
126+
}
127+
return true;
128+
}
129+
});
130+
((MessageCache<String>) sqlCache).setAutoFlushTimeGap(100000);
131+
((MessageCache<String>) sqlCache).setAutoFlushSize(50);
132+
sqlCache.openAutoFlush();
106133
}
107134
return super.initConfigurable();
108135
} catch (ClassNotFoundException | SQLException e) {
@@ -157,8 +184,20 @@ protected boolean batchInsert(List<IMessage> messageList) {
157184
}
158185
}
159186

187+
@Override public boolean checkpoint(Set<String> splitIds) {
188+
if(sqlCache!=null){
189+
sqlCache.flush(splitIds);
190+
}
191+
return true;
192+
}
193+
160194
protected void executeSQL(JDBCDriver dbDataSource, String sql) {
161-
dbDataSource.execute(sql);
195+
if(isOpenSqlCache()){
196+
this.sqlCache.addCache(sql);
197+
}else {
198+
dbDataSource.execute(sql);
199+
}
200+
162201
}
163202

164203
/**
@@ -238,4 +277,12 @@ public String getTableName() {
238277
public void setTableName(String tableName) {
239278
this.tableName = tableName;
240279
}
280+
281+
public boolean isOpenSqlCache() {
282+
return openSqlCache;
283+
}
284+
285+
public void setOpenSqlCache(boolean openSqlCache) {
286+
this.openSqlCache = openSqlCache;
287+
}
241288
}

rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/strategy/WindowStrategy.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@
1818

1919
import java.util.Properties;
2020
import org.apache.rocketmq.streams.common.component.AbstractComponent;
21+
import org.apache.rocketmq.streams.common.component.ComponentCreator;
2122
import org.apache.rocketmq.streams.common.configurable.IConfigurableService;
23+
import org.apache.rocketmq.streams.common.configure.ConfigureFileKey;
2224

2325
public class WindowStrategy implements Strategy {
2426

@@ -51,6 +53,9 @@ public static Strategy highPerformance() {
5153
return new WindowStrategy();
5254
}
5355

54-
56+
public static Strategy windowDefaultSiZe(int defualtSize){
57+
ComponentCreator.getProperties().put(ConfigureFileKey.DIPPER_WINDOW_DEFAULT_INERVAL_SIZE,defualtSize);
58+
return null;
59+
}
5560

5661
}

rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/transform/DataStream.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -402,6 +402,16 @@ protected void addOtherDataStream(DataStream rightSource) {
402402

403403
this.otherPipelineBuilders.addAll(rightSource.otherPipelineBuilders);
404404
}
405+
406+
public DataStreamAction toFile(String filePath,int batchSize,boolean isAppend) {
407+
FileSink fileChannel = new FileSink(filePath,isAppend);
408+
if(batchSize>0){
409+
fileChannel.setBatchSize(batchSize);
410+
}
411+
ChainStage<?> output = mainPipelineBuilder.createStage(fileChannel);
412+
mainPipelineBuilder.setTopologyStages(currentChainStage, output);
413+
return new DataStreamAction(this.mainPipelineBuilder, this.otherPipelineBuilders, output);
414+
}
405415
public DataStreamAction toFile(String filePath,boolean isAppend) {
406416
FileSink fileChannel = new FileSink(filePath,isAppend);
407417
ChainStage<?> output = mainPipelineBuilder.createStage(fileChannel);
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package org.apache.rocketmq.streams.client;
2+
3+
import org.apache.rocketmq.streams.common.utils.FileUtil;
4+
import org.apache.rocketmq.streams.script.operator.impl.FunctionScript;
5+
import org.junit.Test;
6+
7+
public class ScriptOptimizationTest {
8+
9+
@Test
10+
public void testScriptOptimization(){
11+
String scriptValue= FileUtil.loadFileContent("/Users/yuanxiaodong/Downloads/script.txt");
12+
FunctionScript functionScript=new FunctionScript(scriptValue);
13+
functionScript.init();
14+
15+
}
16+
}

rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/SourceTest.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
*/
1717
package org.apache.rocketmq.streams.client;
1818

19+
import org.apache.rocketmq.streams.client.source.DataStreamSource;
20+
import org.apache.rocketmq.streams.client.transform.DataStream;
1921
import org.apache.rocketmq.streams.common.channel.impl.file.FileSource;
2022
import org.apache.rocketmq.streams.common.context.AbstractContext;
2123
import org.apache.rocketmq.streams.common.context.IMessage;
@@ -37,4 +39,13 @@ public Object doMessage(IMessage message, AbstractContext context) {
3739
}
3840
});
3941
}
42+
43+
44+
@Test
45+
public void testImportMsgFromSource(){
46+
DataStreamSource.create("tmp","tmp")
47+
.fromRocketmq("TOPIC_AEGIS_DETECT_MSG","chris_test","T_MSG_PROC",true,null)
48+
.toFile("/tmp/aegis_proc.txt",true)
49+
.start();
50+
}
4051
}

rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/windows/AbstractWindowTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,11 +55,11 @@ public JSONObject map(JSONObject message) throws Exception {
5555
})
5656
.window(TumblingWindow.of(Time.seconds(5)))
5757
.fireMode(2).waterMark(100000000)
58-
.setMaxMsgGap(80L)
58+
.setMaxMsgGap(10L)
5959
.groupBy("name")
6060
.setTimeField("time")
6161
.sum("total","sum_total")
62-
.setLocalStorageOnly(true)
62+
.setLocalStorageOnly(isLocalOnly)
6363
.toDataSteam()
6464
.forEach(new ForEachFunction<JSONObject>() {
6565
AtomicInteger sum = new AtomicInteger(0) ;

rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/cache/compress/BitSetCache.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,16 @@
22

33
import org.apache.rocketmq.streams.common.utils.NumberUtils;
44

5+
/**
6+
* key:list boolean value
7+
*/
58
public class BitSetCache {
69
protected ByteArrayValueKV cache;
710
protected int byteSetSize;
811
protected int capacity;
12+
protected int bitSetSize;
913

10-
private class BitSet{
14+
public class BitSet{
1115
private byte[] bytes;
1216

1317
public BitSet(){
@@ -17,7 +21,7 @@ public BitSet(byte[] bytes){
1721
this.bytes=bytes;
1822
}
1923
public void set(int index){
20-
if(index>byteSetSize){
24+
if(index>bitSetSize){
2125
throw new RuntimeException("the index exceed max index, max index is "+byteSetSize+", real is "+index);
2226
}
2327
int byteIndex=index/8;
@@ -27,7 +31,7 @@ public void set(int index){
2731
bytes[byteIndex]=byteElement;
2832
}
2933
public boolean get(int index){
30-
if(index>byteSetSize){
34+
if(index>bitSetSize){
3135
throw new RuntimeException("the index exceed max index, max index is "+byteSetSize+", real is "+index);
3236
}
3337
int byteIndex=index/8;
@@ -51,6 +55,7 @@ public BitSetCache(int bitSetSize, int capacity){
5155
cache=new ByteArrayValueKV(capacity,true);
5256
this.byteSetSize=bitSetSize/8+bitSetSize%8;
5357
this.capacity=capacity;
58+
this.bitSetSize=bitSetSize;
5459
}
5560

5661

rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/AbstractChannel.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,14 @@ public void getJsonObject(JSONObject jsonObject) {
8181

8282
}
8383

84+
@Override public boolean checkpoint(Set<String> splitIds) {
85+
return sink.checkpoint(splitIds);
86+
}
87+
88+
@Override public boolean checkpoint(String... splitIds) {
89+
return sink.checkpoint(splitIds);
90+
}
91+
8492
@Override
8593
public boolean flush(String... splitIds) {
8694
return sink.flush(splitIds);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
package org.apache.rocketmq.streams.common.channel.impl.transit;
2+
3+
import com.alibaba.fastjson.JSONObject;
4+
import com.google.auto.service.AutoService;
5+
import java.util.Properties;
6+
import org.apache.rocketmq.streams.common.channel.builder.IChannelBuilder;
7+
import org.apache.rocketmq.streams.common.channel.impl.memory.MemorySource;
8+
import org.apache.rocketmq.streams.common.channel.sink.ISink;
9+
import org.apache.rocketmq.streams.common.channel.source.ISource;
10+
import org.apache.rocketmq.streams.common.metadata.MetaData;
11+
import org.apache.rocketmq.streams.common.model.ServiceName;
12+
import org.apache.rocketmq.streams.common.utils.ConfigurableUtil;
13+
14+
@AutoService(IChannelBuilder.class)
15+
@ServiceName(value = TransitChannelBuilder.TYPE, aliasName = "cache_table")
16+
public class TransitChannelBuilder implements IChannelBuilder {
17+
public static final String TYPE = "transit";
18+
19+
@Override public ISource createSource(String namespace, String name, Properties properties, MetaData metaData) {
20+
return (TransitSource)ConfigurableUtil.create(TransitSource.class.getName(), namespace, name, createFormatProperty(properties), null);
21+
}
22+
23+
@Override public String getType() {
24+
return TYPE;
25+
}
26+
27+
@Override public ISink createSink(String namespace, String name, Properties properties, MetaData metaData) {
28+
return (TransitSink)ConfigurableUtil.create(TransitSink.class.getName(), namespace, name, createFormatProperty(properties), null);
29+
}
30+
31+
32+
/**
33+
* 创建标准的属性文件
34+
*
35+
* @param properties
36+
* @return
37+
*/
38+
protected JSONObject createFormatProperty(Properties properties) {
39+
JSONObject formatProperties = new JSONObject();
40+
for (Object object : properties.keySet()) {
41+
String key = (String)object;
42+
if ("type".equals(key)) {
43+
continue;
44+
}
45+
formatProperties.put(key, properties.getProperty(key));
46+
}
47+
IChannelBuilder.formatPropertiesName(formatProperties, properties, "logFingerprintFieldNames", "filterFieldNames");
48+
IChannelBuilder.formatPropertiesName(formatProperties, properties, "tableName", "sql_create_table_name");
49+
// IChannelBuilder.formatPropertiesName(formatProperties, properties, "maxThread", "thread.max.count");
50+
return formatProperties;
51+
}
52+
}

0 commit comments

Comments
 (0)