Skip to content

Commit 1a309af

Browse files
authored
Merge pull request #195 from ni-ze/Ibuilder
[ISSUE #196]fix(ChannelBuilder) add ChannelBuilder
2 parents a9fe443 + ec7be1b commit 1a309af

File tree

4 files changed

+73
-6
lines changed

4 files changed

+73
-6
lines changed

rocketmq-streams-channel-mqtt/pom.xml

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,6 @@
2222
<groupId>org.apache.rocketmq</groupId>
2323
<artifactId>rocketmq-streams-commons</artifactId>
2424
</dependency>
25-
<!-- <dependency>-->
26-
<!-- <groupId>com.hivemq</groupId>-->
27-
<!-- <artifactId>hivemq-mqtt-client</artifactId>-->
28-
<!-- </dependency>-->
2925
<dependency>
3026
<groupId>org.eclipse.paho</groupId>
3127
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>

rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/impl/CollectionSinkBuilder.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
@ServiceName(value = CollectionSinkBuilder.TYPE)
3232
public class CollectionSinkBuilder implements IChannelBuilder {
3333

34-
public static final String TYPE = "collection";
34+
public static final String TYPE = "CollectionSink";
3535

3636
@Override
3737
public ISource createSource(String namespace, String name, Properties properties, MetaData metaData) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.rocketmq.streams.common.channel.impl;
19+
20+
import com.google.auto.service.AutoService;
21+
import org.apache.rocketmq.streams.common.channel.builder.AbstractSupportShuffleChannelBuilder;
22+
import org.apache.rocketmq.streams.common.channel.builder.IChannelBuilder;
23+
import org.apache.rocketmq.streams.common.channel.impl.memory.MemoryChannel;
24+
import org.apache.rocketmq.streams.common.channel.sink.ISink;
25+
import org.apache.rocketmq.streams.common.channel.source.ISource;
26+
import org.apache.rocketmq.streams.common.metadata.MetaData;
27+
import org.apache.rocketmq.streams.common.model.ServiceName;
28+
29+
import java.util.Properties;
30+
31+
/*
32+
* Licensed to the Apache Software Foundation (ASF) under one or more
33+
* contributor license agreements. See the NOTICE file distributed with
34+
* this work for additional information regarding copyright ownership.
35+
* The ASF licenses this file to You under the Apache License, Version 2.0
36+
* (the "License"); you may not use this file except in compliance with
37+
* the License. You may obtain a copy of the License at
38+
*
39+
* http://www.apache.org/licenses/LICENSE-2.0
40+
*
41+
* Unless required by applicable law or agreed to in writing, software
42+
* distributed under the License is distributed on an "AS IS" BASIS,
43+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
44+
* See the License for the specific language governing permissions and
45+
* limitations under the License.
46+
*/
47+
48+
@AutoService(IChannelBuilder.class)
49+
@ServiceName(value = CollectionSourceBuilder.TYPE)
50+
public class CollectionSourceBuilder extends AbstractSupportShuffleChannelBuilder {
51+
public static final String TYPE = "CollectionSource";
52+
53+
@Override
54+
public ISource<?> createSource(String namespace, String name, Properties properties, MetaData metaData) {
55+
return new MemoryChannel();
56+
}
57+
58+
@Override
59+
public String getType() {
60+
return TYPE;
61+
}
62+
63+
@Override
64+
public ISink<?> createSink(String namespace, String name, Properties properties, MetaData metaData) {
65+
return new MemoryChannel();
66+
}
67+
68+
@Override
69+
public ISink<?> createBySource(ISource<?> pipelineSource) {
70+
return new MemoryChannel();
71+
}
72+
}

rocketmq-streams-serviceloader/pom.xml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929
<name>ROCKETMQ STREAMS :: serviceloader</name>
3030
<dependencies>
3131

32-
<!-- 测试依赖 -->
3332
<dependency>
3433
<groupId>org.apache.rocketmq</groupId>
3534
<artifactId>rocketmq-streams-commons</artifactId>

0 commit comments

Comments
 (0)