diff --git a/.github/workflows/ci-pr.yml b/.github/workflows/ci-pr.yml index 932bc89929..37d1a2291d 100644 --- a/.github/workflows/ci-pr.yml +++ b/.github/workflows/ci-pr.yml @@ -51,7 +51,7 @@ jobs: - name: Capture Test Results if: failure() - uses: actions/upload-artifact@v3 + uses: actions/upload-artifact@v4 with: name: test-results path: '*/target/surefire-reports/*.*' diff --git a/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/KafkaMessageChannelBinder.java b/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/KafkaMessageChannelBinder.java index f4d4ee74ff..e5adfe5031 100644 --- a/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/KafkaMessageChannelBinder.java +++ b/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/KafkaMessageChannelBinder.java @@ -1,5 +1,5 @@ /* - * Copyright 2014-2024 the original author or authors. + * Copyright 2014-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -222,9 +222,9 @@ public class KafkaMessageChannelBinder extends private final Map topicsInUse = new ConcurrentHashMap<>(); - private final KafkaTransactionManager transactionManager; + private KafkaTransactionManager transactionManager; - private final TransactionTemplate transactionTemplate; + private TransactionTemplate transactionTemplate; private KafkaBindingRebalanceListener rebalanceListener; @@ -278,23 +278,24 @@ public KafkaMessageChannelBinder( super(headersToMap(configurationProperties), provisioningProvider, containerCustomizer, sourceCustomizer); this.configurationProperties = configurationProperties; - String txId = configurationProperties.getTransaction().getTransactionIdPrefix(); - if (StringUtils.hasText(txId)) { - this.transactionManager = new KafkaTransactionManager<>(getProducerFactory( - txId, new ExtendedProducerProperties<>(configurationProperties - .getTransaction().getProducer().getExtension()), txId + ".producer", null)); - this.transactionTemplate = new TransactionTemplate(this.transactionManager); - } - else { - this.transactionManager = null; - this.transactionTemplate = null; - } this.rebalanceListener = rebalanceListener; this.dlqPartitionFunction = dlqPartitionFunction; this.dlqDestinationResolver = dlqDestinationResolver; this.kafkaAdmin = new KafkaAdmin(new HashMap<>(provisioningProvider.getAdminClientProperties())); } + @Override + protected void onInit() throws Exception { + super.onInit(); + String txId = this.configurationProperties.getTransaction().getTransactionIdPrefix(); + if (StringUtils.hasText(txId)) { + this.transactionManager = new KafkaTransactionManager<>(getProducerFactory( + txId, new ExtendedProducerProperties<>(configurationProperties + .getTransaction().getProducer().getExtension()), txId + ".producer", null)); + this.transactionTemplate = new TransactionTemplate(this.transactionManager); + } + } + private static String[] headersToMap( KafkaBinderConfigurationProperties configurationProperties) { String[] headersToMap; diff --git a/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderConfigurationWithTransactionsTest.java b/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderConfigurationWithTransactionsTest.java new file mode 100644 index 0000000000..c06b8a1ba2 --- /dev/null +++ b/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderConfigurationWithTransactionsTest.java @@ -0,0 +1,109 @@ +/* + * Copyright 2025-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.stream.binder.kafka; + +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import org.junit.jupiter.api.Test; + +import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.context.runner.ApplicationContextRunner; +import org.springframework.cloud.stream.binder.kafka.config.ClientFactoryCustomizer; +import org.springframework.cloud.stream.binder.kafka.config.KafkaBinderConfiguration; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.core.ProducerFactory; +import org.springframework.kafka.transaction.KafkaTransactionManager; +import org.springframework.util.ReflectionUtils; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * @author Soby Chacko + */ +@SpringBootTest(classes = { KafkaBinderConfiguration.class }) +public class KafkaBinderConfigurationWithTransactionsTest { + + private final ApplicationContextRunner contextRunner = new ApplicationContextRunner() + .withUserConfiguration(KafkaBinderConfiguration.class, KafkaAutoConfiguration.class) + .withPropertyValues( + "spring.cloud.stream.kafka.binder.transaction.transaction-id-prefix=test-tx-", + "spring.kafka.bootstrap-servers=localhost:9092"); + + @Test + public void clientFactoryCustomizersAppliedToTransactionManager() { + contextRunner.withUserConfiguration(TransactionClientFactoryCustomizerConfig.class) + .run(context -> { + assertThat(context).hasSingleBean(KafkaMessageChannelBinder.class); + KafkaMessageChannelBinder kafkaMessageChannelBinder = + context.getBean(KafkaMessageChannelBinder.class); + + Map customizers = + context.getBeansOfType(ClientFactoryCustomizer.class); + assertThat(customizers).hasSize(1); + + Field transactionManagerField = ReflectionUtils.findField( + KafkaMessageChannelBinder.class, "transactionManager", + KafkaTransactionManager.class); + assertThat(transactionManagerField).isNotNull(); + ReflectionUtils.makeAccessible(transactionManagerField); + KafkaTransactionManager transactionManager = + (KafkaTransactionManager) ReflectionUtils.getField( + transactionManagerField, kafkaMessageChannelBinder); + + assertThat(transactionManager).isNotNull(); + + ProducerFactory producerFactory = transactionManager.getProducerFactory(); + assertThat(producerFactory).isNotNull(); + + // Verify customizer was applied - check if our flag was set + TransactionClientFactoryCustomizerConfig config = + context.getBean(TransactionClientFactoryCustomizerConfig.class); + assertThat(config.wasCustomizerApplied()).isTrue(); + assertThat(config.getCustomizedFactories()).contains(producerFactory); + }); + } + + @Configuration + static class TransactionClientFactoryCustomizerConfig { + private final List> customizedFactories = new ArrayList<>(); + private boolean customizerApplied = false; + + @Bean + ClientFactoryCustomizer testClientFactoryCustomizer() { + return new ClientFactoryCustomizer() { + @Override + public void configure(ProducerFactory pf) { + customizerApplied = true; + customizedFactories.add(pf); + } + }; + } + + public boolean wasCustomizerApplied() { + return customizerApplied; + } + + public List> getCustomizedFactories() { + return customizedFactories; + } + } +} diff --git a/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderTransactionCustomizerTest.java b/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderTransactionCustomizerTest.java new file mode 100644 index 0000000000..e99e688299 --- /dev/null +++ b/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderTransactionCustomizerTest.java @@ -0,0 +1,184 @@ +/* + * Copyright 2025-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.stream.binder.kafka; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +import org.springframework.beans.factory.ObjectProvider; +import org.springframework.boot.autoconfigure.kafka.KafkaProperties; +import org.springframework.cloud.stream.binder.ExtendedProducerProperties; +import org.springframework.cloud.stream.binder.TestUtils; +import org.springframework.cloud.stream.binder.kafka.config.ClientFactoryCustomizer; +import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties; +import org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner; +import org.springframework.context.support.GenericApplicationContext; +import org.springframework.kafka.core.ProducerFactory; +import org.springframework.kafka.test.EmbeddedKafkaBroker; +import org.springframework.kafka.test.condition.EmbeddedKafkaCondition; +import org.springframework.kafka.test.context.EmbeddedKafka; +import org.springframework.kafka.transaction.KafkaTransactionManager; +import org.springframework.retry.support.RetryTemplate; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.ArgumentMatchers.isNull; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; + +/** + * @author Soby Chacko + */ +@EmbeddedKafka(count = 1, controlledShutdown = true, brokerProperties = {"transaction.state.log.replication.factor=1", + "transaction.state.log.min.isr=1"}) +class KafkaBinderTransactionCustomizerTest { + + private static EmbeddedKafkaBroker embeddedKafka; + + @BeforeAll + public static void setup() { + embeddedKafka = EmbeddedKafkaCondition.getBroker(); + } + + @SuppressWarnings("unchecked") + @Test + void clientFactoryCustomizerAppliedBeforeTransactionManager() throws Exception { + KafkaProperties kafkaProperties = new TestKafkaProperties(); + kafkaProperties.setBootstrapServers(Collections + .singletonList(embeddedKafka.getBrokersAsString())); + + KafkaBinderConfigurationProperties configurationProperties = new KafkaBinderConfigurationProperties( + kafkaProperties, mock(ObjectProvider.class)); + configurationProperties.getTransaction().setTransactionIdPrefix("custom-tx-"); + + KafkaTopicProvisioner provisioningProvider = new KafkaTopicProvisioner( + configurationProperties, kafkaProperties, prop -> { + }); + provisioningProvider.setMetadataRetryOperations(new RetryTemplate()); + + // Create a tracking list for customized factories + List> customizedFactories = new ArrayList<>(); + + // Create a customizer that we'll register after the binder is created + ClientFactoryCustomizer customizer = new ClientFactoryCustomizer() { + @Override + public void configure(ProducerFactory pf) { + customizedFactories.add(pf); + } + }; + + KafkaMessageChannelBinder binder = spy(new KafkaMessageChannelBinder( + configurationProperties, provisioningProvider)); + + GenericApplicationContext applicationContext = new GenericApplicationContext(); + applicationContext.refresh(); + binder.setApplicationContext(applicationContext); + + // Add the customizer AFTER binder creation but BEFORE afterPropertiesSet + binder.addClientFactoryCustomizer(customizer); + + // Now initialize the binder (this triggers onInit) + binder.afterPropertiesSet(); + + // Verify KafkaMessageChannelBinder.getProducerFactory was called from onInit + verify(binder).getProducerFactory( + eq("custom-tx-"), + any(ExtendedProducerProperties.class), + eq("custom-tx-.producer"), + isNull()); + + // Verify customizer was applied + assertThat(customizedFactories).isNotEmpty(); + + // Verify that the producer factory from the transaction manager is in our list of customized factories + KafkaTransactionManager txManager = (KafkaTransactionManager) + TestUtils.getPropertyValue(binder, "transactionManager"); + assertThat(txManager).isNotNull(); + ProducerFactory producerFactory = txManager.getProducerFactory(); + // This verifies that the same producer factory that was customized is used for the transaction manager + assertThat(customizedFactories).contains(producerFactory); + } + + @SuppressWarnings("unchecked") + @Test + void multipleCustomizersAppliedInOrder() throws Exception { + KafkaProperties kafkaProperties = new TestKafkaProperties(); + kafkaProperties.setBootstrapServers(Collections + .singletonList(embeddedKafka.getBrokersAsString())); + + KafkaBinderConfigurationProperties configurationProperties = new KafkaBinderConfigurationProperties( + kafkaProperties, mock(ObjectProvider.class)); + configurationProperties.getTransaction().setTransactionIdPrefix("multi-tx-"); + + KafkaTopicProvisioner provisioningProvider = new KafkaTopicProvisioner( + configurationProperties, kafkaProperties, prop -> { + }); + provisioningProvider.setMetadataRetryOperations(new RetryTemplate()); + + // Track order of customizers and customized factories + List customizationOrder = new ArrayList<>(); + List> customizedFactories = new ArrayList<>(); + + KafkaMessageChannelBinder binder = spy(new KafkaMessageChannelBinder( + configurationProperties, provisioningProvider)); + + GenericApplicationContext applicationContext = new GenericApplicationContext(); + applicationContext.refresh(); + binder.setApplicationContext(applicationContext); + + // Add multiple customizers + binder.addClientFactoryCustomizer(new ClientFactoryCustomizer() { + @Override + public void configure(ProducerFactory pf) { + customizationOrder.add("customizer1"); + customizedFactories.add(pf); + } + }); + + binder.addClientFactoryCustomizer(new ClientFactoryCustomizer() { + @Override + public void configure(ProducerFactory pf) { + customizationOrder.add("customizer2"); + } + }); + + binder.addClientFactoryCustomizer(new ClientFactoryCustomizer() { + @Override + public void configure(ProducerFactory pf) { + customizationOrder.add("customizer3"); + } + }); + + binder.afterPropertiesSet(); + + assertThat(customizationOrder).containsExactly("customizer1", "customizer2", "customizer3"); + + KafkaTransactionManager txManager = (KafkaTransactionManager) + TestUtils.getPropertyValue(binder, "transactionManager"); + assertThat(txManager).isNotNull(); + ProducerFactory producerFactory = txManager.getProducerFactory(); + // Verify that the producer factory used in transaction manager is one that was customized + assertThat(customizedFactories).contains(producerFactory); + } + +} diff --git a/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaTransactionTests.java b/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaTransactionTests.java index 25a65a10b8..1c018396f3 100644 --- a/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaTransactionTests.java +++ b/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaTransactionTests.java @@ -71,7 +71,7 @@ public static void setup() { @SuppressWarnings({ "rawtypes", "unchecked" }) @Test - void producerRunsInTx() { + void producerRunsInTx() throws Exception { KafkaProperties kafkaProperties = new TestKafkaProperties(); kafkaProperties.setBootstrapServers(Collections .singletonList(embeddedKafka.getBrokersAsString())); @@ -111,6 +111,10 @@ protected DefaultKafkaProducerFactory getProducerFactory( GenericApplicationContext applicationContext = new GenericApplicationContext(); applicationContext.refresh(); binder.setApplicationContext(applicationContext); + + // Important: Initialize the binder to trigger onInit() + binder.afterPropertiesSet(); + DirectChannel channel = new DirectChannel(); KafkaProducerProperties extension = new KafkaProducerProperties(); ExtendedProducerProperties properties = new ExtendedProducerProperties<>(