From 525a5c122006fa0924d526778776625e93a0c025 Mon Sep 17 00:00:00 2001 From: Soby Chacko Date: Thu, 13 Mar 2025 09:57:17 -0400 Subject: [PATCH 1/2] Update actions/upload-artifact@v4 in CI PR build Signed-off-by: Soby Chacko --- .github/workflows/ci-pr.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci-pr.yml b/.github/workflows/ci-pr.yml index 932bc89929..37d1a2291d 100644 --- a/.github/workflows/ci-pr.yml +++ b/.github/workflows/ci-pr.yml @@ -51,7 +51,7 @@ jobs: - name: Capture Test Results if: failure() - uses: actions/upload-artifact@v3 + uses: actions/upload-artifact@v4 with: name: test-results path: '*/target/surefire-reports/*.*' From de7ffd0874172eb81ee277715b410a11663c8311 Mon Sep 17 00:00:00 2001 From: Soby Chacko Date: Thu, 6 Mar 2025 17:24:16 -0500 Subject: [PATCH 2/2] GH-3090: Ensure client factory customizers are applied before transaction manager creation Fixes: #3090 Issue: https://github.com/spring-cloud/spring-cloud-stream/issues/3090 Fixes issue where KafkaBinderConfiguration would add customizers after the transaction manager was already created. The KafkaMessageChannelBinder was initializing the transaction manager in the constructor before client factory customizers were added, which meant the customizers were never applied to the producer factory used by the transaction manager. - Moves transaction manager initialization from the constructor to the onInit() method - Leverages the existing InitializingBean lifecycle to ensure customizers are applied before the transaction manager is created - Adds comprehensive test coverage to verify the fix in the binder and via auto-configuration Signed-off-by: Soby Chacko --- .../kafka/KafkaMessageChannelBinder.java | 29 +-- ...nderConfigurationWithTransactionsTest.java | 109 +++++++++++ .../KafkaBinderTransactionCustomizerTest.java | 184 ++++++++++++++++++ .../binder/kafka/KafkaTransactionTests.java | 6 +- 4 files changed, 313 insertions(+), 15 deletions(-) create mode 100644 binders/kafka-binder/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderConfigurationWithTransactionsTest.java create mode 100644 binders/kafka-binder/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderTransactionCustomizerTest.java diff --git a/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/KafkaMessageChannelBinder.java b/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/KafkaMessageChannelBinder.java index f4d4ee74ff..e5adfe5031 100644 --- a/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/KafkaMessageChannelBinder.java +++ b/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/KafkaMessageChannelBinder.java @@ -1,5 +1,5 @@ /* - * Copyright 2014-2024 the original author or authors. + * Copyright 2014-2025 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -222,9 +222,9 @@ public class KafkaMessageChannelBinder extends private final Map topicsInUse = new ConcurrentHashMap<>(); - private final KafkaTransactionManager transactionManager; + private KafkaTransactionManager transactionManager; - private final TransactionTemplate transactionTemplate; + private TransactionTemplate transactionTemplate; private KafkaBindingRebalanceListener rebalanceListener; @@ -278,23 +278,24 @@ public KafkaMessageChannelBinder( super(headersToMap(configurationProperties), provisioningProvider, containerCustomizer, sourceCustomizer); this.configurationProperties = configurationProperties; - String txId = configurationProperties.getTransaction().getTransactionIdPrefix(); - if (StringUtils.hasText(txId)) { - this.transactionManager = new KafkaTransactionManager<>(getProducerFactory( - txId, new ExtendedProducerProperties<>(configurationProperties - .getTransaction().getProducer().getExtension()), txId + ".producer", null)); - this.transactionTemplate = new TransactionTemplate(this.transactionManager); - } - else { - this.transactionManager = null; - this.transactionTemplate = null; - } this.rebalanceListener = rebalanceListener; this.dlqPartitionFunction = dlqPartitionFunction; this.dlqDestinationResolver = dlqDestinationResolver; this.kafkaAdmin = new KafkaAdmin(new HashMap<>(provisioningProvider.getAdminClientProperties())); } + @Override + protected void onInit() throws Exception { + super.onInit(); + String txId = this.configurationProperties.getTransaction().getTransactionIdPrefix(); + if (StringUtils.hasText(txId)) { + this.transactionManager = new KafkaTransactionManager<>(getProducerFactory( + txId, new ExtendedProducerProperties<>(configurationProperties + .getTransaction().getProducer().getExtension()), txId + ".producer", null)); + this.transactionTemplate = new TransactionTemplate(this.transactionManager); + } + } + private static String[] headersToMap( KafkaBinderConfigurationProperties configurationProperties) { String[] headersToMap; diff --git a/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderConfigurationWithTransactionsTest.java b/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderConfigurationWithTransactionsTest.java new file mode 100644 index 0000000000..c06b8a1ba2 --- /dev/null +++ b/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderConfigurationWithTransactionsTest.java @@ -0,0 +1,109 @@ +/* + * Copyright 2025-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.stream.binder.kafka; + +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import org.junit.jupiter.api.Test; + +import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.context.runner.ApplicationContextRunner; +import org.springframework.cloud.stream.binder.kafka.config.ClientFactoryCustomizer; +import org.springframework.cloud.stream.binder.kafka.config.KafkaBinderConfiguration; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.core.ProducerFactory; +import org.springframework.kafka.transaction.KafkaTransactionManager; +import org.springframework.util.ReflectionUtils; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * @author Soby Chacko + */ +@SpringBootTest(classes = { KafkaBinderConfiguration.class }) +public class KafkaBinderConfigurationWithTransactionsTest { + + private final ApplicationContextRunner contextRunner = new ApplicationContextRunner() + .withUserConfiguration(KafkaBinderConfiguration.class, KafkaAutoConfiguration.class) + .withPropertyValues( + "spring.cloud.stream.kafka.binder.transaction.transaction-id-prefix=test-tx-", + "spring.kafka.bootstrap-servers=localhost:9092"); + + @Test + public void clientFactoryCustomizersAppliedToTransactionManager() { + contextRunner.withUserConfiguration(TransactionClientFactoryCustomizerConfig.class) + .run(context -> { + assertThat(context).hasSingleBean(KafkaMessageChannelBinder.class); + KafkaMessageChannelBinder kafkaMessageChannelBinder = + context.getBean(KafkaMessageChannelBinder.class); + + Map customizers = + context.getBeansOfType(ClientFactoryCustomizer.class); + assertThat(customizers).hasSize(1); + + Field transactionManagerField = ReflectionUtils.findField( + KafkaMessageChannelBinder.class, "transactionManager", + KafkaTransactionManager.class); + assertThat(transactionManagerField).isNotNull(); + ReflectionUtils.makeAccessible(transactionManagerField); + KafkaTransactionManager transactionManager = + (KafkaTransactionManager) ReflectionUtils.getField( + transactionManagerField, kafkaMessageChannelBinder); + + assertThat(transactionManager).isNotNull(); + + ProducerFactory producerFactory = transactionManager.getProducerFactory(); + assertThat(producerFactory).isNotNull(); + + // Verify customizer was applied - check if our flag was set + TransactionClientFactoryCustomizerConfig config = + context.getBean(TransactionClientFactoryCustomizerConfig.class); + assertThat(config.wasCustomizerApplied()).isTrue(); + assertThat(config.getCustomizedFactories()).contains(producerFactory); + }); + } + + @Configuration + static class TransactionClientFactoryCustomizerConfig { + private final List> customizedFactories = new ArrayList<>(); + private boolean customizerApplied = false; + + @Bean + ClientFactoryCustomizer testClientFactoryCustomizer() { + return new ClientFactoryCustomizer() { + @Override + public void configure(ProducerFactory pf) { + customizerApplied = true; + customizedFactories.add(pf); + } + }; + } + + public boolean wasCustomizerApplied() { + return customizerApplied; + } + + public List> getCustomizedFactories() { + return customizedFactories; + } + } +} diff --git a/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderTransactionCustomizerTest.java b/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderTransactionCustomizerTest.java new file mode 100644 index 0000000000..e99e688299 --- /dev/null +++ b/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderTransactionCustomizerTest.java @@ -0,0 +1,184 @@ +/* + * Copyright 2025-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.stream.binder.kafka; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +import org.springframework.beans.factory.ObjectProvider; +import org.springframework.boot.autoconfigure.kafka.KafkaProperties; +import org.springframework.cloud.stream.binder.ExtendedProducerProperties; +import org.springframework.cloud.stream.binder.TestUtils; +import org.springframework.cloud.stream.binder.kafka.config.ClientFactoryCustomizer; +import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties; +import org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner; +import org.springframework.context.support.GenericApplicationContext; +import org.springframework.kafka.core.ProducerFactory; +import org.springframework.kafka.test.EmbeddedKafkaBroker; +import org.springframework.kafka.test.condition.EmbeddedKafkaCondition; +import org.springframework.kafka.test.context.EmbeddedKafka; +import org.springframework.kafka.transaction.KafkaTransactionManager; +import org.springframework.retry.support.RetryTemplate; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.ArgumentMatchers.isNull; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; + +/** + * @author Soby Chacko + */ +@EmbeddedKafka(count = 1, controlledShutdown = true, brokerProperties = {"transaction.state.log.replication.factor=1", + "transaction.state.log.min.isr=1"}) +class KafkaBinderTransactionCustomizerTest { + + private static EmbeddedKafkaBroker embeddedKafka; + + @BeforeAll + public static void setup() { + embeddedKafka = EmbeddedKafkaCondition.getBroker(); + } + + @SuppressWarnings("unchecked") + @Test + void clientFactoryCustomizerAppliedBeforeTransactionManager() throws Exception { + KafkaProperties kafkaProperties = new TestKafkaProperties(); + kafkaProperties.setBootstrapServers(Collections + .singletonList(embeddedKafka.getBrokersAsString())); + + KafkaBinderConfigurationProperties configurationProperties = new KafkaBinderConfigurationProperties( + kafkaProperties, mock(ObjectProvider.class)); + configurationProperties.getTransaction().setTransactionIdPrefix("custom-tx-"); + + KafkaTopicProvisioner provisioningProvider = new KafkaTopicProvisioner( + configurationProperties, kafkaProperties, prop -> { + }); + provisioningProvider.setMetadataRetryOperations(new RetryTemplate()); + + // Create a tracking list for customized factories + List> customizedFactories = new ArrayList<>(); + + // Create a customizer that we'll register after the binder is created + ClientFactoryCustomizer customizer = new ClientFactoryCustomizer() { + @Override + public void configure(ProducerFactory pf) { + customizedFactories.add(pf); + } + }; + + KafkaMessageChannelBinder binder = spy(new KafkaMessageChannelBinder( + configurationProperties, provisioningProvider)); + + GenericApplicationContext applicationContext = new GenericApplicationContext(); + applicationContext.refresh(); + binder.setApplicationContext(applicationContext); + + // Add the customizer AFTER binder creation but BEFORE afterPropertiesSet + binder.addClientFactoryCustomizer(customizer); + + // Now initialize the binder (this triggers onInit) + binder.afterPropertiesSet(); + + // Verify KafkaMessageChannelBinder.getProducerFactory was called from onInit + verify(binder).getProducerFactory( + eq("custom-tx-"), + any(ExtendedProducerProperties.class), + eq("custom-tx-.producer"), + isNull()); + + // Verify customizer was applied + assertThat(customizedFactories).isNotEmpty(); + + // Verify that the producer factory from the transaction manager is in our list of customized factories + KafkaTransactionManager txManager = (KafkaTransactionManager) + TestUtils.getPropertyValue(binder, "transactionManager"); + assertThat(txManager).isNotNull(); + ProducerFactory producerFactory = txManager.getProducerFactory(); + // This verifies that the same producer factory that was customized is used for the transaction manager + assertThat(customizedFactories).contains(producerFactory); + } + + @SuppressWarnings("unchecked") + @Test + void multipleCustomizersAppliedInOrder() throws Exception { + KafkaProperties kafkaProperties = new TestKafkaProperties(); + kafkaProperties.setBootstrapServers(Collections + .singletonList(embeddedKafka.getBrokersAsString())); + + KafkaBinderConfigurationProperties configurationProperties = new KafkaBinderConfigurationProperties( + kafkaProperties, mock(ObjectProvider.class)); + configurationProperties.getTransaction().setTransactionIdPrefix("multi-tx-"); + + KafkaTopicProvisioner provisioningProvider = new KafkaTopicProvisioner( + configurationProperties, kafkaProperties, prop -> { + }); + provisioningProvider.setMetadataRetryOperations(new RetryTemplate()); + + // Track order of customizers and customized factories + List customizationOrder = new ArrayList<>(); + List> customizedFactories = new ArrayList<>(); + + KafkaMessageChannelBinder binder = spy(new KafkaMessageChannelBinder( + configurationProperties, provisioningProvider)); + + GenericApplicationContext applicationContext = new GenericApplicationContext(); + applicationContext.refresh(); + binder.setApplicationContext(applicationContext); + + // Add multiple customizers + binder.addClientFactoryCustomizer(new ClientFactoryCustomizer() { + @Override + public void configure(ProducerFactory pf) { + customizationOrder.add("customizer1"); + customizedFactories.add(pf); + } + }); + + binder.addClientFactoryCustomizer(new ClientFactoryCustomizer() { + @Override + public void configure(ProducerFactory pf) { + customizationOrder.add("customizer2"); + } + }); + + binder.addClientFactoryCustomizer(new ClientFactoryCustomizer() { + @Override + public void configure(ProducerFactory pf) { + customizationOrder.add("customizer3"); + } + }); + + binder.afterPropertiesSet(); + + assertThat(customizationOrder).containsExactly("customizer1", "customizer2", "customizer3"); + + KafkaTransactionManager txManager = (KafkaTransactionManager) + TestUtils.getPropertyValue(binder, "transactionManager"); + assertThat(txManager).isNotNull(); + ProducerFactory producerFactory = txManager.getProducerFactory(); + // Verify that the producer factory used in transaction manager is one that was customized + assertThat(customizedFactories).contains(producerFactory); + } + +} diff --git a/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaTransactionTests.java b/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaTransactionTests.java index 25a65a10b8..1c018396f3 100644 --- a/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaTransactionTests.java +++ b/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaTransactionTests.java @@ -71,7 +71,7 @@ public static void setup() { @SuppressWarnings({ "rawtypes", "unchecked" }) @Test - void producerRunsInTx() { + void producerRunsInTx() throws Exception { KafkaProperties kafkaProperties = new TestKafkaProperties(); kafkaProperties.setBootstrapServers(Collections .singletonList(embeddedKafka.getBrokersAsString())); @@ -111,6 +111,10 @@ protected DefaultKafkaProducerFactory getProducerFactory( GenericApplicationContext applicationContext = new GenericApplicationContext(); applicationContext.refresh(); binder.setApplicationContext(applicationContext); + + // Important: Initialize the binder to trigger onInit() + binder.afterPropertiesSet(); + DirectChannel channel = new DirectChannel(); KafkaProducerProperties extension = new KafkaProducerProperties(); ExtendedProducerProperties properties = new ExtendedProducerProperties<>(