Skip to content

Commit 5fa5630

Browse files
committed
GH-3618: Support CommonErrorHandler for suspend functions
Fixes GH-3618(#3618) * Setup async failure callback when CommonErrorHandler is explicitly configured * Add test for suspend function with CommonErrorHandler Signed-off-by: zoo-code <kyj20908@naver.com>
1 parent 19070c3 commit 5fa5630

File tree

2 files changed

+50
-1
lines changed

2 files changed

+50
-1
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -949,6 +949,16 @@ else if (listener instanceof MessageListener) {
949949
this.isConsumerAwareListener = listenerType.equals(ListenerType.ACKNOWLEDGING_CONSUMER_AWARE)
950950
|| listenerType.equals(ListenerType.CONSUMER_AWARE);
951951
this.commonErrorHandler = determineCommonErrorHandler();
952+
// Setup async failure callback for suspend functions when CommonErrorHandler is explicitly configured
953+
if (getCommonErrorHandler() != null && this.listener != null) {
954+
MessageListener<?, ?> target = unwrapDelegateIfAny(this.listener);
955+
if (target instanceof RecordMessagingMessageListenerAdapter<?, ?>) {
956+
@SuppressWarnings("unchecked")
957+
RecordMessagingMessageListenerAdapter<K, V> adapter =
958+
(RecordMessagingMessageListenerAdapter<K, V>) target;
959+
adapter.setCallbackForAsyncFailure(this::callbackForAsyncFailure);
960+
}
961+
}
952962
Assert.state(!this.isBatchListener || !this.isRecordAck,
953963
"Cannot use AckMode.RECORD with a batch listener");
954964
if (this.containerProperties.getScheduler() != null) {

spring-kafka/src/test/kotlin/org/springframework/kafka/listener/EnableKafkaKotlinCoroutinesTests.kt

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ import org.springframework.kafka.core.DefaultKafkaConsumerFactory
3535
import org.springframework.kafka.core.DefaultKafkaProducerFactory
3636
import org.springframework.kafka.core.KafkaTemplate
3737
import org.springframework.kafka.core.ProducerFactory
38+
import org.springframework.kafka.listener.DefaultErrorHandler
3839
import org.springframework.kafka.listener.KafkaListenerErrorHandler
3940
import org.springframework.kafka.support.Acknowledgment
4041
import org.springframework.kafka.test.EmbeddedKafkaBroker
@@ -59,7 +60,8 @@ import java.util.concurrent.TimeUnit
5960
@SpringJUnitConfig
6061
@DirtiesContext
6162
@EmbeddedKafka(topics = ["kotlinAsyncTestTopic1", "kotlinAsyncTestTopic2",
62-
"kotlinAsyncBatchTestTopic1", "kotlinAsyncBatchTestTopic2", "kotlinReplyTopic1"], partitions = 1)
63+
"kotlinAsyncBatchTestTopic1", "kotlinAsyncBatchTestTopic2", "kotlinReplyTopic1",
64+
"kotlinAsyncTestTopicCommonHandler"], partitions = 1)
6365
class EnableKafkaKotlinCoroutinesTests {
6466

6567
@Autowired
@@ -108,6 +110,13 @@ class EnableKafkaKotlinCoroutinesTests {
108110
assertThat(cr?.value() ?: "null").isEqualTo("FOO")
109111
}
110112

113+
@Test
114+
fun `test suspend function with CommonErrorHandler`() {
115+
this.template.send("kotlinAsyncTestTopicCommonHandler", "fail")
116+
assertThat(this.config.commonHandlerLatch.await(10, TimeUnit.SECONDS)).isTrue()
117+
assertThat(this.config.commonHandlerInvoked).isTrue()
118+
}
119+
111120
@KafkaListener(id = "sendTopic", topics = ["kotlinAsyncTestTopic3"],
112121
containerFactory = "kafkaListenerContainerFactory")
113122
class Listener {
@@ -138,6 +147,9 @@ class EnableKafkaKotlinCoroutinesTests {
138147
@Volatile
139148
var batchError: Boolean = false
140149

150+
@Volatile
151+
var commonHandlerInvoked: Boolean = false
152+
141153
val latch1 = CountDownLatch(1)
142154

143155
val latch2 = CountDownLatch(1)
@@ -146,6 +158,8 @@ class EnableKafkaKotlinCoroutinesTests {
146158

147159
val batchLatch2 = CountDownLatch(1)
148160

161+
val commonHandlerLatch = CountDownLatch(1)
162+
149163
@Value("\${" + EmbeddedKafkaBroker.SPRING_EMBEDDED_KAFKA_BROKERS + "}")
150164
private lateinit var brokerAddresses: String
151165

@@ -217,6 +231,23 @@ class EnableKafkaKotlinCoroutinesTests {
217231
return factory
218232
}
219233

234+
@Bean
235+
fun commonErrorHandler(): DefaultErrorHandler {
236+
return DefaultErrorHandler { record, exception ->
237+
commonHandlerInvoked = true
238+
commonHandlerLatch.countDown()
239+
}
240+
}
241+
242+
@Bean
243+
fun kafkaListenerContainerFactoryWithCommonHandler(): ConcurrentKafkaListenerContainerFactory<String, String> {
244+
val factory: ConcurrentKafkaListenerContainerFactory<String, String>
245+
= ConcurrentKafkaListenerContainerFactory()
246+
factory.setConsumerFactory(kcf())
247+
factory.setCommonErrorHandler(commonErrorHandler())
248+
return factory
249+
}
250+
220251
@KafkaListener(id = "kotlin", topics = ["kotlinAsyncTestTopic1"],
221252
containerFactory = "kafkaListenerContainerFactory")
222253
suspend fun listen(value: String, acknowledgment: Acknowledgment) {
@@ -247,6 +278,14 @@ class EnableKafkaKotlinCoroutinesTests {
247278
}
248279
}
249280

281+
@KafkaListener(id = "kotlin-common-handler", topics = ["kotlinAsyncTestTopicCommonHandler"],
282+
containerFactory = "kafkaListenerContainerFactoryWithCommonHandler")
283+
suspend fun listenWithCommonHandler(value: String) {
284+
if (value == "fail") {
285+
throw RuntimeException("Test exception for CommonErrorHandler")
286+
}
287+
}
288+
250289
}
251290

252291
}

0 commit comments

Comments
 (0)