Skip to content

Commit 70e614a

Browse files
committed
GH-3618: Support CommonErrorHandler for async listeners
1 parent 5fa5630 commit 70e614a

File tree

3 files changed

+11
-16
lines changed

3 files changed

+11
-16
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 4 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,7 @@
178178
* @author Su Ko
179179
* @author Jinhui Kim
180180
* @author Minchul Son
181+
* @author Youngjoo Kim
181182
*/
182183
public class KafkaMessageListenerContainer<K, V> // NOSONAR line count
183184
extends AbstractMessageListenerContainer<K, V> implements ConsumerPauseResumeEventPublisher {
@@ -935,10 +936,12 @@ else if (listener instanceof MessageListener) {
935936
KafkaBackoffAwareMessageListenerAdapter<K, V> genListener =
936937
(KafkaBackoffAwareMessageListenerAdapter<K, V>) this.genericListener;
937938
if (genListener.getDelegate() instanceof RecordMessagingMessageListenerAdapter<K, V> adapterListener) {
938-
// This means that the async retry feature is supported only for SingleRecordListener with @RetryableTopic.
939939
adapterListener.setCallbackForAsyncFailure(this::callbackForAsyncFailure);
940940
}
941941
}
942+
else if (this.listener instanceof RecordMessagingMessageListenerAdapter<K, V> adapter) {
943+
adapter.setCallbackForAsyncFailure(this::callbackForAsyncFailure);
944+
}
942945
}
943946
else {
944947
throw new IllegalArgumentException("Listener must be one of 'MessageListener', "
@@ -949,16 +952,6 @@ else if (listener instanceof MessageListener) {
949952
this.isConsumerAwareListener = listenerType.equals(ListenerType.ACKNOWLEDGING_CONSUMER_AWARE)
950953
|| listenerType.equals(ListenerType.CONSUMER_AWARE);
951954
this.commonErrorHandler = determineCommonErrorHandler();
952-
// Setup async failure callback for suspend functions when CommonErrorHandler is explicitly configured
953-
if (getCommonErrorHandler() != null && this.listener != null) {
954-
MessageListener<?, ?> target = unwrapDelegateIfAny(this.listener);
955-
if (target instanceof RecordMessagingMessageListenerAdapter<?, ?>) {
956-
@SuppressWarnings("unchecked")
957-
RecordMessagingMessageListenerAdapter<K, V> adapter =
958-
(RecordMessagingMessageListenerAdapter<K, V>) target;
959-
adapter.setCallbackForAsyncFailure(this::callbackForAsyncFailure);
960-
}
961-
}
962955
Assert.state(!this.isBatchListener || !this.isRecordAck,
963956
"Cannot use AckMode.RECORD with a batch listener");
964957
if (this.containerProperties.getScheduler() != null) {

spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@
8585
import org.springframework.kafka.core.KafkaTemplate;
8686
import org.springframework.kafka.core.ProducerFactory;
8787
import org.springframework.kafka.listener.ContainerProperties;
88+
import org.springframework.kafka.listener.DefaultErrorHandler;
8889
import org.springframework.kafka.listener.MessageListenerContainer;
8990
import org.springframework.kafka.listener.RecordInterceptor;
9091
import org.springframework.kafka.requestreply.ReplyingKafkaTemplate;
@@ -100,6 +101,7 @@
100101
import org.springframework.test.annotation.DirtiesContext;
101102
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
102103
import org.springframework.util.StringUtils;
104+
import org.springframework.util.backoff.FixedBackOff;
103105

104106
import static org.assertj.core.api.Assertions.assertThat;
105107
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
@@ -114,6 +116,7 @@
114116
* @author Soby Chacko
115117
* @author Francois Rosiere
116118
* @author Christian Fredriksson
119+
* @author Youngjoo Kim
117120
*
118121
* @since 3.0
119122
*/
@@ -694,6 +697,9 @@ ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerF
694697
container.getContainerProperties().setAsyncAcks(true);
695698
container.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
696699
}
700+
if (container.getListenerId().equals("obs6") || container.getListenerId().equals("obs7")) {
701+
container.setCommonErrorHandler(new DefaultErrorHandler(new FixedBackOff(0L, 0)));
702+
}
697703
if (container.getListenerId().equals("obs4")) {
698704
container.setRecordInterceptor(new RecordInterceptor<>() {
699705

spring-kafka/src/test/kotlin/org/springframework/kafka/listener/EnableKafkaKotlinCoroutinesTests.kt

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ import java.util.concurrent.TimeUnit
5454
*
5555
* @author Wang ZhiYang
5656
* @author Artem Bilan
57+
* @author Youngjoo Kim
5758
*
5859
* @since 3.1
5960
*/
@@ -114,7 +115,6 @@ class EnableKafkaKotlinCoroutinesTests {
114115
fun `test suspend function with CommonErrorHandler`() {
115116
this.template.send("kotlinAsyncTestTopicCommonHandler", "fail")
116117
assertThat(this.config.commonHandlerLatch.await(10, TimeUnit.SECONDS)).isTrue()
117-
assertThat(this.config.commonHandlerInvoked).isTrue()
118118
}
119119

120120
@KafkaListener(id = "sendTopic", topics = ["kotlinAsyncTestTopic3"],
@@ -147,9 +147,6 @@ class EnableKafkaKotlinCoroutinesTests {
147147
@Volatile
148148
var batchError: Boolean = false
149149

150-
@Volatile
151-
var commonHandlerInvoked: Boolean = false
152-
153150
val latch1 = CountDownLatch(1)
154151

155152
val latch2 = CountDownLatch(1)
@@ -234,7 +231,6 @@ class EnableKafkaKotlinCoroutinesTests {
234231
@Bean
235232
fun commonErrorHandler(): DefaultErrorHandler {
236233
return DefaultErrorHandler { record, exception ->
237-
commonHandlerInvoked = true
238234
commonHandlerLatch.countDown()
239235
}
240236
}

0 commit comments

Comments
 (0)