diff --git a/spring-pulsar-spring-boot-autoconfigure/src/test/java/org/springframework/pulsar/autoconfigure/PulsarPropertiesTests.java b/spring-pulsar-spring-boot-autoconfigure/src/test/java/org/springframework/pulsar/autoconfigure/PulsarPropertiesTests.java index ea7e452ad..66131d7af 100644 --- a/spring-pulsar-spring-boot-autoconfigure/src/test/java/org/springframework/pulsar/autoconfigure/PulsarPropertiesTests.java +++ b/spring-pulsar-spring-boot-autoconfigure/src/test/java/org/springframework/pulsar/autoconfigure/PulsarPropertiesTests.java @@ -19,15 +19,20 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.Map; import org.apache.pulsar.client.api.CompressionType; +import org.apache.pulsar.client.api.ConsumerCryptoFailureAction; import org.apache.pulsar.client.api.HashingScheme; import org.apache.pulsar.client.api.MessageRoutingMode; import org.apache.pulsar.client.api.ProducerAccessMode; import org.apache.pulsar.client.api.ProducerCryptoFailureAction; +import org.apache.pulsar.client.api.RegexSubscriptionMode; +import org.apache.pulsar.client.api.SubscriptionInitialPosition; +import org.apache.pulsar.client.api.SubscriptionType; import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; @@ -58,11 +63,11 @@ private void bind(Map map) { @Nested class AdminPropertiesTests { - private String authPluginClassName = "org.apache.pulsar.client.impl.auth.AuthenticationToken"; + private final String authPluginClassName = "org.apache.pulsar.client.impl.auth.AuthenticationToken"; - private String authParamsStr = "{\"token\":\"1234\"}"; + private final String authParamsStr = "{\"token\":\"1234\"}"; - private String authToken = "1234"; + private final String authToken = "1234"; @Test void authenticationUsingAuthParamsString() { @@ -98,9 +103,8 @@ void authenticationNotAllowedUsingBothAuthParamsStringAndAuthenticationMap() { props.put("spring.pulsar.administration.auth-params", authParamsStr); props.put("spring.pulsar.administration.authentication.token", authToken); bind(props); - assertThatIllegalArgumentException().isThrownBy(() -> properties.buildAdminProperties()) - .withMessageContaining( - "Cannot set both spring.pulsar.administration.authParams and spring.pulsar.administration.authentication.*"); + assertThatIllegalArgumentException().isThrownBy(properties::buildAdminProperties).withMessageContaining( + "Cannot set both spring.pulsar.administration.authParams and spring.pulsar.administration.authentication.*"); } } @@ -145,4 +149,64 @@ void producerProperties() { } + @Nested + class ConsumerPropertiesTests { + + @Test + @SuppressWarnings("unchecked") + void consumerProperties() { + Map props = new HashMap<>(); + props.put("spring.pulsar.consumer.topics[0]", "my-topic"); + props.put("spring.pulsar.consumer.topics-pattern", "my-pattern"); + props.put("spring.pulsar.consumer.subscription-name", "my-subscription"); + props.put("spring.pulsar.consumer.subscription-type", "Shared"); + props.put("spring.pulsar.consumer.receiver-queue-size", "1"); + props.put("spring.pulsar.consumer.acknowledgements-group-time", "2s"); + props.put("spring.pulsar.consumer.negative-ack-redelivery-delay", "3s"); + props.put("spring.pulsar.consumer.max-total-receiver-queue-size-across-partitions", "5"); + props.put("spring.pulsar.consumer.consumer-name", "my-consumer"); + props.put("spring.pulsar.consumer.ack-timeout", "6s"); + props.put("spring.pulsar.consumer.tick-duration", "7s"); + props.put("spring.pulsar.consumer.priority-level", "8"); + props.put("spring.pulsar.consumer.crypto-failure-action", "DISCARD"); + props.put("spring.pulsar.consumer.properties[my-prop]", "my-prop-value"); + props.put("spring.pulsar.consumer.read-compacted", "true"); + props.put("spring.pulsar.consumer.subscription-initial-position", "Earliest"); + props.put("spring.pulsar.consumer.pattern-auto-discovery-period", "9"); + props.put("spring.pulsar.consumer.regex-subscription-mode", "AllTopics"); + props.put("spring.pulsar.consumer.auto-update-partitions", "false"); + props.put("spring.pulsar.consumer.replicate-subscription-state", "true"); + props.put("spring.pulsar.consumer.auto-ack-oldest-chunked-message-on-queue-full", "false"); + props.put("spring.pulsar.consumer.max-pending-chunked-message", "11"); + props.put("spring.pulsar.consumer.expire-time-of-incomplete-chunked-message", "12s"); + + bind(props); + Map consumerProps = properties.buildConsumerProperties(); + + assertThat(consumerProps) + .hasEntrySatisfying("topicNames", + n -> assertThat((Collection) n).containsExactly("my-topic")) + .hasEntrySatisfying("topicsPattern", p -> assertThat(p.toString()).isEqualTo("my-pattern")) + .containsEntry("subscriptionName", "my-subscription") + .containsEntry("subscriptionType", SubscriptionType.Shared).containsEntry("receiverQueueSize", 1) + .containsEntry("acknowledgementsGroupTimeMicros", 2_000_000L) + .containsEntry("negativeAckRedeliveryDelayMicros", 3_000_000L) + .containsEntry("maxTotalReceiverQueueSizeAcrossPartitions", 5) + .containsEntry("consumerName", "my-consumer").containsEntry("ackTimeoutMillis", 6_000L) + .containsEntry("tickDurationMillis", 7_000L).containsEntry("priorityLevel", 8) + .containsEntry("cryptoFailureAction", ConsumerCryptoFailureAction.DISCARD) + .hasEntrySatisfying("properties", + p -> assertThat((Map) p).containsEntry("my-prop", "my-prop-value")) + .containsEntry("readCompacted", true) + .containsEntry("subscriptionInitialPosition", SubscriptionInitialPosition.Earliest) + .containsEntry("patternAutoDiscoveryPeriod", 9) + .containsEntry("regexSubscriptionMode", RegexSubscriptionMode.AllTopics) + .containsEntry("autoUpdatePartitions", false).containsEntry("replicateSubscriptionState", true) + .containsEntry("autoAckOldestChunkedMessageOnQueueFull", false) + .containsEntry("maxPendingChunkedMessage", 11) + .containsEntry("expireTimeOfIncompleteChunkedMessageMillis", 12_000L); + } + + } + }