diff --git a/rocketmq-spring-boot-samples/rocketmq-consume-demo/pom.xml b/rocketmq-spring-boot-samples/rocketmq-consume-demo/pom.xml index 6f7fdf19..99fe1d67 100644 --- a/rocketmq-spring-boot-samples/rocketmq-consume-demo/pom.xml +++ b/rocketmq-spring-boot-samples/rocketmq-consume-demo/pom.xml @@ -23,7 +23,7 @@ org.apache.rocketmq rocketmq-spring-boot-samples - 2.2.1-SNAPSHOT + 2.2.2-SNAPSHOT rocketmq-consume-demo diff --git a/rocketmq-spring-boot-samples/rocketmq-produce-demo/pom.xml b/rocketmq-spring-boot-samples/rocketmq-produce-demo/pom.xml index c9d121d4..9473bcd8 100644 --- a/rocketmq-spring-boot-samples/rocketmq-produce-demo/pom.xml +++ b/rocketmq-spring-boot-samples/rocketmq-produce-demo/pom.xml @@ -23,7 +23,7 @@ org.apache.rocketmq rocketmq-spring-boot-samples - 2.2.1-SNAPSHOT + 2.2.2-SNAPSHOT rocketmq-produce-demo diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/ExtRocketMQConsumerConfiguration.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/ExtRocketMQConsumerConfiguration.java index 84d641a3..0390fecb 100644 --- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/ExtRocketMQConsumerConfiguration.java +++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/ExtRocketMQConsumerConfiguration.java @@ -110,4 +110,9 @@ * The name value of message trace topic.If you don't config,you can use the default trace topic name. */ String customizedTraceTopic() default TRACE_TOPIC_PLACEHOLDER; + + /** + * The namespace of consumer. + */ + String namespace() default ""; } \ No newline at end of file diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/ExtRocketMQTemplateConfiguration.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/ExtRocketMQTemplateConfiguration.java index 8e801472..090100cb 100644 --- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/ExtRocketMQTemplateConfiguration.java +++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/ExtRocketMQTemplateConfiguration.java @@ -89,4 +89,8 @@ * The property of "tlsEnable" default false. */ String tlsEnable() default "false"; + /** + * The namespace of producer. + */ + String namespace() default ""; } diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListener.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListener.java index e80f3284..ecc3e0e4 100644 --- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListener.java +++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListener.java @@ -127,4 +127,9 @@ * The property of "tlsEnable" default false. */ String tlsEnable() default "false"; + + /** + * The namespace of consumer. + */ + String namespace() default ""; } diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ExtConsumerResetConfiguration.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ExtConsumerResetConfiguration.java index f2ece8a1..b1e9288b 100644 --- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ExtConsumerResetConfiguration.java +++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ExtConsumerResetConfiguration.java @@ -132,6 +132,7 @@ private DefaultLitePullConsumer createConsumer(ExtRocketMQConsumerConfiguration groupName, topicName, messageModel, selectorType, selectorExpression, ak, sk, pullBatchSize, useTLS); litePullConsumer.setEnableMsgTrace(annotation.enableMsgTrace()); litePullConsumer.setCustomizedTraceTopic(resolvePlaceholders(annotation.customizedTraceTopic(), consumerConfig.getCustomizedTraceTopic())); + litePullConsumer.setNamespace(annotation.namespace()); return litePullConsumer; } diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ExtProducerResetConfiguration.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ExtProducerResetConfiguration.java index ca304e13..ed9fb7d6 100644 --- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ExtProducerResetConfiguration.java +++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ExtProducerResetConfiguration.java @@ -129,7 +129,7 @@ private DefaultMQProducer createProducer(ExtRocketMQTemplateConfiguration annota producer.setCompressMsgBodyOverHowmuch(annotation.compressMessageBodyThreshold() == -1 ? producerConfig.getCompressMessageBodyThreshold() : annotation.compressMessageBodyThreshold()); producer.setRetryAnotherBrokerWhenNotStoreOK(annotation.retryNextServer()); producer.setUseTLS(useTLS); - + producer.setNamespace(annotation.namespace()); return producer; } diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfiguration.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfiguration.java index ea00432e..b908950a 100644 --- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfiguration.java +++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfiguration.java @@ -116,7 +116,7 @@ public DefaultMQProducer defaultMQProducer(RocketMQProperties rocketMQProperties producer.setCompressMsgBodyOverHowmuch(producerConfig.getCompressMessageBodyThreshold()); producer.setRetryAnotherBrokerWhenNotStoreOK(producerConfig.isRetryNextServer()); producer.setUseTLS(producerConfig.isTlsEnable()); - + producer.setNamespace(producerConfig.getNamespace()); return producer; } @@ -146,6 +146,7 @@ public DefaultLitePullConsumer defaultLitePullConsumer(RocketMQProperties rocket groupName, topicName, messageModel, selectorType, selectorExpression, ak, sk, pullBatchSize, useTLS); litePullConsumer.setEnableMsgTrace(consumerConfig.isEnableMsgTrace()); litePullConsumer.setCustomizedTraceTopic(consumerConfig.getCustomizedTraceTopic()); + litePullConsumer.setNamespace(consumerConfig.getNamespace()); return litePullConsumer; } diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQProperties.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQProperties.java index d6c8a906..974bec9d 100644 --- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQProperties.java +++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQProperties.java @@ -80,6 +80,11 @@ public static class Producer { */ private String group; + /** + * Namespace for this MQ Producer instance. + */ + private String namespace; + /** * Millis of send message timeout. */ @@ -232,6 +237,14 @@ public boolean isTlsEnable() { public void setTlsEnable(boolean tlsEnable) { this.tlsEnable = tlsEnable; } + + public String getNamespace() { + return namespace; + } + + public void setNamespace(String namespace) { + this.namespace = namespace; + } } public Consumer getConsumer() { @@ -248,6 +261,11 @@ public static final class Consumer { */ private String group; + /** + * Namespace for this MQ Consumer instance. + */ + private String namespace; + /** * Topic name of consumer. */ @@ -403,6 +421,14 @@ public boolean isTlsEnable() { public void setTlsEnable(boolean tlsEnable) { this.tlsEnable = tlsEnable; } + + public String getNamespace() { + return namespace; + } + + public void setNamespace(String namespace) { + this.namespace = namespace; + } } } \ No newline at end of file diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java index 2a42fe47..f48788e0 100644 --- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java +++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java @@ -124,6 +124,7 @@ public class DefaultRocketMQListenerContainer implements InitializingBean, private int maxReconsumeTimes; private int replyTimeout; private String tlsEnable; + private String namespace; public long getSuspendCurrentQueueTimeMillis() { return suspendCurrentQueueTimeMillis; @@ -226,6 +227,7 @@ public void setRocketMQMessageListener(RocketMQMessageListener anno) { this.maxReconsumeTimes = anno.maxReconsumeTimes(); this.replyTimeout = anno.replyTimeout(); this.tlsEnable = anno.tlsEnable(); + this.namespace = anno.namespace(); } public ConsumeMode getConsumeMode() { @@ -256,6 +258,14 @@ public void setTlsEnable(String tlsEnable) { this.tlsEnable = tlsEnable; } + public String getNamespace() { + return namespace; + } + + public void setNamespace(String namespace) { + this.namespace = namespace; + } + public DefaultMQPushConsumer getConsumer() { return consumer; } @@ -344,13 +354,14 @@ public void setApplicationContext(ApplicationContext applicationContext) throws public String toString() { return "DefaultRocketMQListenerContainer{" + "consumerGroup='" + consumerGroup + '\'' + + ", namespace='" + namespace + '\'' + ", nameServer='" + nameServer + '\'' + ", topic='" + topic + '\'' + ", consumeMode=" + consumeMode + ", selectorType=" + selectorType + ", selectorExpression='" + selectorExpression + '\'' + - ", messageModel=" + messageModel + '\'' + - ", tlsEnable=" + tlsEnable + + ", messageModel=" + messageModel + '\'' + + ", tlsEnable=" + tlsEnable + '}'; } @@ -579,7 +590,7 @@ private void initRocketMQPushConsumer() throws MQClientException { this.applicationContext.getEnvironment(). resolveRequiredPlaceholders(this.rocketMQMessageListener.customizedTraceTopic())); } - + consumer.setNamespace(namespace); consumer.setInstanceName(RocketMQUtil.getInstanceName(nameServer)); String customizedNameServer = this.applicationContext.getEnvironment().resolveRequiredPlaceholders(this.rocketMQMessageListener.nameServer());