Skip to content

Commit eee0b2f

Browse files
zach-schoenbergergaryrussell
authored andcommitted
GH-699: Fix Batch Listener with Kotlin
Fixes #699 showing issue with batch listener, along with fix fixing checkstyle fixing illegal import
1 parent bcd6843 commit eee0b2f

File tree

2 files changed

+33
-4
lines changed

2 files changed

+33
-4
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -427,7 +427,13 @@ else if (parameterizedType.getRawType().equals(List.class)
427427
Type paramType = parameterizedType.getActualTypeArguments()[0];
428428
this.isConsumerRecordList = paramType.equals(ConsumerRecord.class)
429429
|| (paramType instanceof ParameterizedType
430-
&& ((ParameterizedType) paramType).getRawType().equals(ConsumerRecord.class));
430+
&& ((ParameterizedType) paramType).getRawType().equals(ConsumerRecord.class)
431+
|| (paramType instanceof WildcardType
432+
&& ((WildcardType) paramType).getUpperBounds() != null
433+
&& ((WildcardType) paramType).getUpperBounds().length > 0
434+
&& ((WildcardType) paramType).getUpperBounds()[0] instanceof ParameterizedType
435+
&& ((ParameterizedType) ((WildcardType) paramType).getUpperBounds()[0]).getRawType().equals(ConsumerRecord.class))
436+
);
431437
boolean messageHasGeneric = paramType instanceof ParameterizedType
432438
&& ((ParameterizedType) paramType).getRawType().equals(Message.class);
433439
this.isMessageList = paramType.equals(Message.class) || messageHasGeneric;

spring-kafka/src/test/kotlin/org/springframework/kafka/listener/EnableKafkaKotlinTests.kt

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616

1717
import org.apache.kafka.clients.consumer.ConsumerConfig
18+
import org.apache.kafka.clients.consumer.ConsumerRecord
1819
import org.apache.kafka.clients.producer.ProducerConfig
1920
import org.apache.kafka.common.serialization.StringDeserializer
2021
import org.apache.kafka.common.serialization.StringSerializer
@@ -59,13 +60,21 @@ class EnableKafkaKotlinTests {
5960
assertThat(this.config.received).isEqualTo("foo")
6061
}
6162

63+
@Test
64+
fun `test batch listener`() {
65+
this.template.send("kotlinTestTopic", "foo")
66+
assertThat(this.config.latch.await(10, TimeUnit.SECONDS)).isTrue()
67+
assertThat(this.config.batchReceived).isEqualTo("foo")
68+
}
69+
6270
@Configuration
6371
@EnableKafka
6472
class Config {
6573

6674
lateinit var received: String
75+
lateinit var batchReceived: String
6776

68-
val latch = CountDownLatch(1)
77+
val latch = CountDownLatch(2)
6978

7079
@Value("\${" + KafkaEmbedded.SPRING_EMBEDDED_KAFKA_BROKERS + "}")
7180
private lateinit var brokerAddresses: String
@@ -82,7 +91,6 @@ class EnableKafkaKotlinTests {
8291
@Bean
8392
fun kcf(): ConsumerFactory<String, String> {
8493
val configs = HashMap<String, Any>()
85-
configs["foo"] = "bar"
8694
configs[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = this.brokerAddresses
8795
configs[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
8896
configs[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
@@ -104,12 +112,27 @@ class EnableKafkaKotlinTests {
104112
return factory
105113
}
106114

107-
@KafkaListener(id = "kotlin", topics = ["kotlinTestTopic"])
115+
@Bean
116+
fun kafkaBatchListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, String> {
117+
val factory: ConcurrentKafkaListenerContainerFactory<String, String>
118+
= ConcurrentKafkaListenerContainerFactory()
119+
factory.isBatchListener = true
120+
factory.consumerFactory = kcf()
121+
return factory
122+
}
123+
124+
@KafkaListener(id = "kotlin", topics = ["kotlinTestTopic"], containerFactory = "kafkaListenerContainerFactory")
108125
fun listen(value: String) {
109126
this.received = value
110127
this.latch.countDown()
111128
}
112129

130+
@KafkaListener(id = "kotlin-batch", topics = ["kotlinTestTopic"], containerFactory = "kafkaBatchListenerContainerFactory")
131+
fun batchListen(values: List<ConsumerRecord<String, String>>) {
132+
this.batchReceived = values.first().value()
133+
this.latch.countDown()
134+
}
135+
113136
}
114137

115138
}

0 commit comments

Comments
 (0)