diff --git a/pom.xml b/pom.xml index ab8825d2..7bbe1f42 100644 --- a/pom.xml +++ b/pom.xml @@ -70,7 +70,7 @@ 3.2.13.RELEASE 1.0-rc5 5.1.40 - 1.2.27 + 1.2.78 2.2.1 4.5.2 2.5 diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/ChainStage.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/ChainStage.java index 1916291d..d1b7ab6b 100644 --- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/ChainStage.java +++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/ChainStage.java @@ -152,7 +152,9 @@ public void sendSystem(IMessage message, AbstractContext context, Pipeline... pi } Set set = new HashSet<>(); for (Pipeline pipeline : pipelines) { - set.add((ChainPipeline)pipeline); + if (pipeline != null) { + set.add((ChainPipeline)pipeline); + } } sendSystem(message, context, set); } diff --git a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/model/AbstractStage.java b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/model/AbstractStage.java index 9860c51e..1a2d8b1a 100644 --- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/model/AbstractStage.java +++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/model/AbstractStage.java @@ -84,7 +84,12 @@ public T doMessage(T t, AbstractContext context) { context.breakExecute(); return null; } - TraceUtil.debug(t.getHeader().getTraceId(), "AbstractStage", label, t.getMessageBody().toJSONString()); + try { + + TraceUtil.debug(t.getHeader().getTraceId(), "AbstractStage", label, t.getMessageBody().toJSONString()); + } catch (Exception e) { + LOG.error("t.getMessageBody() parse error", e); + } IStageHandle handle = selectHandle(t, context); if (handle == null) { return t; diff --git a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/impl/string/SubStringIndexFunction.java b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/impl/string/SubStringIndexFunction.java index 705a5fc3..3d50f018 100644 --- a/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/impl/string/SubStringIndexFunction.java +++ b/rocketmq-streams-script/src/main/java/org/apache/rocketmq/streams/script/function/impl/string/SubStringIndexFunction.java @@ -85,6 +85,12 @@ public String substringindex(IMessage message, FunctionContext context, @FunctionParamter(comment = "指定用于拆分原始字段的字符代表列名称或常量值", value = "string") Integer startIndex, @FunctionParamter(comment = "指定用于拆分原始字段的字符代表列名称或常量值", value = "string") Integer endIndex) { oriMsg = FunctionUtils.getValueString(message, context, oriMsg); + int msgLength = oriMsg.length(); + if (startIndex >= msgLength) { + return ""; + } else if (endIndex > msgLength) { + endIndex = msgLength; + } return oriMsg.substring(startIndex, endIndex); } diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/model/WindowInstance.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/model/WindowInstance.java index ebcc0fe4..c5759767 100644 --- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/model/WindowInstance.java +++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/model/WindowInstance.java @@ -109,6 +109,10 @@ public String createWindowInstanceId() { return MapKeyUtil.createKey(splitId, windowNameSpace, windowName, windowInstanceName, startTime, endTime); } + public String createWindowInstanceIdWithoutSplitid() { + return MapKeyUtil.createKey(windowNameSpace, windowName, windowInstanceName, startTime, endTime); + } + public String createWindowInstanceTriggerId(){ return MapKeyUtil.createKey(splitId, windowNameSpace, windowName, windowInstanceName, startTime, endTime,fireTime); } diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/impl/OverWindow.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/impl/OverWindow.java index b8b74f80..86fa99bf 100644 --- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/impl/OverWindow.java +++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/impl/OverWindow.java @@ -128,7 +128,7 @@ public boolean isSynchronous() { @Override protected boolean initConfigurable() { - return true; + return super.initConfigurable(); } @Override diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/join/JoinWindow.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/join/JoinWindow.java index 65796477..920d525d 100644 --- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/join/JoinWindow.java +++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/join/JoinWindow.java @@ -24,6 +24,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.rocketmq.streams.common.utils.TraceUtil; import org.apache.rocketmq.streams.common.context.AbstractContext; import org.apache.rocketmq.streams.common.context.Context; import org.apache.rocketmq.streams.common.context.IMessage; @@ -40,6 +41,7 @@ import org.apache.rocketmq.streams.window.state.impl.JoinLeftState; import org.apache.rocketmq.streams.window.state.impl.JoinRightState; import org.apache.rocketmq.streams.window.state.impl.JoinState; +import org.apache.rocketmq.streams.window.storage.ShufflePartitionManager; public class JoinWindow extends AbstractShuffleWindow { @@ -73,6 +75,7 @@ protected boolean initConfigurable() { // // } + @Override protected int fireWindowInstance(WindowInstance instance, String shuffleId, Map queueId2Offsets) { clearFire(instance); @@ -81,7 +84,8 @@ protected int fireWindowInstance(WindowInstance instance, String shuffleId, Map< @Override public void clearCache(String queueId) { - + getStorage().clearCache(shuffleChannel.getChannelQueue(queueId),getWindowBaseValueClass()); + ShufflePartitionManager.getInstance().clearSplit(queueId); } @Override @@ -110,8 +114,7 @@ public void shuffleCalculate(List messages, WindowInstance instance, S } for (IMessage msg : messages) { - MessageHeader header = JSONObject.parseObject(msg.getMessageBody(). - getString(WindowCache.ORIGIN_MESSAGE_HEADER), MessageHeader.class); + MessageHeader header = msg.getHeader(); String routeLabel = header.getMsgRouteFromLable(); // Map joinMessages = new HashMap<>(); String storeKeyPrefix = ""; @@ -128,7 +131,11 @@ public void shuffleCalculate(List messages, WindowInstance instance, S List tmpMessages = new ArrayList<>(); int count = 0; while (iterator.hasNext()) { - tmpMessages.add(iterator.next()); + WindowBaseValue windowBaseValue = iterator.next(); + if (windowBaseValue == null) { + continue; + } + tmpMessages.add(windowBaseValue); count++; if (count == 100) { sendMessage(msg, tmpMessages); @@ -146,7 +153,9 @@ private Iterator getMessageIterator(String queueId, WindowInsta List instances = new ArrayList<>(); for (Map.Entry entry : this.windowInstanceMap.entrySet()) { - instances.add(entry.getValue()); + if (queueId.equalsIgnoreCase(entry.getValue().getSplitId())) { + instances.add(entry.getValue()); + } } Iterator windowInstanceIter = instances.iterator(); return new Iterator() { @@ -159,9 +168,9 @@ public boolean hasNext() { if (iterator != null && iterator.hasNext()) { return true; } - while (windowInstanceIter.hasNext()) { + if (windowInstanceIter.hasNext()) { WindowInstance instance = windowInstanceIter.next(); - iterator = storage.loadWindowInstanceSplitData(null, queueId, + iterator = storage.loadWindowInstanceSplitData(null, null, instance.createWindowInstanceId(), keyPrefix, clazz); @@ -217,17 +226,64 @@ public WindowBaseValue next() { } - public List connectJoin(IMessage message, List> rows, String joinType, String rightAsName) { + public List connectJoin(IMessage message, List> rows, String joinType, + String rightAsName) { List result = new ArrayList<>(); if (rows.size() <= 0) { return result; } if ("inner".equalsIgnoreCase(joinType)) { result = connectInnerJoin(message, rows, rightAsName); + } else if ("left".equalsIgnoreCase(joinType)) { + result = connectLeftJoin(message, rows, rightAsName); + } + return result; + } + + private List connectLeftJoin(IMessage message, List> rows, String rightAsName) { + + List result = new ArrayList<>(); + String routeLabel = message.getHeader().getMsgRouteFromLable(); + JSONObject messageBody = message.getMessageBody(); + String traceId = message.getHeader().getTraceId(); + int index = 1; + if (LABEL_LEFT.equalsIgnoreCase(routeLabel) && rows.size() > 0) { + for (Map raw : rows) { + // addAsName(raw, rightAsName); + JSONObject object = (JSONObject)messageBody.clone(); + object.fluentPutAll(addAsName(raw, rightAsName)); + object.put(TraceUtil.TRACE_ID_FLAG, traceId + "-" + index); + index++; + result.add(object); + } + } else if (LABEL_LEFT.equalsIgnoreCase(routeLabel) && rows.size() <= 0) { + JSONObject object = (JSONObject) messageBody.clone(); + object.put(TraceUtil.TRACE_ID_FLAG, traceId + "-" + index); + result.add(object); + } else { + messageBody = addAsName(messageBody, rightAsName); + for (Map raw : rows) { + JSONObject object = (JSONObject)messageBody.clone(); + object.fluentPutAll(raw); + object.put(TraceUtil.TRACE_ID_FLAG, traceId + "-" + index); + index++; + result.add(object); + } + } + + + + if (rows != null && rows.size() > 0) { + for (Map raw : rows) { + JSONObject object = (JSONObject) messageBody.clone(); + object.fluentPutAll(raw); + result.add(object); + } + return result; + } + if (LABEL_LEFT.equalsIgnoreCase(routeLabel)) { + result.add(messageBody); } - // else if ("left".equalsIgnoreCase(joinType)) { - // result = connectLeftJoin(message, rows, rightAsName); - // } return result; } @@ -241,12 +297,16 @@ public List connectJoin(IMessage message, List> public List connectInnerJoin(IMessage message, List> rows, String rightAsName) { List result = new ArrayList<>(); String routeLabel = message.getHeader().getMsgRouteFromLable(); + String traceId = message.getHeader().getTraceId(); + int index = 1; if (LABEL_LEFT.equalsIgnoreCase(routeLabel)) { JSONObject messageBody = message.getMessageBody(); for (Map raw : rows) { // addAsName(raw, rightAsName); JSONObject object = (JSONObject)messageBody.clone(); object.fluentPutAll(addAsName(raw, rightAsName)); + object.put(TraceUtil.TRACE_ID_FLAG, traceId + "-" + index); + index++; result.add(object); } } else { @@ -255,6 +315,8 @@ public List connectInnerJoin(IMessage message, List raw : rows) { JSONObject object = (JSONObject)messageBody.clone(); object.fluentPutAll(raw); + object.put(TraceUtil.TRACE_ID_FLAG, traceId + "-" + index); + index++; result.add(object); } } @@ -284,14 +346,9 @@ private JSONObject addAsName(Map raw, String rightAsName) { */ protected String createStoreKey(IMessage message, String routeLabel, WindowInstance windowInstance) { String shuffleKey = message.getMessageBody().getString(WindowCache.SHUFFLE_KEY); - String shuffleId = shuffleChannel.getChannelQueue(shuffleKey).getQueueId(); String orginQueueId = message.getMessageBody().getString(WindowCache.ORIGIN_QUEUE_ID); String originOffset = message.getMessageBody().getString(WindowCache.ORIGIN_OFFSET); - String windowNamespace = getNameSpace(); - String windowName = getConfigureName(); - String startTime = windowInstance.getStartTime(); - String endTime = windowInstance.getEndTime(); - String storeKey = MapKeyUtil.createKey(shuffleId, windowNamespace, windowName, startTime, endTime, shuffleKey, routeLabel, orginQueueId, originOffset); + String storeKey = MapKeyUtil.createKey(windowInstance.createWindowInstanceId(), shuffleKey, routeLabel, orginQueueId, originOffset); return storeKey; } @@ -327,6 +384,8 @@ private JoinState createJoinState(IMessage message, WindowInstance instance, Str JSONObject messageBody = (JSONObject)message.getMessageBody().clone(); messageBody.remove("WindowInstance"); messageBody.remove("AbstractWindow"); + messageBody.remove(WindowCache.ORIGIN_MESSAGE_HEADER); + messageBody.remove("MessageHeader"); JoinState state = null; if ("left".equalsIgnoreCase(routeLabel)) { @@ -396,47 +455,41 @@ public Class getWindowBaseValueClass() { return JoinState.class; } - // @Override - // public void finishWindowProcessAndSend2Receiver(List messageList,WindowInstance windowInstance) { - // for (IMessage message : messageList) { - // List> result = joinOperator.dealJoin(message); - // List> rows = matchRows(message.getMessageBody(), result); - // String rightAsName = message.getMessageBody().getString("rightAsName"); - // String joinType = message.getMessageBody().getString("joinType"); - // List connectMsgs = joinOperator.connectJoin(message, rows, joinType, rightAsName); - // for (int i=0; i < connectMsgs.size(); i++) { - // if (i == connectMsgs.size() -1) { - // sendMessage(connectMsgs.get(i), true); - // } else { - // sendMessage(connectMsgs.get(i), false); - // } - // - // } - // - // } - // //todo 完成处理 - // //todo 发送消息到下一个节点 sendFireMessage(); - // } /** * window触发后的清理工作 - * @param windowInstances - */ - /** - * 删除掉触发过的数据 - * - * @param instance + * @param windowInstance */ @Override - public void clearFireWindowInstance(WindowInstance instance) { - if(instance==null){ - return; + public void clearFireWindowInstance(WindowInstance windowInstance) { +// String partitionNum=(getOrderBypPrefix()+ windowInstance.getSplitId()); + + List removeInstances = new ArrayList<>(); + + Date clearTime = DateUtil.addSecond(DateUtil.parse(windowInstance.getStartTime()), -sizeInterval * (retainWindowCount - 1) * 60); + Iterator iterable = this.windowInstanceMap.keySet().iterator(); + while (iterable.hasNext()) { + WindowInstance instance = this.windowInstanceMap.get(iterable.next()); + Date startTime = DateUtil.parse(instance.getStartTime()); + if (DateUtil.dateDiff(clearTime, startTime) >= 0) { + removeInstances.add(instance); + iterable.remove(); + } } - WindowInstance.clearInstance(instance); - joinOperator.cleanMessage(instance.getWindowNameSpace(), instance.getWindowName(), this.getRetainWindowCount(), - this.getSizeInterval(), instance.getStartTime()); - //todo windowinstace - //todo left+right + + for (WindowInstance instance : removeInstances) { + + windowMaxValueManager.deleteSplitNum(instance, instance.getSplitId()); + ShufflePartitionManager.getInstance().clearWindowInstance(instance.createWindowInstanceId()); + storage.delete(instance.createWindowInstanceId(),null,WindowBaseValue.class,sqlCache); + if(!isLocalStorageOnly){ + WindowInstance.clearInstance(instance,sqlCache); + joinOperator.cleanMessage(instance.getWindowNameSpace(), instance.getWindowName(), this.getRetainWindowCount(), + this.getSizeInterval(), windowInstance.getStartTime()); + } + } + + } protected List> matchRows(JSONObject msg, List> rows) { diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/ShuffleChannel.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/ShuffleChannel.java index cb6cb4e3..ba17a442 100644 --- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/ShuffleChannel.java +++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/ShuffleChannel.java @@ -67,7 +67,7 @@ public class ShuffleChannel extends AbstractSystemChannel { protected static final Log LOG = LogFactory.getLog(ShuffleChannel.class); protected static final String SHUFFLE_QUEUE_ID = "SHUFFLE_QUEUE_ID"; - + protected static final String SHUFFLE_OFFSET = "SHUFFLE_OFFSET"; protected static final String SHUFFLE_MESSAGES = "SHUFFLE_MESSAGES"; protected String MSG_OWNER = "MSG_OWNER";//消息所属的window @@ -158,6 +158,7 @@ public Object doMessage(IMessage oriMessage, AbstractContext context) { for (Object obj : messages) { IMessage message = new Message((JSONObject) obj); message.getHeader().setQueueId(queueId); + message.getMessageBody().put(SHUFFLE_OFFSET, oriMessage.getHeader().getOffset()); window.updateMaxEventTime(message); if (isRepeateMessage(message, queueId)) { continue; diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/state/impl/WindowValue.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/state/impl/WindowValue.java index b4ae9042..e04bd161 100644 --- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/state/impl/WindowValue.java +++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/state/impl/WindowValue.java @@ -264,7 +264,7 @@ public boolean calculate(AbstractWindow window, IMessage message) { calProjectColumn(window, message); String traceId = message.getMessageBody().getString(WindowCache.ORIGIN_MESSAGE_TRACE_ID); if (!StringUtil.isEmpty(traceId)) { - TraceUtil.debug(traceId, "window value result", getComputedColumnResult()); + TraceUtil.debug(traceId, "window value result", decodeSQLContent(getComputedColumnResult())); } } catch (Exception e) { LOG.error("failed in calculating the message", e);