Skip to content

Commit aaabe19

Browse files
authored
Merge pull request #39 from ni-ze/main
fix some error and make a code style.
2 parents eaad138 + 8582d04 commit aaabe19

File tree

41 files changed

+125
-141
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

41 files changed

+125
-141
lines changed

rocketmq-streams-channel-db/src/main/java/org/apache/rocketmq/streams/db/sink/SelfMultiTableSink.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,14 @@
1717
package org.apache.rocketmq.streams.db.sink;
1818

1919
import org.apache.rocketmq.streams.common.channel.split.ISplit;
20-
import org.apache.rocketmq.streams.common.configurable.IAfterConfiguableRefreshListerner;
20+
import org.apache.rocketmq.streams.common.configurable.IAfterConfigurableRefreshListener;
2121
import org.apache.rocketmq.streams.common.configurable.IConfigurableService;
2222
import org.apache.rocketmq.streams.common.context.IMessage;
2323
import org.apache.rocketmq.streams.common.functions.MultiTableSplitFunction;
2424
import org.apache.rocketmq.streams.common.utils.Base64Utils;
2525
import org.apache.rocketmq.streams.common.utils.InstantiationUtil;
2626

27-
public class SelfMultiTableSink extends AbstractMultiTableSink implements IAfterConfiguableRefreshListerner {
27+
public class SelfMultiTableSink extends AbstractMultiTableSink implements IAfterConfigurableRefreshListener {
2828
protected String multiTableSplitFunctionSerializeValue;//用户自定义的operator的序列化字节数组,做了base64解码
2929
protected transient MultiTableSplitFunction<IMessage> multiTableSplitFunction;
3030

rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/source/RocketMQSource.java

Lines changed: 34 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,12 @@
1818
package org.apache.rocketmq.streams.source;
1919

2020
import com.alibaba.fastjson.JSONObject;
21+
2122
import java.util.HashMap;
2223
import java.util.Iterator;
2324
import java.util.Map;
2425
import java.util.UUID;
26+
2527
import org.apache.commons.logging.Log;
2628
import org.apache.commons.logging.LogFactory;
2729
import org.apache.rocketmq.client.AccessChannel;
@@ -54,6 +56,7 @@
5456
import java.util.HashSet;
5557
import java.util.List;
5658
import java.util.Set;
59+
5760
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
5861

5962
public class RocketMQSource extends AbstractSupportOffsetResetSource {
@@ -72,12 +75,12 @@ public class RocketMQSource extends AbstractSupportOffsetResetSource {
7275
protected String namesrvAddr;
7376

7477
protected transient ConsumeFromWhere consumeFromWhere;//默认从哪里消费,不会被持久化。不设置默认从尾部消费
75-
protected transient String consumerOffset;//从哪里开始消费
78+
protected transient String consumeTimestamp;//从哪里开始消费
7679

77-
public RocketMQSource() {}
80+
public RocketMQSource() {
81+
}
7882

79-
public RocketMQSource(String topic, String tags, String groupName, String endpoint,
80-
String namesrvAddr, String accessKey, String secretKey, String instanceId) {
83+
public RocketMQSource(String topic, String tags, String groupName, String namesrvAddr) {
8184
this.topic = topic;
8285
this.tags = tags;
8386
this.groupName = groupName;
@@ -93,7 +96,7 @@ protected boolean initConfigurable() {
9396
protected boolean startSource() {
9497
try {
9598
destroyConsumer();
96-
consumer=startConsumer();
99+
consumer = startConsumer();
97100
return true;
98101
} catch (Exception e) {
99102
setInitSuccess(false);
@@ -108,22 +111,20 @@ protected DefaultMQPushConsumer startConsumer() {
108111
if (pullIntervalMs != null) {
109112
consumer.setPullInterval(pullIntervalMs);
110113
}
111-
// consumer.setConsumeThreadMax(maxThread);
112-
// consumer.setConsumeThreadMin(maxThread);
113114

114-
consumer.setPersistConsumerOffsetInterval((int)this.checkpointTime);
115+
consumer.setPersistConsumerOffsetInterval((int) this.checkpointTime);
115116
consumer.setConsumeMessageBatchMaxSize(maxFetchLogGroupSize);
116117
consumer.setAccessChannel(AccessChannel.CLOUD);
117118
consumer.setNamesrvAddr(this.namesrvAddr);
118119
if (consumeFromWhere != null) {
119-
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
120-
if (consumerOffset != null) {
121-
consumer.setConsumeTimestamp(consumerOffset);
120+
consumer.setConsumeFromWhere(consumeFromWhere);
121+
if (consumeTimestamp != null) {
122+
consumer.setConsumeTimestamp(consumeTimestamp);
122123
}
123124
}
124125

125126
consumer.subscribe(topic, tags);
126-
consumer.registerMessageListener((MessageListenerOrderly)(msgs, context) -> {
127+
consumer.registerMessageListener((MessageListenerOrderly) (msgs, context) -> {
127128
try {
128129
int i = 0;
129130
for (MessageExt msg : msgs) {
@@ -143,7 +144,7 @@ protected DefaultMQPushConsumer startConsumer() {
143144
i++;
144145
}
145146
} catch (Exception e) {
146-
LOG.error("消费rocketmq报错:" + e, e);
147+
LOG.error("consume message from rocketmq error " + e, e);
147148
}
148149

149150
return ConsumeOrderlyStatus.SUCCESS;// 返回消费成功
@@ -159,8 +160,9 @@ protected DefaultMQPushConsumer startConsumer() {
159160
throw new RuntimeException("start rocketmq channel error " + topic, e);
160161
}
161162
}
163+
162164
@Override
163-
public List<ISplit> getAllSplits(){
165+
public List<ISplit> getAllSplits() {
164166
try {
165167
Set<MessageQueue> rocketmqQueueSet = consumer.fetchSubscribeMessageQueues(this.topic);
166168
List<ISplit> queueList = new ArrayList<>();
@@ -176,31 +178,32 @@ public List<ISplit> getAllSplits(){
176178
return queueList;
177179
} catch (MQClientException e) {
178180
e.printStackTrace();
179-
throw new RuntimeException("get all splits error ",e);
181+
throw new RuntimeException("get all splits error ", e);
180182
}
181183
}
182184

183185

184186
@Override
185-
public Map<String,List<ISplit>> getWorkingSplitsGroupByInstances(){
187+
public Map<String, List<ISplit>> getWorkingSplitsGroupByInstances() {
186188
DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt();
187189
defaultMQAdminExt.setVipChannelEnabled(false);
188190
defaultMQAdminExt.setAdminExtGroup(UUID.randomUUID().toString());
189191
defaultMQAdminExt.setInstanceName(this.consumer.getInstanceName());
190192
try {
191193
defaultMQAdminExt.start();
194+
192195
Map<org.apache.rocketmq.common.message.MessageQueue, String> queue2Instances= getMessageQueueAllocationResult(defaultMQAdminExt,this.groupName);
193196
Map<String,List<ISplit>> instanceOwnerQueues=new HashMap<>();
194197
for(org.apache.rocketmq.common.message.MessageQueue messageQueue:queue2Instances.keySet()){
195198
RocketMQMessageQueue rocketmqMessageQueue = new RocketMQMessageQueue(new MessageQueue(messageQueue.getTopic(),messageQueue.getBrokerName(),messageQueue.getQueueId()));
196199
if(isNotDataSplit(rocketmqMessageQueue.getQueueId())){
197200
continue;
198201
}
199-
String instanceName=queue2Instances.get(messageQueue);
200-
List<ISplit> splits=instanceOwnerQueues.get(instanceName);
201-
if(splits==null){
202-
splits=new ArrayList<>();
203-
instanceOwnerQueues.put(instanceName,splits);
202+
String instanceName = queue2Instances.get(messageQueue);
203+
List<ISplit> splits = instanceOwnerQueues.get(instanceName);
204+
if (splits == null) {
205+
splits = new ArrayList<>();
206+
instanceOwnerQueues.put(instanceName, splits);
204207
}
205208
splits.add(rocketmqMessageQueue);
206209
}
@@ -212,21 +215,22 @@ public Map<String,List<ISplit>> getWorkingSplitsGroupByInstances(){
212215
defaultMQAdminExt.shutdown();
213216
}
214217
}
218+
215219
protected Map<org.apache.rocketmq.common.message.MessageQueue, String> getMessageQueueAllocationResult(DefaultMQAdminExt defaultMQAdminExt, String groupName) {
216220
HashMap results = new HashMap();
217221

218222
try {
219223
ConsumerConnection consumerConnection = defaultMQAdminExt.examineConsumerConnectionInfo(groupName);
220224
Iterator var5 = consumerConnection.getConnectionSet().iterator();
221225

222-
while(var5.hasNext()) {
223-
Connection connection = (Connection)var5.next();
226+
while (var5.hasNext()) {
227+
Connection connection = (Connection) var5.next();
224228
String clientId = connection.getClientId();
225229
ConsumerRunningInfo consumerRunningInfo = defaultMQAdminExt.getConsumerRunningInfo(groupName, clientId, false);
226230
Iterator var9 = consumerRunningInfo.getMqTable().keySet().iterator();
227231

228-
while(var9.hasNext()) {
229-
org.apache.rocketmq.common.message.MessageQueue messageQueue = (org.apache.rocketmq.common.message.MessageQueue)var9.next();
232+
while (var9.hasNext()) {
233+
org.apache.rocketmq.common.message.MessageQueue messageQueue = (org.apache.rocketmq.common.message.MessageQueue) var9.next();
230234
results.put(messageQueue, clientId.split("@")[1]);
231235
}
232236
}
@@ -309,7 +313,7 @@ public boolean supportOffsetRest() {
309313

310314
public void destroyConsumer() {
311315
List<DefaultMQPushConsumer> oldConsumers = new ArrayList<>();
312-
if(consumer!=null){
316+
if (consumer != null) {
313317
oldConsumers.add(consumer);
314318
}
315319
try {
@@ -367,11 +371,11 @@ public void setConsumeFromWhere(ConsumeFromWhere consumeFromWhere) {
367371
this.consumeFromWhere = consumeFromWhere;
368372
}
369373

370-
public String getConsumerOffset() {
371-
return consumerOffset;
374+
public String getConsumeTimestamp() {
375+
return consumeTimestamp;
372376
}
373377

374-
public void setConsumerOffset(String consumerOffset) {
375-
this.consumerOffset = consumerOffset;
378+
public void setConsumeTimestamp(String consumeTimestamp) {
379+
this.consumeTimestamp = consumeTimestamp;
376380
}
377381
}

rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/source/DataStreamSource.java

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,6 @@
1717

1818
package org.apache.rocketmq.streams.client.source;
1919

20-
import com.google.common.collect.Sets;
21-
22-
import java.util.Set;
23-
2420
import org.apache.rocketmq.streams.client.transform.DataStream;
2521
import org.apache.rocketmq.streams.common.channel.impl.file.FileSource;
2622
import org.apache.rocketmq.streams.common.channel.source.ISource;
@@ -29,11 +25,9 @@
2925

3026
public class DataStreamSource {
3127
protected PipelineBuilder mainPipelineBuilder;
32-
protected Set<PipelineBuilder> otherPipelineBuilders;
3328

3429
public DataStreamSource(String namespace, String pipelineName) {
3530
this.mainPipelineBuilder = new PipelineBuilder(namespace, pipelineName);
36-
this.otherPipelineBuilders = Sets.newHashSet();
3731
}
3832

3933
public static DataStreamSource create(String namespace, String pipelineName) {
@@ -48,7 +42,7 @@ public DataStream fromFile(String filePath, Boolean isJsonData) {
4842
FileSource fileChannel = new FileSource(filePath);
4943
fileChannel.setJsonData(isJsonData);
5044
this.mainPipelineBuilder.setSource(fileChannel);
51-
return new DataStream(this.mainPipelineBuilder, this.otherPipelineBuilders, null);
45+
return new DataStream(this.mainPipelineBuilder, null);
5246
}
5347

5448
public DataStream fromRocketmq(String topic, String groupName, String namesrvAddress) {
@@ -67,12 +61,12 @@ public DataStream fromRocketmq(String topic, String groupName, String tags, bool
6761
rocketMQSource.setJsonData(isJson);
6862
rocketMQSource.setNamesrvAddr(namesrvAddress);
6963
this.mainPipelineBuilder.setSource(rocketMQSource);
70-
return new DataStream(this.mainPipelineBuilder, this.otherPipelineBuilders, null);
64+
return new DataStream(this.mainPipelineBuilder, null);
7165
}
7266

7367
public DataStream from(ISource<?> source) {
7468
this.mainPipelineBuilder.setSource(source);
75-
return new DataStream(this.mainPipelineBuilder, this.otherPipelineBuilders, null);
69+
return new DataStream(this.mainPipelineBuilder,null);
7670
}
7771

7872
}

rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/transform/DataStream.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,11 @@ public DataStream(String namespace, String pipelineName) {
6666
this.otherPipelineBuilders = Sets.newHashSet();
6767
}
6868

69+
public DataStream(PipelineBuilder pipelineBuilder, ChainStage<?> currentChainStage) {
70+
this.mainPipelineBuilder = pipelineBuilder;
71+
this.currentChainStage = currentChainStage;
72+
}
73+
6974
public DataStream(PipelineBuilder pipelineBuilder, Set<PipelineBuilder> pipelineBuilders, ChainStage<?> currentChainStage) {
7075
this.mainPipelineBuilder = pipelineBuilder;
7176
this.otherPipelineBuilders = pipelineBuilders;
@@ -75,6 +80,7 @@ public DataStream(PipelineBuilder pipelineBuilder, Set<PipelineBuilder> pipeline
7580
public DataStream script(String script) {
7681
ChainStage<?> stage = this.mainPipelineBuilder.createStage(new ScriptOperator(script));
7782
this.mainPipelineBuilder.setTopologyStages(currentChainStage, stage);
83+
7884
return new DataStream(this.mainPipelineBuilder, this.otherPipelineBuilders, stage);
7985
}
8086

@@ -358,6 +364,8 @@ protected void start(boolean isAsyn) {
358364
if (this.mainPipelineBuilder == null) {
359365
return;
360366
}
367+
368+
361369
ConfigurableComponent configurableComponent = ComponentCreator.getComponent(mainPipelineBuilder.getPipelineNameSpace(), ConfigurableComponent.class, ConfigureFileKey.CONNECT_TYPE + ":memory");
362370
ChainPipeline pipeline = this.mainPipelineBuilder.build(configurableComponent.getService());
363371
pipeline.startChannel();

rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/memory/MemorySink.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,11 @@
2121
import java.util.concurrent.atomic.AtomicLong;
2222
import org.apache.rocketmq.streams.common.channel.sink.AbstractSupportShuffleSink;
2323
import org.apache.rocketmq.streams.common.channel.split.ISplit;
24-
import org.apache.rocketmq.streams.common.configurable.IAfterConfiguableRefreshListerner;
24+
import org.apache.rocketmq.streams.common.configurable.IAfterConfigurableRefreshListener;
2525
import org.apache.rocketmq.streams.common.configurable.IConfigurableService;
2626
import org.apache.rocketmq.streams.common.context.IMessage;
2727

28-
public class MemorySink extends AbstractSupportShuffleSink implements IAfterConfiguableRefreshListerner {
28+
public class MemorySink extends AbstractSupportShuffleSink implements IAfterConfigurableRefreshListener {
2929
/**
3030
* 是否启动qps的统计
3131
*/

rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/memory/MemorySource.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,10 @@
1717
package org.apache.rocketmq.streams.common.channel.impl.memory;
1818

1919
import org.apache.rocketmq.streams.common.channel.source.AbstractUnreliableSource;
20-
import org.apache.rocketmq.streams.common.configurable.IAfterConfiguableRefreshListerner;
20+
import org.apache.rocketmq.streams.common.configurable.IAfterConfigurableRefreshListener;
2121
import org.apache.rocketmq.streams.common.configurable.IConfigurableService;
2222

23-
public class MemorySource extends AbstractUnreliableSource implements IAfterConfiguableRefreshListerner {
23+
public class MemorySource extends AbstractUnreliableSource implements IAfterConfigurableRefreshListener {
2424

2525
protected String cacheName;
2626
protected transient MemoryCache memoryCache;
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
*/
1717
package org.apache.rocketmq.streams.common.configurable;
1818

19-
public interface IAfterConfiguableRefreshListerner {
19+
public interface IAfterConfigurableRefreshListener {
2020

2121
/**
2222
* 当configurable数据全部加载完成时,调用实现这个接口的configurable对象

rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/functions/FilterFunction.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,7 @@
1616
*/
1717
package org.apache.rocketmq.streams.common.functions;
1818

19-
import java.io.Serializable;
20-
21-
public interface FilterFunction<T> extends Function, Serializable {
19+
public interface FilterFunction<T> extends Function {
2220

2321
boolean filter(T value) throws Exception;
2422
}

rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/functions/FlatMapFunction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
import java.io.Serializable;
2020
import java.util.List;
2121

22-
public interface FlatMapFunction <T, O> extends Function, Serializable {
22+
public interface FlatMapFunction <T, O> extends Function{
2323

2424
List<T> flatMap(O message) throws Exception;
2525
}

rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/functions/Function.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,4 +16,6 @@
1616
*/
1717
package org.apache.rocketmq.streams.common.functions;
1818

19-
public interface Function extends java.io.Serializable {}
19+
public interface Function {
20+
21+
}

rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/functions/MapFunction.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,7 @@
1616
*/
1717
package org.apache.rocketmq.streams.common.functions;
1818

19-
import java.io.Serializable;
20-
21-
public interface MapFunction<T, O> extends Function, Serializable {
19+
public interface MapFunction<T, O> extends Function {
2220

2321
T map(O message) throws Exception;
2422
}

rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/functions/ReduceFunction.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,7 @@
1616
*/
1717
package org.apache.rocketmq.streams.common.functions;
1818

19-
import java.io.Serializable;
20-
21-
public interface ReduceFunction<R, O> extends Function, Serializable {
19+
public interface ReduceFunction<R, O> extends Function{
2220

2321
R reduce(R acccumulator, O msg);
2422
}

rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/topology/AbstractMutilPipelineChainPipline.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
import org.apache.rocketmq.streams.common.channel.source.systemmsg.NewSplitMessage;
2626
import org.apache.rocketmq.streams.common.channel.source.systemmsg.RemoveSplitMessage;
2727
import org.apache.rocketmq.streams.common.checkpoint.CheckPointMessage;
28-
import org.apache.rocketmq.streams.common.configurable.IAfterConfiguableRefreshListerner;
28+
import org.apache.rocketmq.streams.common.configurable.IAfterConfigurableRefreshListener;
2929
import org.apache.rocketmq.streams.common.configurable.IConfigurableService;
3030
import org.apache.rocketmq.streams.common.context.AbstractContext;
3131
import org.apache.rocketmq.streams.common.context.Context;
@@ -40,7 +40,7 @@
4040
*
4141
* @param <T>
4242
*/
43-
public abstract class AbstractMutilPipelineChainPipline<T extends IMessage> extends ChainStage<T> implements IAfterConfiguableRefreshListerner {
43+
public abstract class AbstractMutilPipelineChainPipline<T extends IMessage> extends ChainStage<T> implements IAfterConfigurableRefreshListener {
4444
/**
4545
* pipeline name,这是一个汇聚节点,会有多个pipline,这里存的是pipline name
4646
*/

0 commit comments

Comments
 (0)