Skip to content

[ISSUE #37]Add Apache license and fix some naming problems #38

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Aug 23, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions build_without_test.sh
Original file line number Diff line number Diff line change
@@ -1 +1,18 @@
#!/bin/sh

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

mvn clean package -Dmaven.test.skip=true
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ protected boolean putMessage2Mq(IMessage fieldName2Value,AtomicInteger msgFinish
}

/**
* 发送metaq消息
* 发送RocketMQ消息
* @param content 消息内容
* @param key 消息的Key字段是为了唯一标识消息的,方便运维排查问题。如果不设置Key,则无法定位消息丢失原因。
* @param targetQueue
Expand Down Expand Up @@ -230,9 +230,9 @@ public List<ISplit> getSplitList() {
try {

if (messageQueues == null || messageQueues.size() == 0) {
List<MessageQueue> metaqQueueSet = producer.fetchPublishMessageQueues(topic);
List<MessageQueue> rocketmqQueueSet = producer.fetchPublishMessageQueues(topic);
List<ISplit> queueList = new ArrayList<>();
for (MessageQueue queue : metaqQueueSet) {
for (MessageQueue queue : rocketmqQueueSet) {
RocketMQMessageQueue rocketMQMessageQueue = new RocketMQMessageQueue(queue);
queueList.add(rocketMQMessageQueue);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ protected DefaultMQPushConsumer startConsumer() {
i++;
}
} catch (Exception e) {
LOG.error("消费metaq报错:" + e, e);
LOG.error("消费rocketmq报错:" + e, e);
}

return ConsumeOrderlyStatus.SUCCESS;// 返回消费成功
Expand All @@ -156,21 +156,21 @@ protected DefaultMQPushConsumer startConsumer() {
} catch (Exception e) {
setInitSuccess(false);
e.printStackTrace();
throw new RuntimeException("start metaq channel error " + topic, e);
throw new RuntimeException("start rocketmq channel error " + topic, e);
}
}
@Override
public List<ISplit> getAllSplits(){
try {
Set<MessageQueue> metaqQueueSet = consumer.fetchSubscribeMessageQueues(this.topic);
Set<MessageQueue> rocketmqQueueSet = consumer.fetchSubscribeMessageQueues(this.topic);
List<ISplit> queueList = new ArrayList<>();
for (MessageQueue queue : metaqQueueSet) {
RocketMQMessageQueue metaqMessageQueue = new RocketMQMessageQueue(queue);
if(isNotDataSplit(metaqMessageQueue.getQueueId())){
for (MessageQueue queue : rocketmqQueueSet) {
RocketMQMessageQueue rocketmqMessageQueue = new RocketMQMessageQueue(queue);
if(isNotDataSplit(rocketmqMessageQueue.getQueueId())){
continue;
}

queueList.add(metaqMessageQueue);
queueList.add(rocketmqMessageQueue);

}
return queueList;
Expand All @@ -192,8 +192,8 @@ public Map<String,List<ISplit>> getWorkingSplitsGroupByInstances(){
Map<org.apache.rocketmq.common.message.MessageQueue, String> queue2Instances= getMessageQueueAllocationResult(defaultMQAdminExt,this.groupName);
Map<String,List<ISplit>> instanceOwnerQueues=new HashMap<>();
for(org.apache.rocketmq.common.message.MessageQueue messageQueue:queue2Instances.keySet()){
RocketMQMessageQueue metaqMessageQueue = new RocketMQMessageQueue(new MessageQueue(messageQueue.getTopic(),messageQueue.getBrokerName(),messageQueue.getQueueId()));
if(isNotDataSplit(metaqMessageQueue.getQueueId())){
RocketMQMessageQueue rocketmqMessageQueue = new RocketMQMessageQueue(new MessageQueue(messageQueue.getTopic(),messageQueue.getBrokerName(),messageQueue.getQueueId()));
if(isNotDataSplit(rocketmqMessageQueue.getQueueId())){
continue;
}
String instanceName=queue2Instances.get(messageQueue);
Expand All @@ -202,7 +202,7 @@ public Map<String,List<ISplit>> getWorkingSplitsGroupByInstances(){
splits=new ArrayList<>();
instanceOwnerQueues.put(instanceName,splits);
}
splits.add(metaqMessageQueue);
splits.add(rocketmqMessageQueue);
}
return instanceOwnerQueues;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.streams.client.strategy;

import org.apache.rocketmq.streams.common.configurable.IConfigurableService;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.streams.client;

import com.alibaba.fastjson.JSONObject;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.streams.client;

import org.apache.rocketmq.streams.common.channel.impl.file.FileSource;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,33 +52,4 @@ public void foreach(JSONObject o) {

}

// @Test
// public void testWindowFromMetaq() throws InterruptedException {
// String topic = "TOPIC_DIPPER_SYSTEM_MSG_4";
// StreamBuilder.dataStream("namespace", "name")
// .fromFile("/Users/yuanxiaodong/chris/sls_100.txt", true)
// .toRocketmq(topic)
// .asyncStart();
//
// StreamBuilder.dataStream("namespace", "name1")
// .fromRocketmq(topic, "chris", true)
// .window(TumblingWindow.of(Time.seconds(5)))
// .groupby("ProjectName", "LogStore")
// .setLocalStorageOnly(true)
// .count("total")
// .sum("OutFlow", "OutFlow")
// .sum("InFlow", "inflow")
// .toDataSteam()
// .forEach(new ForEachFunction<JSONObject>() {
// protected int sum = 0;
//
// @Override
// public void foreach(JSONObject o) {
// int total = o.getInteger("total");
// sum = sum + total;
// o.put("sum(total)", sum);
// }
// }).toPrint().start();
// }

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import org.apache.rocketmq.streams.client.transform.DataStream;
import org.junit.Test;

public class WindowFromMetaq extends AbstractWindowFireModeTest {
public class WindowFromRocketMQ extends AbstractWindowFireModeTest {

String topic = "TOPIC_DIPPER_SYSTEM_MSG_5";
@Test
Expand All @@ -44,12 +44,12 @@ public void testWindowFireMode2() {


@Test
public void testWindowToMetaq() throws InterruptedException {
public void testWindowToRocketMQ() throws InterruptedException {

long start=System.currentTimeMillis();
StreamBuilder.dataStream("namespace", "name")
.fromFile("/Users/yuanxiaodong/chris/sls_10.txt", true)
.toMetaq(topic)
.toRocketmq(topic, "chris1", "", "", "", "", "")
.start();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public abstract class AbstractSupportShuffleSink extends AbstractSink {
protected volatile transient boolean hasCreated = false;

/**
* 获取sink的主题,在sls中是logStore,metaq是topic
* 获取sink的主题,在sls中是logStore,RocketMQ是topic
*
* @return
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,14 +123,14 @@ static String getDipperJdbcPassword() {
}

/**
* shuffle相关配置,如果后面加上.namespace,则只对某个namespace生效,如window.shuffle.channel.type.namespace=metaq,相当于只对这个namespace配置
* shuffle相关配置,如果后面加上.namespace,则只对某个namespace生效,如window.shuffle.channel.type.namespace=rocketmq,相当于只对这个namespace配置
*/

String WINDOW_SHUFFLE_CHANNEL_TYPE = "window.shuffle.channel.type";//window 做shuffle中转需要的消息队列类型
//比如metaq,需要topic,tags和group,属性值和字段名保持一致即可。配置如下:window.shuffle.channel.topic=abdc window.shuffle.channel.tag=fdd
//比如rocketmq,需要topic,tags和group,属性值和字段名保持一致即可。配置如下:window.shuffle.channel.topic=abdc window.shuffle.channel.tag=fdd

String WINDOW_SHUFFLE_CHANNEL_PROPERTY_PREFIX = "window.shuffle.channel.";
String WINDOW_SYSTEM_MESSAGE_CHENNEL_OWNER = "window.system.message.channel.owner";//如果能做消息过滤,只过滤本window的消息,可以配置这个属性,如metaq的tags.不支持的会做客户端过滤
String WINDOW_SYSTEM_MESSAGE_CHENNEL_OWNER = "window.system.message.channel.owner";//如果能做消息过滤,只过滤本window的消息,可以配置这个属性,如rocketmq的tags.不支持的会做客户端过滤

/**
* 通知相关
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.streams.window.debug;

import com.alibaba.fastjson.JSONObject;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.streams.window.fire;

import java.util.HashMap;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.streams.window.fire;

import java.util.ArrayList;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.streams.window.fire;

import java.util.concurrent.ConcurrentHashMap;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public class WindowBaseValue extends BasedConfigurable implements Serializable {
protected String windowInstanceId;

/**
* 分片信息(metaQ里是queue
* 分片信息(RocketMQ里是queue
*/
protected String partition;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ dipper.rds.table.name=dipper_sql_configure

#window 异步消息分发需要的消息队列类型
dipper.window.shuffle.rocketmq.dispatch.channel.type=rocketmq
#如果是metaq,统计和join分别用的topoc
#如果是RocketMQ,统计和join分别用的topoc
dipper.window.shuffle.rocketmq.dispatch.channel.topic=TOPIC_DIPPER_WINDOW_STATISTICS
#动态生成的消息队列的属性的key值,这个值会被动态赋值
dipper.window.shuffle.rocketmq.dispatch.channel.dynamic.property=tag
Expand Down