Skip to content

Commit 0c9a8ce

Browse files
authored
Fix npe two streams & fix joinwindow's production donot be conducted by mainpipeline's following stages (apache#104)
* fix NPE in two streams case * init otherPipeline first to avoid config be overwrote, then join window's production will be conduct by mainPipeline's following stages
1 parent bd357f4 commit 0c9a8ce

File tree

2 files changed

+6
-3
lines changed

2 files changed

+6
-3
lines changed

rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/DataStreamAction.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -99,14 +99,17 @@ protected void start(boolean isAsync) {
9999
}
100100

101101
ConfigurableComponent configurableComponent = ComponentCreator.getComponent(mainPipelineBuilder.getPipelineNameSpace(), ConfigurableComponent.class, kvs);
102-
ChainPipeline pipeline = this.mainPipelineBuilder.build(configurableComponent.getService());
103-
pipeline.startChannel();
102+
104103
if (this.otherPipelineBuilders != null) {
105104
for (PipelineBuilder builder : otherPipelineBuilders) {
106105
ChainPipeline otherPipeline = builder.build(configurableComponent.getService());
107106
otherPipeline.startChannel();
108107
}
109108
}
109+
110+
ChainPipeline pipeline = this.mainPipelineBuilder.build(configurableComponent.getService());
111+
pipeline.startChannel();
112+
110113
if (isAsync) {
111114
return;
112115
}

rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/source/DataStreamSource.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ public DataStream fromRocketmq(String topic, String groupName, String tags, bool
107107
rocketMQSource.setJsonData(isJson);
108108
rocketMQSource.setNamesrvAddr(namesrvAddress);
109109
this.mainPipelineBuilder.setSource(rocketMQSource);
110-
return new DataStream(this.mainPipelineBuilder, null);
110+
return new DataStream(this.mainPipelineBuilder, this.otherPipelineBuilders, null);
111111
}
112112

113113
public DataStream fromMultipleDB(String url, String userName, String password, String tablePattern) {

0 commit comments

Comments
 (0)