Skip to content

Commit dbd4f77

Browse files
authored
Merge pull request apache#77 from programer-0/develop
Add fingerprinting to the client
2 parents 27b184b + 85a6221 commit dbd4f77

File tree

16 files changed

+277
-186
lines changed

16 files changed

+277
-186
lines changed

rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/source/DataStreamSource.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,12 @@
1717

1818
package org.apache.rocketmq.streams.client.source;
1919

20+
import java.util.Properties;
21+
import javax.sql.DataSource;
2022
import org.apache.rocketmq.streams.client.transform.DataStream;
2123
import org.apache.rocketmq.streams.common.channel.impl.file.FileSource;
2224
import org.apache.rocketmq.streams.common.channel.source.ISource;
25+
import org.apache.rocketmq.streams.common.component.ComponentCreator;
2326
import org.apache.rocketmq.streams.common.topology.builder.PipelineBuilder;
2427
import org.apache.rocketmq.streams.source.RocketMQSource;
2528

@@ -32,10 +35,23 @@ public DataStreamSource(String namespace, String pipelineName) {
3235
this.mainPipelineBuilder = new PipelineBuilder(namespace, pipelineName);
3336
}
3437

38+
public DataStreamSource(String namespace, String pipelineName, String[] duplicateKeys, Long windowSize) {
39+
this.mainPipelineBuilder = new PipelineBuilder(namespace, pipelineName);
40+
Properties properties = new Properties();
41+
properties.setProperty(pipelineName + ".duplicate.fields.names", String.join(";", duplicateKeys));
42+
properties.setProperty(pipelineName + ".duplicate.expiration.time", String.valueOf(windowSize));
43+
ComponentCreator.createProperties(properties);
44+
}
45+
3546
public static DataStreamSource create(String namespace, String pipelineName) {
3647
return new DataStreamSource(namespace, pipelineName);
3748
}
3849

50+
public static DataStreamSource create(String namespace, String pipelineName, String[] duplicateKeys,
51+
Long expirationTime) {
52+
return new DataStreamSource(namespace, pipelineName, duplicateKeys, expirationTime);
53+
}
54+
3955
public DataStream fromFile(String filePath) {
4056
return fromFile(filePath, true);
4157
}
@@ -68,7 +84,7 @@ public DataStream fromRocketmq(String topic, String groupName, String tags, bool
6884

6985
public DataStream from(ISource<?> source) {
7086
this.mainPipelineBuilder.setSource(source);
71-
return new DataStream(this.mainPipelineBuilder,null);
87+
return new DataStream(this.mainPipelineBuilder, null);
7288
}
7389

7490
}

rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/strategy/WindowStrategy.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,8 +53,8 @@ public static Strategy highPerformance() {
5353
return new WindowStrategy();
5454
}
5555

56-
public static Strategy windowDefaultSiZe(int defualtSize){
57-
ComponentCreator.getProperties().put(ConfigureFileKey.DIPPER_WINDOW_DEFAULT_INERVAL_SIZE,defualtSize);
56+
public static Strategy windowDefaultSiZe(int defualtSize) {
57+
ComponentCreator.getProperties().put(ConfigureFileKey.DIPPER_WINDOW_DEFAULT_INERVAL_SIZE, defualtSize);
5858
return null;
5959
}
6060

rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/transform/DataStream.java

Lines changed: 27 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,6 @@ protected <T> T operate(IMessage message, AbstractContext context) {
143143
splitMessages.add(subMessage);
144144
}
145145
context.openSplitModel();
146-
;
147146
context.setSplitMessages(splitMessages);
148147
return null;
149148
} catch (Exception e) {
@@ -158,13 +157,17 @@ protected <T> T operate(IMessage message, AbstractContext context) {
158157
}
159158

160159
public <O> DataStream filter(final FilterFunction<O> filterFunction) {
161-
StageBuilder mapUDFOperator = new StageBuilder() {
160+
return filter(filterFunction, new String[] {});
161+
}
162162

163+
public <O> DataStream filter(final FilterFunction<O> filterFunction, String... fingerprints) {
164+
StageBuilder mapUDFOperator = new StageBuilder() {
163165
@Override
164166
protected <T> T operate(IMessage message, AbstractContext context) {
165167
try {
166-
boolean isFilter = filterFunction.filter((O) message.getMessageValue());
167-
if (isFilter) {
168+
boolean tag = filterFunction.filter((O) message.getMessageValue());
169+
if (!tag) {
170+
context.put("NEED_USE_FINGER_PRINT", true);
168171
context.breakExecute();
169172
}
170173
} catch (Exception e) {
@@ -175,6 +178,24 @@ protected <T> T operate(IMessage message, AbstractContext context) {
175178
};
176179
ChainStage stage = this.mainPipelineBuilder.createStage(mapUDFOperator);
177180
this.mainPipelineBuilder.setTopologyStages(currentChainStage, stage);
181+
182+
if (fingerprints.length > 0) {
183+
ChainPipeline<?> pipeline = this.mainPipelineBuilder.getPipeline();
184+
String filterName = stage.getLabel();
185+
if (!pipeline.isTopology()) {
186+
List<?> stages = pipeline.getStages();
187+
int i = 0;
188+
for (Object st : stages) {
189+
if (st == stage) {
190+
break;
191+
}
192+
i++;
193+
}
194+
filterName = i + "";
195+
}
196+
String key = MapKeyUtil.createKeyBySign(".", pipeline.getNameSpace(), pipeline.getConfigureName(), filterName);
197+
ComponentCreator.getProperties().setProperty(key, String.join(",", fingerprints));
198+
}
178199
return new DataStream(this.mainPipelineBuilder, this.otherPipelineBuilders, stage);
179200
}
180201

@@ -463,7 +484,8 @@ public DataStreamAction toRocketmq(String topic, String tags, String groupName,
463484
return toRocketmq(topic, tags, groupName, -1, nameServerAddress, clusterName, order);
464485
}
465486

466-
public DataStreamAction toRocketmq(String topic, String tags, String groupName, int batchSize, String nameServerAddress,
487+
public DataStreamAction toRocketmq(String topic, String tags, String groupName, int batchSize,
488+
String nameServerAddress,
467489
String clusterName, boolean order) {
468490
RocketMQSink rocketMQSink = new RocketMQSink();
469491
if (StringUtils.isNotBlank(topic)) {

rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/DataStreamTest.java

Lines changed: 53 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -46,74 +46,86 @@ public void init() {
4646
@Test
4747
public void testFromFile() {
4848
dataStream
49-
.fromFile("/Users/junjie.cheng/text.txt", false)
50-
.map(message -> message + "--")
51-
.toPrint(1)
52-
.start();
49+
.fromFile("/Users/junjie.cheng/text.txt", false)
50+
.map(message -> message + "--")
51+
.toPrint(1)
52+
.start();
5353
}
5454

5555
@Test
5656
public void testRocketmq() {
5757
DataStreamSource dataStream = StreamBuilder.dataStream("test_namespace", "graph_pipeline");
5858
dataStream
59-
.fromRocketmq("topic_xxxx01", "consumer_xxxx01", "127.0.0.1:9876")
60-
.map(message -> message + "--")
61-
.toPrint(1)
62-
.start();
59+
.fromRocketmq("topic_xxxx01", "consumer_xxxx01", "127.0.0.1:9876")
60+
.map(message -> message + "--")
61+
.toPrint(1)
62+
.start();
6363
}
6464

6565
@Test
6666
public void testDBCheckPoint() {
6767
dataStream
68-
.fromRocketmq("topic_xxxx02", "consumer_xxxx02", "127.0.0.1:9876")
69-
.map(message -> message + "--")
70-
.toPrint(1)
71-
.with(WindowStrategy.exactlyOnce("", "", ""))
72-
.start();
68+
.fromRocketmq("topic_xxxx02", "consumer_xxxx02", "127.0.0.1:9876")
69+
.map(message -> message + "--")
70+
.toPrint(1)
71+
.with(WindowStrategy.exactlyOnce("", "", ""))
72+
.start();
7373
}
7474

7575
@Test
7676
public void testFileCheckPoint() {
7777
dataStream
78-
.fromFile("/Users/junjie.cheng/text.txt", false)
79-
.map(message -> message + "--")
80-
.toPrint(1)
81-
.with(WindowStrategy.highPerformance())
82-
.start();
78+
.fromFile("/Users/junjie.cheng/text.txt", false)
79+
.map(message -> message + "--")
80+
.toPrint(1)
81+
.with(WindowStrategy.highPerformance())
82+
.start();
8383
}
8484

85-
8685
@Test
8786
public void testWindow() {
8887
DataStreamSource dataStream = StreamBuilder.dataStream("test_namespace", "graph_pipeline");
8988
dataStream
90-
.fromRocketmq("topic_xxxx03", "consumer_xxxx03", "127.0.0.1:9876")
91-
.map(new MapFunction<JSONObject, String>() {
92-
93-
@Override
94-
public JSONObject map(String message) throws Exception {
95-
JSONObject msg = JSONObject.parseObject(message);
96-
return msg;
97-
}
98-
})
99-
.window(TumblingWindow.of(Time.seconds(5)))
100-
.groupBy("name", "age")
101-
.count("c")
102-
.sum("score", "scoreValue")
103-
.toDataSteam()
104-
.toPrint(1)
105-
.with(WindowStrategy.exactlyOnce("", "", ""))
106-
.start();
89+
.fromRocketmq("topic_xxxx03", "consumer_xxxx03", "127.0.0.1:9876")
90+
.map(new MapFunction<JSONObject, String>() {
91+
92+
@Override
93+
public JSONObject map(String message) throws Exception {
94+
JSONObject msg = JSONObject.parseObject(message);
95+
return msg;
96+
}
97+
})
98+
.window(TumblingWindow.of(Time.seconds(5)))
99+
.groupBy("name", "age")
100+
.count("c")
101+
.sum("score", "scoreValue")
102+
.toDataSteam()
103+
.toPrint(1)
104+
.with(WindowStrategy.exactlyOnce("", "", ""))
105+
.start();
106+
}
107+
108+
@Test
109+
public void testFingerPrintStrategy() {
110+
dataStream
111+
.fromFile("/Users/junjie.cheng/text.txt", false)
112+
.map(message -> message + "--")
113+
.toPrint(1)
114+
.start();
115+
107116
}
108117

109118
@Test
110119
public void testBothStrategy() {
111120
dataStream
112-
.fromRocketmq("topic_xxxx04", "consumer_xxxx04", "127.0.0.1:9876")
113-
.map(message -> message + "--")
114-
.toPrint(1)
115-
.with()
116-
.start();
121+
.fromRocketmq("topic_xxxx04", "consumer_xxxx04", "127.0.0.1:9876")
122+
.map(message -> message + "--")
123+
.filter(message -> {
124+
return true;
125+
})
126+
.toPrint(1)
127+
.with()
128+
.start();
117129
}
118130

119131
@Test

rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/context/MessageHeader.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ public MessageHeader copy() {
126126
header.msgRouteFromLable = msgRouteFromLable;
127127
header.logFingerprintValue = logFingerprintValue;
128128
header.messageQueue = messageQueue;
129-
header.checkpointQueueIds=checkpointQueueIds;
129+
header.checkpointQueueIds = checkpointQueueIds;
130130
return header;
131131
}
132132

rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/functions/FilterFunction.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,4 +19,5 @@
1919
public interface FilterFunction<T> extends Function {
2020

2121
boolean filter(T value) throws Exception;
22+
2223
}

rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/optimization/HyperscanRegex.java

Lines changed: 52 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
*/
1717
package org.apache.rocketmq.streams.common.optimization;
1818

19+
import com.gliwka.hyperscan.wrapper.CompileErrorException;
1920
import com.gliwka.hyperscan.wrapper.Database;
2021
import com.gliwka.hyperscan.wrapper.Expression;
2122
import com.gliwka.hyperscan.wrapper.ExpressionFlag;
@@ -27,23 +28,29 @@
2728
import java.util.List;
2829
import java.util.Set;
2930
import java.util.concurrent.atomic.AtomicBoolean;
31+
import org.apache.rocketmq.streams.common.utils.StringUtil;
3032

3133
public class HyperscanRegex<T> {
32-
protected List<Expression> regexs = new ArrayList<>();
34+
protected List<Expression> allRegexes = new ArrayList<>();//all registe regex
35+
3336
protected Database db;
3437
protected Scanner scanner;
3538
protected AtomicBoolean hasCompile = new AtomicBoolean(false);
36-
protected List<T> list = new ArrayList<>();
39+
protected List<T> expressionContextList = new ArrayList<>();
40+
41+
protected List<Expression> notSupportCompileExpression = new ArrayList<>();//can not comile expressions
42+
protected List<Expression> supportCompileExpression = new ArrayList<>();//all regex exclude not support compile
3743

3844
/**
3945
* 把多个表达式放到库里
4046
*
4147
* @param regex
4248
*/
4349
public void addRegex(String regex, T context) {
44-
list.add(context);
45-
Expression expression = new Expression(regex, EnumSet.of(ExpressionFlag.UTF8, ExpressionFlag.CASELESS, ExpressionFlag.SINGLEMATCH), list.size() - 1);
46-
regexs.add(expression);
50+
expressionContextList.add(context);
51+
Expression expression = new Expression(regex, EnumSet.of(ExpressionFlag.UTF8, ExpressionFlag.CASELESS, ExpressionFlag.SINGLEMATCH), expressionContextList.size() - 1);
52+
allRegexes.add(expression);
53+
supportCompileExpression.add(expression);
4754
db = null;
4855
scanner = null;
4956
hasCompile.set(false);
@@ -53,17 +60,25 @@ public void addRegex(String regex, T context) {
5360
* 完成编译
5461
*/
5562
public void compile() {
56-
try {
57-
if (hasCompile.compareAndSet(false, true) && regexs.size() > 0) {
58-
Database db = Database.compile(regexs);
63+
if (!hasCompile.compareAndSet(false, true) || supportCompileExpression.size() == 0) {
64+
return;
65+
}
66+
while (true) {
67+
try {
68+
if (supportCompileExpression.size() == 0) {
69+
break;
70+
}
71+
Database db = Database.compile(supportCompileExpression);
5972
Scanner scanner = new Scanner();
6073
scanner.allocScratch(db);
6174
this.db = db;
6275
this.scanner = scanner;
76+
break;
77+
} catch (CompileErrorException e) {
78+
Expression expression = e.getFailedExpression();
79+
this.supportCompileExpression.remove(expression);
80+
this.notSupportCompileExpression.add(expression);
6381
}
64-
65-
} catch (Exception e) {
66-
System.out.println("can not support this regex " + e.getMessage());
6782
}
6883

6984
}
@@ -75,15 +90,14 @@ public void compile() {
7590
* @return
7691
*/
7792
public boolean match(String content) {
78-
if (scanner == null || db == null || hasCompile.get() == false) {
93+
if (scanner == null || db == null || !hasCompile.get()) {
7994
compile();
8095
}
81-
List<Match> matches = scanner.scan(db, content);
82-
if (matches.size() > 0) {
83-
return true;
84-
} else {
96+
if (content == null) {
8597
return false;
8698
}
99+
List<Match> matches = scanner.scan(db, content);
100+
return matches.size() > 0;
87101
}
88102

89103
/**
@@ -93,18 +107,34 @@ public boolean match(String content) {
93107
* @return
94108
*/
95109
public Set<T> matchExpression(String content) {
96-
if (scanner == null || db == null || hasCompile.get() == false) {
110+
if (scanner == null || db == null || !hasCompile.get()) {
97111
compile();
98112
}
113+
if (content == null) {
114+
return new HashSet<>();
115+
}
99116
List<Match> matches = scanner.scan(db, content);
100117
Set<T> fireExpressions = new HashSet<>();
101-
if (matches.size() == 0) {
102-
return fireExpressions;
118+
if (this.notSupportCompileExpression.size() > 0) {
119+
for (Expression expression : this.notSupportCompileExpression) {
120+
String regex = expression.getExpression();
121+
boolean isMatch = StringUtil.matchRegexCaseInsensitive(content, regex);
122+
if (isMatch) {
123+
int index = expression.getId();
124+
fireExpressions.add(expressionContextList.get(index));
125+
}
126+
}
103127
}
104-
for (Match match : matches) {
105-
Integer index = match.getMatchedExpression().getId();
106-
fireExpressions.add(list.get(index));
128+
if (matches.size() > 0) {
129+
for (Match match : matches) {
130+
Integer index = match.getMatchedExpression().getId();
131+
fireExpressions.add(expressionContextList.get(index));
132+
}
107133
}
108134
return fireExpressions;
109135
}
136+
137+
public int size() {
138+
return allRegexes.size();
139+
}
110140
}

0 commit comments

Comments
 (0)