Skip to content

make a runnable example of window operator. #54

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Sep 7, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@

import com.alibaba.fastjson.JSONObject;
import com.google.auto.service.AutoService;

import java.util.Properties;

import org.apache.rocketmq.streams.common.channel.builder.AbstractSupportShuffleChannelBuilder;
import org.apache.rocketmq.streams.common.channel.builder.IChannelBuilder;
import org.apache.rocketmq.streams.common.channel.sink.ISink;
Expand All @@ -31,32 +33,32 @@
import org.apache.rocketmq.streams.source.RocketMQSource;

@AutoService(IChannelBuilder.class)
@ServiceName(value = RocketMQChannelBuilder.TYPE, aliasName = "RocketMQSource",name="metaq")
@ServiceName(value = RocketMQChannelBuilder.TYPE, aliasName = "RocketMQSource", name = "metaq")
public class RocketMQChannelBuilder extends AbstractSupportShuffleChannelBuilder {
public static final String TYPE = "rocketmq";

@Override
public ISource createSource(String namespace, String name, Properties properties, MetaData metaData) {

RocketMQSource rocketMQSource = (RocketMQSource) ConfigurableUtil.create(RocketMQSource.class.getName(),namespace,name,createFormatProperty(properties),null);
RocketMQSource rocketMQSource = (RocketMQSource) ConfigurableUtil.create(RocketMQSource.class.getName(), namespace, name, createFormatProperty(properties), null);
return rocketMQSource;
}

protected JSONObject createFormatProperty(Properties properties){
JSONObject formatProperties=new JSONObject();
for(Object object:properties.keySet()){
String key=(String)object;
protected JSONObject createFormatProperty(Properties properties) {
JSONObject formatProperties = new JSONObject();
for (Object object : properties.keySet()) {
String key = (String) object;
if ("type".equals(key)) {
continue;
}
formatProperties.put(key,properties.getProperty(key));
formatProperties.put(key, properties.getProperty(key));
}
IChannelBuilder.formatPropertiesName(formatProperties,properties,"topic","topic");
IChannelBuilder.formatPropertiesName(formatProperties,properties,"tags","tag");
IChannelBuilder.formatPropertiesName(formatProperties,properties,"maxThread","thread.max.count");
IChannelBuilder.formatPropertiesName(formatProperties,properties,"pullIntervalMs","pullIntervalMs");
IChannelBuilder.formatPropertiesName(formatProperties,properties,"offsetTime","offsetTime");
IChannelBuilder.formatPropertiesName(formatProperties,properties,"namesrvAddr","namesrvAddr");
IChannelBuilder.formatPropertiesName(formatProperties, properties, "topic", "topic");
IChannelBuilder.formatPropertiesName(formatProperties, properties, "tags", "tag");
IChannelBuilder.formatPropertiesName(formatProperties, properties, "maxThread", "thread.max.count");
IChannelBuilder.formatPropertiesName(formatProperties, properties, "pullIntervalMs", "pullIntervalMs");
IChannelBuilder.formatPropertiesName(formatProperties, properties, "offsetTime", "offsetTime");
IChannelBuilder.formatPropertiesName(formatProperties, properties, "namesrvAddr", "namesrvAddr");
if (properties.getProperty("group") != null) {
String group = properties.getProperty("group");
if (group.startsWith("GID_")) {
Expand All @@ -78,16 +80,16 @@ public String getType() {

@Override
public ISink createSink(String namespace, String name, Properties properties, MetaData metaData) {
RocketMQSink rocketMQSink = (RocketMQSink) ConfigurableUtil.create(RocketMQSink.class.getName(),namespace,name,createFormatProperty(properties),null);
RocketMQSink rocketMQSink = (RocketMQSink) ConfigurableUtil.create(RocketMQSink.class.getName(), namespace, name, createFormatProperty(properties), null);
return rocketMQSink;
}

@Override
public ISink createBySource(ISource pipelineSource) {
RocketMQSource source = (RocketMQSource)pipelineSource;
String topic = source.getTopic();
RocketMQSource source = (RocketMQSource) pipelineSource;
RocketMQSink sink = new RocketMQSink();
sink.setTopic(topic);
sink.setNamesrvAddr(source.getNamesrvAddr());
sink.setTopic(source.getTopic());
sink.setTags(source.getTags());
return sink;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
Expand All @@ -44,21 +45,27 @@

public class RocketMQSink extends AbstractSupportShuffleSink {

protected static final Log LOG = LogFactory.getLog(RocketMQSink.class);
private static final Log LOG = LogFactory.getLog(RocketMQSink.class);
@ENVDependence
protected String tags = "*";
private String tags = "*";

protected String topic;
protected String groupName;
private String topic;
private String groupName;

private transient List<DefaultMQPushConsumer> consumers=new ArrayList<>();
protected transient DefaultMQProducer producer;
private transient List<DefaultMQPushConsumer> consumers = new ArrayList<>();
private transient DefaultMQProducer producer;

protected Long pullIntervalMs;
protected String namesrvAddr;
private Long pullIntervalMs;
private String namesrvAddr;

public RocketMQSink() {
}

public RocketMQSink(){}
public RocketMQSink(String namesrvAddr, String topic, String groupName) {
this.namesrvAddr = namesrvAddr;
this.topic = topic;
this.groupName = groupName;
}


@Override
Expand All @@ -81,65 +88,67 @@ protected boolean batchInsert(List<IMessage> messages) {
initProducer();

try {
Map<String,List<Message>> msgsByQueueId=new HashMap<>();// group by queueId, if the message not contains queue info ,the set default string as default queueId
Map<String, MessageQueue> messageQueueMap=new HashMap<>();//if has queue id in message, save the map for queueid 2 messagequeeue
String defaultQueueId="<null>";//message is not contains queue ,use default
for(IMessage msg:messages){
Map<String, List<Message>> msgsByQueueId = new HashMap<>();// group by queueId, if the message not contains queue info ,the set default string as default queueId
Map<String, MessageQueue> messageQueueMap = new HashMap<>();//if has queue id in message, save the map for queueid 2 messagequeeue
String defaultQueueId = "<null>";//message is not contains queue ,use default
for (IMessage msg : messages) {
ISplit<RocketMQMessageQueue, MessageQueue> channelQueue = getSplit(msg);
String queueId=defaultQueueId;
String queueId = defaultQueueId;
if (channelQueue != null) {
queueId=channelQueue.getQueueId();
RocketMQMessageQueue metaqMessageQueue=(RocketMQMessageQueue)channelQueue;
messageQueueMap.put(queueId,metaqMessageQueue.getQueue());
queueId = channelQueue.getQueueId();
RocketMQMessageQueue metaqMessageQueue = (RocketMQMessageQueue) channelQueue;
messageQueueMap.put(queueId, metaqMessageQueue.getQueue());
}
List<Message> messageList=msgsByQueueId.get(queueId);
if(messageList==null){
messageList=new ArrayList<>();
msgsByQueueId.put(queueId,messageList);
List<Message> messageList = msgsByQueueId.get(queueId);
if (messageList == null) {
messageList = new ArrayList<>();
msgsByQueueId.put(queueId, messageList);
}
messageList.add(new Message(topic, tags, null, msg.getMessageBody().toJSONString().getBytes("UTF-8")));
}
List<Message> messageList=msgsByQueueId.get(defaultQueueId);
if(messageList!=null){
for(Message message:messageList){
List<Message> messageList = msgsByQueueId.get(defaultQueueId);
if (messageList != null) {
for (Message message : messageList) {
producer.sendOneway(message);
}
messageQueueMap.remove(defaultQueueId);
}
if(messageQueueMap.size()<=0){
if (messageQueueMap.size() <= 0) {
return true;
}
for(String queueId:msgsByQueueId.keySet()){
messageList=msgsByQueueId.get(queueId);
for(Message message:messageList){
MessageQueue queue=messageQueueMap.get(queueId);
producer.send(message,queue);
for (String queueId : msgsByQueueId.keySet()) {
messageList = msgsByQueueId.get(queueId);
for (Message message : messageList) {
MessageQueue queue = messageQueueMap.get(queueId);
producer.send(message, queue);
}

}
}catch (Exception e){
} catch (Exception e) {
e.printStackTrace();
throw new RuntimeException("batch insert error ",e);
throw new RuntimeException("batch insert error ", e);
}

return true;
}


protected void initProducer() {
if(producer==null){
synchronized (this){
if(producer==null){
if (producer == null) {
synchronized (this) {
if (producer == null) {
destroy();
producer = new DefaultMQProducer(groupName + "producer", true, null);
try {
if (this.namesrvAddr != null && !"".equalsIgnoreCase(this.namesrvAddr)) {
producer.setNamesrvAddr(this.namesrvAddr);
if (this.namesrvAddr == null || "".equals(this.namesrvAddr)) {
throw new RuntimeException("namesrvAddr can not be null.");
}

producer.setNamesrvAddr(this.namesrvAddr);
producer.start();
} catch (Exception e) {
setInitSuccess(false);
throw new RuntimeException("创建队列失败," + topic + ",msg=" + e.getMessage(), e);
throw new RuntimeException("create producer failed," + topic + ",msg=" + e.getMessage(), e);
}
}
}
Expand All @@ -151,7 +160,7 @@ public void destroyProduce() {
if (producer != null) {
try {
producer.shutdown();
producer=null;
producer = null;
} catch (Throwable t) {
if (LOG.isWarnEnabled()) {
LOG.warn(t.getMessage(), t);
Expand All @@ -160,14 +169,6 @@ public void destroyProduce() {
}
}

public static void main(String[] args) {
String topic = "shuffle_TOPIC_DIPPER_SYSTEM_MSG_6_namespace_name1";
RocketMQSink metaqSink = new RocketMQSink();
metaqSink.setTopic(topic);
metaqSink.setSplitNum(5);
metaqSink.init();
System.out.println(metaqSink.getSplitList().size());
}

@Override
public void destroy() {
Expand Down Expand Up @@ -195,7 +196,7 @@ protected void createTopicIfNotExist(int splitNum) {
}
}

//defaultMQAdminExt.createTopic(TopicValidator.AUTO_CREATE_TOPIC_KEY_TOPIC, topic, splitNum, 1);

defaultMQAdminExt.createTopic(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC, topic, splitNum, 1);
} catch (Exception e) {
e.printStackTrace();
Expand All @@ -209,7 +210,7 @@ protected void createTopicIfNotExist(int splitNum) {
@Override
public List<ISplit> getSplitList() {
initProducer();
List<ISplit> messageQueues=new ArrayList<>();
List<ISplit> messageQueues = new ArrayList<>();
try {

if (messageQueues == null || messageQueues.size() == 0) {
Expand All @@ -223,7 +224,7 @@ public List<ISplit> getSplitList() {
Collections.sort(queueList);
messageQueues = queueList;
}
}catch (Exception e){
} catch (Exception e) {
throw new RuntimeException(e);
}

Expand All @@ -232,13 +233,13 @@ public List<ISplit> getSplitList() {

@Override
public int getSplitNum() {
List<ISplit> splits=getSplitList();
if(splits==null||splits.size()==0){
List<ISplit> splits = getSplitList();
if (splits == null || splits.size() == 0) {
return 0;
}
Set<Integer> splitNames=new HashSet<>();
for(ISplit split:splits){
MessageQueue messageQueue= (MessageQueue)split.getQueue();
Set<Integer> splitNames = new HashSet<>();
for (ISplit split : splits) {
MessageQueue messageQueue = (MessageQueue) split.getQueue();
splitNames.add(messageQueue.getQueueId());
}
return splitNames.size();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ public void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean is
ConcurrentMap<MessageQueue, AtomicLong> offsetTable = ReflectUtil.getDeclaredField(this, "offsetTable");
DebugWriter.getInstance(getTopic()).writeSaveOffset(mq, offsetTable.get(mq));
}
LOG.info("the queue Id is " + new RocketMQMessageQueue(mq).getQueueId() + ",rocketmq start save offset,the save time is " + DateUtil.getCurrentTimeString());
// LOG.info("the queue Id is " + new RocketMQMessageQueue(mq).getQueueId() + ",rocketmq start save offset,the save time is " + DateUtil.getCurrentTimeString());
super.updateConsumeOffsetToBroker(mq, offset, isOneway);
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@
import org.apache.rocketmq.streams.common.topology.builder.PipelineBuilder;
import org.apache.rocketmq.streams.source.RocketMQSource;

public class DataStreamSource {
import java.io.Serializable;

public class DataStreamSource implements Serializable {
protected PipelineBuilder mainPipelineBuilder;

public DataStreamSource(String namespace, String pipelineName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ protected <T> T operate(IMessage message, AbstractContext context) {
if (result instanceof JSONObject) {
subMessage=new Message((JSONObject)t);
} else {
subMessage=new Message(new UserDefinedMessage(result));
subMessage=new Message(new UserDefinedMessage(t));
}
splitMessages.add(subMessage);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public class OutputPrintChannel extends AbstractSink {
@Override
protected boolean batchInsert(List<IMessage> messages) {
for (IMessage msg : messages) {
System.out.println(msg.getMessageBody().toJSONString());
System.out.println(msg.getMessageValue());
}
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ public class MessageCache<R> implements IMessageCache<R> {
protected volatile int autoFlushSize=300;
protected volatile int autoFlushTimeGap=1000;

protected transient AtomicBoolean LOCK=new AtomicBoolean(false);

public MessageCache(IMessageFlushCallBack<R> flushCallBack) {
this.flushCallBack = flushCallBack;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,21 +61,10 @@ public void checkpoint(IMessage message, AbstractContext context, CheckPointMess
@Override
public void addNewSplit(IMessage message, AbstractContext context, NewSplitMessage newSplitMessage) {


//do nothigh
}
@Override
public void removeSplit(IMessage message, AbstractContext context, RemoveSplitMessage removeSplitMessage) {
//if(message.getHeader().isNeedFlush()){
// if(message.getHeader().getCheckpointQueueIds()!=null&&message.getHeader().getCheckpointQueueIds().size()>0){
// window.getWindowCache().flush(message.getHeader().getCheckpointQueueIds());
// }else {
// Set<String> queueIds=new HashSet<>();
// queueIds.add(message.getHeader().getQueueId());
// window.getWindowCache().flush(queueIds);
// }
//
//}

}

@Override
Expand Down
Loading