Skip to content

Joinwindow bug fix #61

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Sep 15, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@
<spring.version>3.2.13.RELEASE</spring.version>
<auto-service.version>1.0-rc5</auto-service.version>
<mysql-connector.version>5.1.40</mysql-connector.version>
<fastjson.version>1.2.27</fastjson.version>
<fastjson.version>1.2.78</fastjson.version>
<quartz.version>2.2.1</quartz.version>
<httpclient.version>4.5.2</httpclient.version>
<commons-io.version>2.5</commons-io.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,9 @@ public void sendSystem(IMessage message, AbstractContext context, Pipeline... pi
}
Set<ChainPipeline> set = new HashSet<>();
for (Pipeline pipeline : pipelines) {
set.add((ChainPipeline)pipeline);
if (pipeline != null) {
set.add((ChainPipeline)pipeline);
}
}
sendSystem(message, context, set);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,12 @@ public T doMessage(T t, AbstractContext context) {
context.breakExecute();
return null;
}
TraceUtil.debug(t.getHeader().getTraceId(), "AbstractStage", label, t.getMessageBody().toJSONString());
try {

TraceUtil.debug(t.getHeader().getTraceId(), "AbstractStage", label, t.getMessageBody().toJSONString());
} catch (Exception e) {
LOG.error("t.getMessageBody() parse error", e);
}
IStageHandle handle = selectHandle(t, context);
if (handle == null) {
return t;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,12 @@ public String substringindex(IMessage message, FunctionContext context,
@FunctionParamter(comment = "指定用于拆分原始字段的字符代表列名称或常量值", value = "string") Integer startIndex,
@FunctionParamter(comment = "指定用于拆分原始字段的字符代表列名称或常量值", value = "string") Integer endIndex) {
oriMsg = FunctionUtils.getValueString(message, context, oriMsg);
int msgLength = oriMsg.length();
if (startIndex >= msgLength) {
return "";
} else if (endIndex > msgLength) {
endIndex = msgLength;
}
return oriMsg.substring(startIndex, endIndex);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,10 @@ public String createWindowInstanceId() {
return MapKeyUtil.createKey(splitId, windowNameSpace, windowName, windowInstanceName, startTime, endTime);
}

public String createWindowInstanceIdWithoutSplitid() {
return MapKeyUtil.createKey(windowNameSpace, windowName, windowInstanceName, startTime, endTime);
}

public String createWindowInstanceTriggerId(){
return MapKeyUtil.createKey(splitId, windowNameSpace, windowName, windowInstanceName, startTime, endTime,fireTime);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ public boolean isSynchronous() {

@Override
protected boolean initConfigurable() {
return true;
return super.initConfigurable();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.rocketmq.streams.common.utils.TraceUtil;
import org.apache.rocketmq.streams.common.context.AbstractContext;
import org.apache.rocketmq.streams.common.context.Context;
import org.apache.rocketmq.streams.common.context.IMessage;
Expand All @@ -40,6 +41,7 @@
import org.apache.rocketmq.streams.window.state.impl.JoinLeftState;
import org.apache.rocketmq.streams.window.state.impl.JoinRightState;
import org.apache.rocketmq.streams.window.state.impl.JoinState;
import org.apache.rocketmq.streams.window.storage.ShufflePartitionManager;

public class JoinWindow extends AbstractShuffleWindow {

Expand Down Expand Up @@ -73,6 +75,7 @@ protected boolean initConfigurable() {
//
// }


@Override
protected int fireWindowInstance(WindowInstance instance, String shuffleId, Map<String, String> queueId2Offsets) {
clearFire(instance);
Expand All @@ -81,7 +84,8 @@ protected int fireWindowInstance(WindowInstance instance, String shuffleId, Map<

@Override
public void clearCache(String queueId) {

getStorage().clearCache(shuffleChannel.getChannelQueue(queueId),getWindowBaseValueClass());
ShufflePartitionManager.getInstance().clearSplit(queueId);
}

@Override
Expand Down Expand Up @@ -110,8 +114,7 @@ public void shuffleCalculate(List<IMessage> messages, WindowInstance instance, S
}

for (IMessage msg : messages) {
MessageHeader header = JSONObject.parseObject(msg.getMessageBody().
getString(WindowCache.ORIGIN_MESSAGE_HEADER), MessageHeader.class);
MessageHeader header = msg.getHeader();
String routeLabel = header.getMsgRouteFromLable();
// Map<String,WindowBaseValue> joinMessages = new HashMap<>();
String storeKeyPrefix = "";
Expand All @@ -128,7 +131,11 @@ public void shuffleCalculate(List<IMessage> messages, WindowInstance instance, S
List<WindowBaseValue> tmpMessages = new ArrayList<>();
int count = 0;
while (iterator.hasNext()) {
tmpMessages.add(iterator.next());
WindowBaseValue windowBaseValue = iterator.next();
if (windowBaseValue == null) {
continue;
}
tmpMessages.add(windowBaseValue);
count++;
if (count == 100) {
sendMessage(msg, tmpMessages);
Expand All @@ -146,7 +153,9 @@ private Iterator<WindowBaseValue> getMessageIterator(String queueId, WindowInsta

List<WindowInstance> instances = new ArrayList<>();
for (Map.Entry<String, WindowInstance> entry : this.windowInstanceMap.entrySet()) {
instances.add(entry.getValue());
if (queueId.equalsIgnoreCase(entry.getValue().getSplitId())) {
instances.add(entry.getValue());
}
}
Iterator<WindowInstance> windowInstanceIter = instances.iterator();
return new Iterator<WindowBaseValue>() {
Expand All @@ -159,9 +168,9 @@ public boolean hasNext() {
if (iterator != null && iterator.hasNext()) {
return true;
}
while (windowInstanceIter.hasNext()) {
if (windowInstanceIter.hasNext()) {
WindowInstance instance = windowInstanceIter.next();
iterator = storage.loadWindowInstanceSplitData(null, queueId,
iterator = storage.loadWindowInstanceSplitData(null, null,
instance.createWindowInstanceId(),
keyPrefix,
clazz);
Expand Down Expand Up @@ -217,17 +226,64 @@ public WindowBaseValue next() {

}

public List<JSONObject> connectJoin(IMessage message, List<Map<String, Object>> rows, String joinType, String rightAsName) {
public List<JSONObject> connectJoin(IMessage message, List<Map<String, Object>> rows, String joinType,
String rightAsName) {
List<JSONObject> result = new ArrayList<>();
if (rows.size() <= 0) {
return result;
}
if ("inner".equalsIgnoreCase(joinType)) {
result = connectInnerJoin(message, rows, rightAsName);
} else if ("left".equalsIgnoreCase(joinType)) {
result = connectLeftJoin(message, rows, rightAsName);
}
return result;
}

private List<JSONObject> connectLeftJoin(IMessage message, List<Map<String, Object>> rows, String rightAsName) {

List<JSONObject> result = new ArrayList<>();
String routeLabel = message.getHeader().getMsgRouteFromLable();
JSONObject messageBody = message.getMessageBody();
String traceId = message.getHeader().getTraceId();
int index = 1;
if (LABEL_LEFT.equalsIgnoreCase(routeLabel) && rows.size() > 0) {
for (Map<String, Object> raw : rows) {
// addAsName(raw, rightAsName);
JSONObject object = (JSONObject)messageBody.clone();
object.fluentPutAll(addAsName(raw, rightAsName));
object.put(TraceUtil.TRACE_ID_FLAG, traceId + "-" + index);
index++;
result.add(object);
}
} else if (LABEL_LEFT.equalsIgnoreCase(routeLabel) && rows.size() <= 0) {
JSONObject object = (JSONObject) messageBody.clone();
object.put(TraceUtil.TRACE_ID_FLAG, traceId + "-" + index);
result.add(object);
} else {
messageBody = addAsName(messageBody, rightAsName);
for (Map<String, Object> raw : rows) {
JSONObject object = (JSONObject)messageBody.clone();
object.fluentPutAll(raw);
object.put(TraceUtil.TRACE_ID_FLAG, traceId + "-" + index);
index++;
result.add(object);
}
}



if (rows != null && rows.size() > 0) {
for (Map<String,Object> raw : rows) {
JSONObject object = (JSONObject) messageBody.clone();
object.fluentPutAll(raw);
result.add(object);
}
return result;
}
if (LABEL_LEFT.equalsIgnoreCase(routeLabel)) {
result.add(messageBody);
}
// else if ("left".equalsIgnoreCase(joinType)) {
// result = connectLeftJoin(message, rows, rightAsName);
// }
return result;
}

Expand All @@ -241,12 +297,16 @@ public List<JSONObject> connectJoin(IMessage message, List<Map<String, Object>>
public List<JSONObject> connectInnerJoin(IMessage message, List<Map<String, Object>> rows, String rightAsName) {
List<JSONObject> result = new ArrayList<>();
String routeLabel = message.getHeader().getMsgRouteFromLable();
String traceId = message.getHeader().getTraceId();
int index = 1;
if (LABEL_LEFT.equalsIgnoreCase(routeLabel)) {
JSONObject messageBody = message.getMessageBody();
for (Map<String, Object> raw : rows) {
// addAsName(raw, rightAsName);
JSONObject object = (JSONObject)messageBody.clone();
object.fluentPutAll(addAsName(raw, rightAsName));
object.put(TraceUtil.TRACE_ID_FLAG, traceId + "-" + index);
index++;
result.add(object);
}
} else {
Expand All @@ -255,6 +315,8 @@ public List<JSONObject> connectInnerJoin(IMessage message, List<Map<String, Obje
for (Map<String, Object> raw : rows) {
JSONObject object = (JSONObject)messageBody.clone();
object.fluentPutAll(raw);
object.put(TraceUtil.TRACE_ID_FLAG, traceId + "-" + index);
index++;
result.add(object);
}
}
Expand Down Expand Up @@ -284,14 +346,9 @@ private JSONObject addAsName(Map<String, Object> raw, String rightAsName) {
*/
protected String createStoreKey(IMessage message, String routeLabel, WindowInstance windowInstance) {
String shuffleKey = message.getMessageBody().getString(WindowCache.SHUFFLE_KEY);
String shuffleId = shuffleChannel.getChannelQueue(shuffleKey).getQueueId();
String orginQueueId = message.getMessageBody().getString(WindowCache.ORIGIN_QUEUE_ID);
String originOffset = message.getMessageBody().getString(WindowCache.ORIGIN_OFFSET);
String windowNamespace = getNameSpace();
String windowName = getConfigureName();
String startTime = windowInstance.getStartTime();
String endTime = windowInstance.getEndTime();
String storeKey = MapKeyUtil.createKey(shuffleId, windowNamespace, windowName, startTime, endTime, shuffleKey, routeLabel, orginQueueId, originOffset);
String storeKey = MapKeyUtil.createKey(windowInstance.createWindowInstanceId(), shuffleKey, routeLabel, orginQueueId, originOffset);
return storeKey;
}

Expand Down Expand Up @@ -327,6 +384,8 @@ private JoinState createJoinState(IMessage message, WindowInstance instance, Str
JSONObject messageBody = (JSONObject)message.getMessageBody().clone();
messageBody.remove("WindowInstance");
messageBody.remove("AbstractWindow");
messageBody.remove(WindowCache.ORIGIN_MESSAGE_HEADER);
messageBody.remove("MessageHeader");

JoinState state = null;
if ("left".equalsIgnoreCase(routeLabel)) {
Expand Down Expand Up @@ -396,47 +455,41 @@ public Class getWindowBaseValueClass() {
return JoinState.class;
}

// @Override
// public void finishWindowProcessAndSend2Receiver(List<IMessage> messageList,WindowInstance windowInstance) {
// for (IMessage message : messageList) {
// List<Map<String, Object>> result = joinOperator.dealJoin(message);
// List<Map<String,Object>> rows = matchRows(message.getMessageBody(), result);
// String rightAsName = message.getMessageBody().getString("rightAsName");
// String joinType = message.getMessageBody().getString("joinType");
// List<JSONObject> connectMsgs = joinOperator.connectJoin(message, rows, joinType, rightAsName);
// for (int i=0; i < connectMsgs.size(); i++) {
// if (i == connectMsgs.size() -1) {
// sendMessage(connectMsgs.get(i), true);
// } else {
// sendMessage(connectMsgs.get(i), false);
// }
//
// }
//
// }
// //todo 完成处理
// //todo 发送消息到下一个节点 sendFireMessage();
// }

/**
* window触发后的清理工作
* @param windowInstances
*/
/**
* 删除掉触发过的数据
*
* @param instance
* @param windowInstance
*/
@Override
public void clearFireWindowInstance(WindowInstance instance) {
if(instance==null){
return;
public void clearFireWindowInstance(WindowInstance windowInstance) {
// String partitionNum=(getOrderBypPrefix()+ windowInstance.getSplitId());

List<WindowInstance> removeInstances = new ArrayList<>();

Date clearTime = DateUtil.addSecond(DateUtil.parse(windowInstance.getStartTime()), -sizeInterval * (retainWindowCount - 1) * 60);
Iterator<String> iterable = this.windowInstanceMap.keySet().iterator();
while (iterable.hasNext()) {
WindowInstance instance = this.windowInstanceMap.get(iterable.next());
Date startTime = DateUtil.parse(instance.getStartTime());
if (DateUtil.dateDiff(clearTime, startTime) >= 0) {
removeInstances.add(instance);
iterable.remove();
}
}
WindowInstance.clearInstance(instance);
joinOperator.cleanMessage(instance.getWindowNameSpace(), instance.getWindowName(), this.getRetainWindowCount(),
this.getSizeInterval(), instance.getStartTime());
//todo windowinstace
//todo left+right

for (WindowInstance instance : removeInstances) {

windowMaxValueManager.deleteSplitNum(instance, instance.getSplitId());
ShufflePartitionManager.getInstance().clearWindowInstance(instance.createWindowInstanceId());
storage.delete(instance.createWindowInstanceId(),null,WindowBaseValue.class,sqlCache);
if(!isLocalStorageOnly){
WindowInstance.clearInstance(instance,sqlCache);
joinOperator.cleanMessage(instance.getWindowNameSpace(), instance.getWindowName(), this.getRetainWindowCount(),
this.getSizeInterval(), windowInstance.getStartTime());
}
}


}

protected List<Map<String, Object>> matchRows(JSONObject msg, List<Map<String, Object>> rows) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public class ShuffleChannel extends AbstractSystemChannel {
protected static final Log LOG = LogFactory.getLog(ShuffleChannel.class);

protected static final String SHUFFLE_QUEUE_ID = "SHUFFLE_QUEUE_ID";

protected static final String SHUFFLE_OFFSET = "SHUFFLE_OFFSET";
protected static final String SHUFFLE_MESSAGES = "SHUFFLE_MESSAGES";
protected String MSG_OWNER = "MSG_OWNER";//消息所属的window

Expand Down Expand Up @@ -158,6 +158,7 @@ public Object doMessage(IMessage oriMessage, AbstractContext context) {
for (Object obj : messages) {
IMessage message = new Message((JSONObject) obj);
message.getHeader().setQueueId(queueId);
message.getMessageBody().put(SHUFFLE_OFFSET, oriMessage.getHeader().getOffset());
window.updateMaxEventTime(message);
if (isRepeateMessage(message, queueId)) {
continue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ public boolean calculate(AbstractWindow window, IMessage message) {
calProjectColumn(window, message);
String traceId = message.getMessageBody().getString(WindowCache.ORIGIN_MESSAGE_TRACE_ID);
if (!StringUtil.isEmpty(traceId)) {
TraceUtil.debug(traceId, "window value result", getComputedColumnResult());
TraceUtil.debug(traceId, "window value result", decodeSQLContent(getComputedColumnResult()));
}
} catch (Exception e) {
LOG.error("failed in calculating the message", e);
Expand Down