Skip to content

make a runnable example. #44

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Sep 2, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.rocketmq.client.consumer.store.OffsetStore;
Expand All @@ -36,9 +37,12 @@
public class RocketMQOffset implements OffsetStore {
protected OffsetStore offsetStore;
protected AbstractSupportOffsetResetSource source;
private AtomicBoolean starting;

public RocketMQOffset(OffsetStore offsetStore, AbstractSupportOffsetResetSource source){
this.offsetStore=offsetStore;
this.source=source;
this.starting = new AtomicBoolean(true);
}
@Override
public void load() throws MQClientException {
Expand Down Expand Up @@ -67,9 +71,16 @@ public void persist(MessageQueue mq) {

@Override
public void removeOffset(MessageQueue mq) {
Set<String> splitIds = new HashSet<>();
splitIds.add(new RocketMQMessageQueue(mq).getQueueId());
source.removeSplit(splitIds);
//todo 启动时第一次做rebalance时source中也没有原有消费mq,不做移除,做了会有副作用
//后续整个checkpoint机制都会调整成异步,整块代码都不会保留,目前为了整体跑通,不做修改。
if (starting.get()) {
starting.set(false);
} else {
Set<String> splitIds = new HashSet<>();
splitIds.add(new RocketMQMessageQueue(mq).getQueueId());
source.removeSplit(splitIds);
}

offsetStore.removeOffset(mq);
}

Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

import com.alibaba.fastjson.JSONObject;
import com.google.common.collect.Sets;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import org.apache.rocketmq.streams.client.DataStreamAction;
Expand Down Expand Up @@ -131,7 +133,7 @@ protected <T> T operate(IMessage message, AbstractContext context) {
for(T t:result){
Message subMessage=null;
if (result instanceof JSONObject) {
subMessage=new Message((JSONObject)result);
subMessage=new Message((JSONObject)t);
} else {
subMessage=new Message(new UserDefinedMessage(result));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@

import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;

import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.rocketmq.streams.common.channel.source.systemmsg.NewSplitMessage;
import org.apache.rocketmq.streams.common.channel.source.systemmsg.RemoveSplitMessage;
import org.apache.rocketmq.streams.common.channel.split.ISplit;
Expand Down Expand Up @@ -219,22 +221,18 @@ public JSONObject create(String message) {
return createJson(message);
}

/**
* 交给receiver执行后续逻辑
*
* @param channelMessage
* @return
*/

public AbstractContext executeMessage(Message channelMessage) {
AbstractContext context = new Context(channelMessage);
if (isSplitInRemoving(channelMessage)) {
return context;
}
if (!channelMessage.getHeader().isSystemMessage()) {
messageQueueChangedCheck(channelMessage.getHeader());
}

boolean needFlush = channelMessage.getHeader().isSystemMessage() == false && channelMessage.getHeader().isNeedFlush();
if (isSplitInRemoving(channelMessage)) {
return context;
}

boolean needFlush = !channelMessage.getHeader().isSystemMessage() && channelMessage.getHeader().isNeedFlush();

if (receiver != null) {
receiver.doMessage(channelMessage, context);
Expand Down Expand Up @@ -277,9 +275,6 @@ protected boolean isSplitInRemoving(Message channelMessage) {
* @param header
*/
protected void messageQueueChangedCheck(MessageHeader header) {
if (supportNewSplitFind() && supportRemoveSplitFind()) {
return;
}
Set<String> queueIds = new HashSet<>();
String msgQueueId = header.getQueueId();
if (StringUtil.isNotEmpty(msgQueueId)) {
Expand All @@ -290,7 +285,7 @@ protected void messageQueueChangedCheck(MessageHeader header) {
queueIds.addAll(checkpointQueueIds);
}
Set<String> newQueueIds = new HashSet<>();
Set<String> removeQueueIds = new HashSet<>();

for (String queueId : queueIds) {
if (isNotDataSplit(queueId)) {
continue;
Expand All @@ -306,22 +301,12 @@ protected void messageQueueChangedCheck(MessageHeader header) {
} else {

this.checkPointManager.updateLastUpdate(queueId);
//if(this.checkPointManager.isRemovedSplit(queueId)){
// this.checkPointManager.removeSplit(queueId);
// removeQueueIds.add(queueId);
//}else {
//
//}

}
}
}
//if(!supportRemoveSplitFind()){
// removeSplit(removeQueueIds);
//}
if (!supportNewSplitFind()) {
addNewSplit(newQueueIds);
}

addNewSplit(newQueueIds);
}

protected abstract boolean isNotDataSplit(String queueId);
Expand All @@ -343,12 +328,15 @@ public void removeSplit(Set<String> splitIds) {

}
}
public List<ISplit> getAllSplits(){

public List<ISplit> getAllSplits() {
return null;
}
public Map<String,List<ISplit>> getWorkingSplitsGroupByInstances(){

public Map<String, List<ISplit>> getWorkingSplitsGroupByInstances() {
return null;
}

public void addNewSplit(Set<String> splitIds) {
if (splitIds == null || splitIds.size() == 0) {
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,6 @@
*/
package org.apache.rocketmq.streams.common.checkpoint;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import org.apache.rocketmq.streams.common.channel.sinkcache.IMessageCache;
import org.apache.rocketmq.streams.common.channel.sinkcache.IMessageFlushCallBack;
import org.apache.rocketmq.streams.common.channel.sinkcache.impl.MessageCache;
Expand All @@ -32,6 +25,13 @@
import org.apache.rocketmq.streams.common.utils.MapKeyUtil;
import org.apache.rocketmq.streams.common.utils.StringUtil;

import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;

public class CheckPointManager {
protected IMessageCache<CheckPointMessage> messageCache;
protected transient Map<String, Long> currentSplitAndLastUpdateTime = new HashMap<>();//保存这个实例处理的分片数
Expand Down Expand Up @@ -138,11 +138,7 @@ protected SourceState createSourceState(CheckPointMessage checkPointMessage) {
* @param sourceStateMap
*/
protected void saveCheckPoint(Map<String, SourceState> sourceStateMap) {
List<CheckPoint> checkPoints = new ArrayList<>();
for (SourceState sourceState : sourceStateMap.values()) {
CheckPoint checkPoint = new CheckPoint();
// checkPoint.setOffset();
}

}

/**
Expand Down Expand Up @@ -176,16 +172,7 @@ public void addCheckPointMessage(CheckPointMessage message) {
this.messageCache.addCache(message);
}

//public boolean isRemovedSplit(String queueId) {
// Long lastUpdateTime=this.currentSplitAndLastUpdateTime.get(queueId);
// if(lastUpdateTime==null){
// return false;
// }
// if(System.currentTimeMillis()-lastUpdateTime>10000*1000){
// return true;
// }
// return false;
//}


public void updateLastUpdate(String queueId) {
addSplit(queueId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,12 @@ public interface IConfigurable extends IJsonable, IConfigurableIdentification, S
/**
* 把toJson的结果当作一个特殊属性
*/
static final String JSON_PROPERTY = "configurable_json";
String JSON_PROPERTY = "configurable_json";

/**
* 把status当作configurable 的一个特殊属性
*/
static final String STATUS_PROPERTY = "configurable_status";
String STATUS_PROPERTY = "configurable_status";

/**
* 每个配置有一个独立的名字
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.apache.rocketmq.streams.common.context;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -128,9 +129,7 @@ public void addSplitMessages(T... splitMessages) {
if (splitMessages == null) {
return;
}
for (T t : splitMessages) {
this.splitMessages.add(t);
}
this.splitMessages.addAll(Arrays.asList(splitMessages));
}

public void removeSpliteMessage(T message) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@
*/
package org.apache.rocketmq.streams.common.functions;

public interface MapFunction<T, O> extends Function {
import java.io.Serializable;

public interface MapFunction<T, O> extends Function, Serializable {

T map(O message) throws Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
*/
package org.apache.rocketmq.streams.common.topology.builder;

import java.io.Serializable;
import org.apache.rocketmq.streams.common.channel.sink.ISink;
import org.apache.rocketmq.streams.common.channel.source.ISource;
import org.apache.rocketmq.streams.common.configurable.AbstractConfigurable;
Expand All @@ -30,13 +29,15 @@
import org.apache.rocketmq.streams.common.utils.NameCreatorUtil;
import org.apache.rocketmq.streams.common.utils.StringUtil;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PipelineBuilder implements Serializable {
private static final long serialVersionUID = 1L;

/**
* 最终产出的pipeline
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.rocketmq.streams.common.configurable.BasedConfigurable;
import org.apache.rocketmq.streams.common.context.AbstractContext;
import org.apache.rocketmq.streams.common.context.IMessage;
import org.apache.rocketmq.streams.common.context.Message;
import org.apache.rocketmq.streams.common.interfaces.IStreamOperator;
import org.apache.rocketmq.streams.common.interfaces.ISystemMessageProcessor;
import org.apache.rocketmq.streams.common.optimization.SQLLogFingerprintFilter;
Expand All @@ -33,7 +34,7 @@
public abstract class AbstractStage<T extends IMessage> extends BasedConfigurable
implements IStreamOperator<T, T>, ISystemMessageProcessor {

private static final long serialVersionUID = -143202547707927632L;


private static final Log LOG = LogFactory.getLog(AbstractStage.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,13 @@
import org.apache.rocketmq.streams.common.topology.model.IStageHandle;
import org.apache.rocketmq.streams.common.topology.stages.AbstractStatelessChainStage;

import java.io.Serializable;

/**
* 给用户提供自定义的抽象类
*/
public abstract class StageBuilder extends AbstractStatelessChainStage<IMessage> implements IStageBuilder<ChainStage>, IAfterConfigurableRefreshListener {

public abstract class StageBuilder extends AbstractStatelessChainStage<IMessage> implements IStageBuilder<ChainStage>, Serializable, IAfterConfigurableRefreshListener {
private static final long serialVersionUID = 1L;
@Override
protected boolean initConfigurable() {
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,18 @@

public class RocketMQSourceExample2 {
public static final String NAMESRV_ADDRESS = "127.0.0.1:9876";
public static final String RMQ_TOPIC = "topic_tiger_0822_01";
public static final String RMQ_CONSUMER_GROUP_NAME = "consumer_tiger_0822_01";
public static final String RMQ_TOPIC = "topic_tiger_0901_01";
public static final String RMQ_CONSUMER_GROUP_NAME = "test-group-10";
public static final String TAGS = "*";

public static void main(String[] args) {
DataStreamSource source = StreamBuilder.dataStream("namespace", "pipeline");

source.from(new RocketMQSource(
source.fromRocketmq(
RMQ_TOPIC,
TAGS,
RMQ_CONSUMER_GROUP_NAME,
NAMESRV_ADDRESS
))
false,
NAMESRV_ADDRESS)
.map(message -> message)
.toPrint(1)
.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,7 @@ public void testRelationExpression() {
jsonObject.put("vmip", "1.1.1.1");

boolean value =
ExpressionBuilder.executeExecute("namespace", "(ip,=,1.2.2.3)&((uid,=,1224)|(vmip,=,1.1.11.1))",
jsonObject);
ExpressionBuilder.executeExecute("namespace", "(ip,=,1.2.2.3)&((uid,=,1224)|(vmip,=,1.1.11.1))", jsonObject);
assertTrue(value);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ protected boolean initConfigurable() {
if (scriptOptimization.supportOptimize()) {
expressions = scriptOptimization.getScriptOptimizeExprssions();
}
FunctionScript functionScript = this;

//转化成istreamoperator 接口
for (IScriptExpression scriptExpression : expressions) {
receivers.add((message, context) -> {
Expand Down