Skip to content

Commit 78a606c

Browse files
authored
[ISSUE apache#199]Support rsqldb (apache#200)
* feat(nest join) support nest join * maintain(example) remove MqttSourceExample
1 parent 0216f4b commit 78a606c

File tree

7 files changed

+40
-46
lines changed

7 files changed

+40
-46
lines changed

rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/sinkcache/impl/MessageCache.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.apache.rocketmq.streams.common.channel.sinkcache.DataSourceAutoFlushTask;
2727
import org.apache.rocketmq.streams.common.channel.sinkcache.IMessageCache;
2828
import org.apache.rocketmq.streams.common.channel.sinkcache.IMessageFlushCallBack;
29+
import org.apache.rocketmq.streams.common.context.Message;
2930
import org.apache.rocketmq.streams.common.schedule.ScheduleManager;
3031
import org.apache.rocketmq.streams.common.schedule.ScheduleTask;
3132

rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/context/MessageHeader.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,8 @@ public class MessageHeader {
9999

100100
protected String msgRouteFromLable;//消息从哪里来的标签,标记上游节点的标记,主要是通过build table name来标记
101101

102+
private String originTable;
103+
102104
protected String logFingerprintValue;//日志指纹的值
103105

104106
public MessageHeader copy() {
@@ -359,4 +361,12 @@ public String getPipelineName() {
359361
public void setPipelineName(String pipelineName) {
360362
this.pipelineName = pipelineName;
361363
}
364+
365+
public String getOriginTable() {
366+
return originTable;
367+
}
368+
369+
public void setOriginTable(String originTable) {
370+
this.originTable = originTable;
371+
}
362372
}

rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/ChainPipeline.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -290,6 +290,7 @@ public void doNextStages(AbstractContext context, String msgPrevSourceName, Stri
290290
//boolean needFlush = needFlush(msg);
291291
if (StringUtil.isNotEmpty(oriMsgPrewSourceName)) {
292292
msg.getHeader().setMsgRouteFromLable(oriMsgPrewSourceName);
293+
msg.getHeader().setOriginTable(oriMsgPrewSourceName);
293294
}
294295
boolean isContinue = executeStage(stage, msg, copyContext);
295296

rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/stages/JoinChainStage.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,14 @@ public class JoinChainStage<T extends IMessage> extends AbstractWindowStage<T> {
4444
@Override
4545
protected IMessage doProcess(IMessage message, AbstractContext context) {
4646
String lable = message.getHeader().getMsgRouteFromLable();
47+
String originTable = message.getHeader().getOriginTable();
48+
4749
String joinFlag = null;
4850
if (lable != null) {
51+
if ((lable.equals("left") || lable.equals("right")) && originTable != null) {
52+
lable = originTable;
53+
}
54+
4955
if (lable.equals(rightDependentTableName)) {
5056
joinFlag = MessageHeader.JOIN_RIGHT;
5157
} else {
@@ -61,9 +67,7 @@ protected IMessage doProcess(IMessage message, AbstractContext context) {
6167
} else {
6268
rightPipeline.doMessage(message, context);
6369
}
64-
//if(!MessageGloableTrace.existFinshBranch(message)){
65-
// context.setBreak(true);
66-
//}
70+
6771
context.breakExecute();
6872
return message;
6973
}

rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/source/MqttSourceExample.java

Lines changed: 0 additions & 42 deletions
This file was deleted.

rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/join/JoinWindow.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,6 @@ public void shuffleCalculate(List<IMessage> messages, WindowInstance instance, S
102102

103103
if (WindowJoinType.left.name().equalsIgnoreCase(routeLabel)) {
104104
storage.putWindowBaseValue(queueId, windowInstanceId, WindowType.JOIN_WINDOW, WindowJoinType.left, temp);
105-
106105
} else if (WindowJoinType.right.name().equalsIgnoreCase(routeLabel)) {
107106
storage.putWindowBaseValue(queueId, windowInstanceId, WindowType.JOIN_WINDOW, WindowJoinType.right, temp);
108107
} else {
@@ -490,6 +489,22 @@ protected void sendMessage(JSONObject message, boolean needFlush) {
490489
if (needFlush) {
491490
nextMessage.getHeader().setNeedFlush(true);
492491
}
492+
493+
String routeLabel = nextMessage.getHeader().getMsgRouteFromLable();
494+
if (routeLabel == null) {
495+
//嵌套join,内部join后没有routeLabel,需要设置结果的routeLabel
496+
String configureName = this.getConfigureName();
497+
String[] tempList = configureName.split("_");
498+
for (int i = tempList.length -1; i > 0; i--) {
499+
if ("left".equalsIgnoreCase(tempList[i]) || "right".equalsIgnoreCase(tempList[i])) {
500+
routeLabel = tempList[i];
501+
System.out.println("nested join, routeLabel=" + routeLabel);
502+
break;
503+
}
504+
}
505+
nextMessage.getHeader().setMsgRouteFromLable(routeLabel);
506+
}
507+
493508
AbstractContext context = new Context(nextMessage);
494509
boolean isWindowTest = ComponentCreator.getPropertyBooleanValue("window.fire.isTest");
495510
if (isWindowTest) {

rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/ShuffleChannel.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -229,6 +229,11 @@ public void addNewSplit(IMessage message, AbstractContext context, NewSplitMessa
229229
for (String splitId : splitIds) {
230230
this.loadResult.put(splitId, future);
231231
}
232+
233+
if (message.getHeader().isSystemMessage() && window.getFireReceiver() == null) {
234+
return;
235+
}
236+
232237
window.getFireReceiver().doMessage(message, context);
233238
}
234239

0 commit comments

Comments
 (0)