Description
Describe the issue
Even if default-retryable
is set to false, the messages are retried. It is the same for retryable-exceptions
.
There also seems to be a connection with the transactions, as once the transaction-id-prefix
is empty, it works as intended.
To Reproduce
SpringBootApplication
@Configuration
@Slf4j
public class ConsumerConfig {
@Bean
public Consumer<String> consumeMessage() {
return s -> {
log.info("Consuming {}", s);
throw new IllegalArgumentException(s);
};
}
}
application.xml
cloud:
function:
definition: consumeMessage
stream:
kafka:
binder:
transaction:
transaction-id-prefix: transaction-
required-acks: all
configuration:
key.serializer: org.apache.kafka.common.serialization.StringSerializer
key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
bindings:
consumeMessage-in-0:
consumer:
enable-dlq: true
bindings:
consumeMessage-in-0:
group: my-group
destination: my-topic
consumer:
default-retryable: false
max-attempts: 5
back-off-initial-interval: 100
retryable-exceptions:
java.lang.UnsupportedOperationException: true
java.lang.IllegalArgumentException: false
There is a GitHub project with the minimal setup to reproduce the issue:
https://github.com/DidierLoiseau/kafka-transactions-and-retries/tree/main
When we submit a message on my-topic, it will retry it 5 times despite the default-retryable: false and java.lang.IllegalArgumentException: false.
We have asked a question on StackOverflow but (so far) have not gotten any answers: https://stackoverflow.com/questions/79309828/how-to-configure-retryable-exceptions-for-consumers-when-kafka-transactions-are
Version of the framework
Spring Boot 3.4.0 and Spring Cloud 2024.0.0
Additional context
Diving into the Spring Cloud Stream code, we found that KafkaMessageChannelBinder will set a RetryTemplate configured by buildRetryTemplate(properties) if there is no TransactionManager, but if there is one, it will configure an AfterRollbackProcessor instead, passing it only a BackOff without using the retryable exceptions configuration.