Skip to content

Commit c8935a6

Browse files
authored
Merge pull request #38 from caigy/main
[ISSUE #37]Add Apache license and fix some naming problems
2 parents fced1e6 + 2a1ef0b commit c8935a6

File tree

16 files changed

+151
-51
lines changed

16 files changed

+151
-51
lines changed

build_without_test.sh

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,18 @@
1+
#!/bin/sh
2+
3+
# Licensed to the Apache Software Foundation (ASF) under one or more
4+
# contributor license agreements. See the NOTICE file distributed with
5+
# this work for additional information regarding copyright ownership.
6+
# The ASF licenses this file to You under the Apache License, Version 2.0
7+
# (the "License"); you may not use this file except in compliance with
8+
# the License. You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing, software
13+
# distributed under the License is distributed on an "AS IS" BASIS,
14+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
# See the License for the specific language governing permissions and
16+
# limitations under the License.
17+
118
mvn clean package -Dmaven.test.skip=true

rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/sink/RocketMQSink.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ protected boolean putMessage2Mq(IMessage fieldName2Value,AtomicInteger msgFinish
125125
}
126126

127127
/**
128-
* 发送metaq消息
128+
* 发送RocketMQ消息
129129
* @param content 消息内容
130130
* @param key 消息的Key字段是为了唯一标识消息的,方便运维排查问题。如果不设置Key,则无法定位消息丢失原因。
131131
* @param targetQueue
@@ -230,9 +230,9 @@ public List<ISplit> getSplitList() {
230230
try {
231231

232232
if (messageQueues == null || messageQueues.size() == 0) {
233-
List<MessageQueue> metaqQueueSet = producer.fetchPublishMessageQueues(topic);
233+
List<MessageQueue> rocketmqQueueSet = producer.fetchPublishMessageQueues(topic);
234234
List<ISplit> queueList = new ArrayList<>();
235-
for (MessageQueue queue : metaqQueueSet) {
235+
for (MessageQueue queue : rocketmqQueueSet) {
236236
RocketMQMessageQueue rocketMQMessageQueue = new RocketMQMessageQueue(queue);
237237
queueList.add(rocketMQMessageQueue);
238238

rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/source/RocketMQSource.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ protected DefaultMQPushConsumer startConsumer() {
143143
i++;
144144
}
145145
} catch (Exception e) {
146-
LOG.error("消费metaq报错:" + e, e);
146+
LOG.error("消费rocketmq报错:" + e, e);
147147
}
148148

149149
return ConsumeOrderlyStatus.SUCCESS;// 返回消费成功
@@ -156,21 +156,21 @@ protected DefaultMQPushConsumer startConsumer() {
156156
} catch (Exception e) {
157157
setInitSuccess(false);
158158
e.printStackTrace();
159-
throw new RuntimeException("start metaq channel error " + topic, e);
159+
throw new RuntimeException("start rocketmq channel error " + topic, e);
160160
}
161161
}
162162
@Override
163163
public List<ISplit> getAllSplits(){
164164
try {
165-
Set<MessageQueue> metaqQueueSet = consumer.fetchSubscribeMessageQueues(this.topic);
165+
Set<MessageQueue> rocketmqQueueSet = consumer.fetchSubscribeMessageQueues(this.topic);
166166
List<ISplit> queueList = new ArrayList<>();
167-
for (MessageQueue queue : metaqQueueSet) {
168-
RocketMQMessageQueue metaqMessageQueue = new RocketMQMessageQueue(queue);
169-
if(isNotDataSplit(metaqMessageQueue.getQueueId())){
167+
for (MessageQueue queue : rocketmqQueueSet) {
168+
RocketMQMessageQueue rocketmqMessageQueue = new RocketMQMessageQueue(queue);
169+
if(isNotDataSplit(rocketmqMessageQueue.getQueueId())){
170170
continue;
171171
}
172172

173-
queueList.add(metaqMessageQueue);
173+
queueList.add(rocketmqMessageQueue);
174174

175175
}
176176
return queueList;
@@ -192,8 +192,8 @@ public Map<String,List<ISplit>> getWorkingSplitsGroupByInstances(){
192192
Map<org.apache.rocketmq.common.message.MessageQueue, String> queue2Instances= getMessageQueueAllocationResult(defaultMQAdminExt,this.groupName);
193193
Map<String,List<ISplit>> instanceOwnerQueues=new HashMap<>();
194194
for(org.apache.rocketmq.common.message.MessageQueue messageQueue:queue2Instances.keySet()){
195-
RocketMQMessageQueue metaqMessageQueue = new RocketMQMessageQueue(new MessageQueue(messageQueue.getTopic(),messageQueue.getBrokerName(),messageQueue.getQueueId()));
196-
if(isNotDataSplit(metaqMessageQueue.getQueueId())){
195+
RocketMQMessageQueue rocketmqMessageQueue = new RocketMQMessageQueue(new MessageQueue(messageQueue.getTopic(),messageQueue.getBrokerName(),messageQueue.getQueueId()));
196+
if(isNotDataSplit(rocketmqMessageQueue.getQueueId())){
197197
continue;
198198
}
199199
String instanceName=queue2Instances.get(messageQueue);
@@ -202,7 +202,7 @@ public Map<String,List<ISplit>> getWorkingSplitsGroupByInstances(){
202202
splits=new ArrayList<>();
203203
instanceOwnerQueues.put(instanceName,splits);
204204
}
205-
splits.add(metaqMessageQueue);
205+
splits.add(rocketmqMessageQueue);
206206
}
207207
return instanceOwnerQueues;
208208

rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/strategy/ConfiguableConnector.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,19 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
117
package org.apache.rocketmq.streams.client.strategy;
218

319
import org.apache.rocketmq.streams.common.configurable.IConfigurableService;

rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/SinkTest.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,19 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
117
package org.apache.rocketmq.streams.client;
218

319
import com.alibaba.fastjson.JSONObject;

rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/SourceTest.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,19 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
117
package org.apache.rocketmq.streams.client;
218

319
import org.apache.rocketmq.streams.common.channel.impl.file.FileSource;

rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/WindowTest.java

Lines changed: 0 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -52,33 +52,4 @@ public void foreach(JSONObject o) {
5252

5353
}
5454

55-
// @Test
56-
// public void testWindowFromMetaq() throws InterruptedException {
57-
// String topic = "TOPIC_DIPPER_SYSTEM_MSG_4";
58-
// StreamBuilder.dataStream("namespace", "name")
59-
// .fromFile("/Users/yuanxiaodong/chris/sls_100.txt", true)
60-
// .toRocketmq(topic)
61-
// .asyncStart();
62-
//
63-
// StreamBuilder.dataStream("namespace", "name1")
64-
// .fromRocketmq(topic, "chris", true)
65-
// .window(TumblingWindow.of(Time.seconds(5)))
66-
// .groupby("ProjectName", "LogStore")
67-
// .setLocalStorageOnly(true)
68-
// .count("total")
69-
// .sum("OutFlow", "OutFlow")
70-
// .sum("InFlow", "inflow")
71-
// .toDataSteam()
72-
// .forEach(new ForEachFunction<JSONObject>() {
73-
// protected int sum = 0;
74-
//
75-
// @Override
76-
// public void foreach(JSONObject o) {
77-
// int total = o.getInteger("total");
78-
// sum = sum + total;
79-
// o.put("sum(total)", sum);
80-
// }
81-
// }).toPrint().start();
82-
// }
83-
8455
}

rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/windows/WindowFromMetaq.java renamed to rocketmq-streams-clients/src/test/java/org/apache/rocketmq/streams/client/windows/WindowFromRocketMQ.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
import org.apache.rocketmq.streams.client.transform.DataStream;
2121
import org.junit.Test;
2222

23-
public class WindowFromMetaq extends AbstractWindowFireModeTest {
23+
public class WindowFromRocketMQ extends AbstractWindowFireModeTest {
2424

2525
String topic = "TOPIC_DIPPER_SYSTEM_MSG_5";
2626
@Test
@@ -44,12 +44,12 @@ public void testWindowFireMode2() {
4444

4545

4646
@Test
47-
public void testWindowToMetaq() throws InterruptedException {
47+
public void testWindowToRocketMQ() throws InterruptedException {
4848

4949
long start=System.currentTimeMillis();
5050
StreamBuilder.dataStream("namespace", "name")
5151
.fromFile("/Users/yuanxiaodong/chris/sls_10.txt", true)
52-
.toMetaq(topic)
52+
.toRocketmq(topic, "chris1", "", "", "", "", "")
5353
.start();
5454
}
5555

rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/sink/AbstractSupportShuffleSink.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ public abstract class AbstractSupportShuffleSink extends AbstractSink {
2727
protected volatile transient boolean hasCreated = false;
2828

2929
/**
30-
* 获取sink的主题,在sls中是logStore,metaq是topic
30+
* 获取sink的主题,在sls中是logStore,RocketMQ是topic
3131
*
3232
* @return
3333
*/

rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/configure/ConfigureFileKey.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -123,14 +123,14 @@ static String getDipperJdbcPassword() {
123123
}
124124

125125
/**
126-
* shuffle相关配置,如果后面加上.namespace,则只对某个namespace生效,如window.shuffle.channel.type.namespace=metaq,相当于只对这个namespace配置
126+
* shuffle相关配置,如果后面加上.namespace,则只对某个namespace生效,如window.shuffle.channel.type.namespace=rocketmq,相当于只对这个namespace配置
127127
*/
128128

129129
String WINDOW_SHUFFLE_CHANNEL_TYPE = "window.shuffle.channel.type";//window 做shuffle中转需要的消息队列类型
130-
//比如metaq,需要topic,tags和group,属性值和字段名保持一致即可。配置如下:window.shuffle.channel.topic=abdc window.shuffle.channel.tag=fdd
130+
//比如rocketmq,需要topic,tags和group,属性值和字段名保持一致即可。配置如下:window.shuffle.channel.topic=abdc window.shuffle.channel.tag=fdd
131131

132132
String WINDOW_SHUFFLE_CHANNEL_PROPERTY_PREFIX = "window.shuffle.channel.";
133-
String WINDOW_SYSTEM_MESSAGE_CHENNEL_OWNER = "window.system.message.channel.owner";//如果能做消息过滤,只过滤本window的消息,可以配置这个属性,如metaq的tags.不支持的会做客户端过滤
133+
String WINDOW_SYSTEM_MESSAGE_CHENNEL_OWNER = "window.system.message.channel.owner";//如果能做消息过滤,只过滤本window的消息,可以配置这个属性,如rocketmq的tags.不支持的会做客户端过滤
134134

135135
/**
136136
* 通知相关

rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/debug/DebugWriter.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,19 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
117
package org.apache.rocketmq.streams.window.debug;
218

319
import com.alibaba.fastjson.JSONObject;

rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/fire/EventTimeManager.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,19 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
117
package org.apache.rocketmq.streams.window.fire;
218

319
import java.util.HashMap;

rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/fire/SplitEventTimeManager.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,19 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
117
package org.apache.rocketmq.streams.window.fire;
218

319
import java.util.ArrayList;

rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/fire/WindowFireManager.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,19 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
117
package org.apache.rocketmq.streams.window.fire;
218

319
import java.util.concurrent.ConcurrentHashMap;

rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/state/WindowBaseValue.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ public class WindowBaseValue extends BasedConfigurable implements Serializable {
4242
protected String windowInstanceId;
4343

4444
/**
45-
* 分片信息(metaQ里是queue
45+
* 分片信息(RocketMQ里是queue
4646
*/
4747
protected String partition;
4848

rocketmq-streams-window/src/main/resources/dipper.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ dipper.rds.table.name=dipper_sql_configure
88

99
#window 异步消息分发需要的消息队列类型
1010
dipper.window.shuffle.rocketmq.dispatch.channel.type=rocketmq
11-
#如果是metaq,统计和join分别用的topoc
11+
#如果是RocketMQ,统计和join分别用的topoc
1212
dipper.window.shuffle.rocketmq.dispatch.channel.topic=TOPIC_DIPPER_WINDOW_STATISTICS
1313
#动态生成的消息队列的属性的key值,这个值会被动态赋值
1414
dipper.window.shuffle.rocketmq.dispatch.channel.dynamic.property=tag

0 commit comments

Comments
 (0)