Skip to content

Commit d6cc07d

Browse files
sobychackoolegz
authored andcommitted
GH-3090: Ensure client factory customizers are applied before transaction manager creation
Fixes: #3090 Issue: #3090 Fixes issue where KafkaBinderConfiguration would add customizers after the transaction manager was already created. The KafkaMessageChannelBinder was initializing the transaction manager in the constructor before client factory customizers were added, which meant the customizers were never applied to the producer factory used by the transaction manager. - Moves transaction manager initialization from the constructor to the onInit() method - Leverages the existing InitializingBean lifecycle to ensure customizers are applied before the transaction manager is created - Adds comprehensive test coverage to verify the fix in the binder and via auto-configuration Signed-off-by: Soby Chacko <[email protected]> Resolves #3091
1 parent 525a5c1 commit d6cc07d

File tree

4 files changed

+313
-15
lines changed

4 files changed

+313
-15
lines changed

binders/kafka-binder/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/KafkaMessageChannelBinder.java

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2014-2024 the original author or authors.
2+
* Copyright 2014-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -222,9 +222,9 @@ public class KafkaMessageChannelBinder extends
222222

223223
private final Map<String, TopicInformation> topicsInUse = new ConcurrentHashMap<>();
224224

225-
private final KafkaTransactionManager<byte[], byte[]> transactionManager;
225+
private KafkaTransactionManager<byte[], byte[]> transactionManager;
226226

227-
private final TransactionTemplate transactionTemplate;
227+
private TransactionTemplate transactionTemplate;
228228

229229
private KafkaBindingRebalanceListener rebalanceListener;
230230

@@ -278,23 +278,24 @@ public KafkaMessageChannelBinder(
278278
super(headersToMap(configurationProperties), provisioningProvider,
279279
containerCustomizer, sourceCustomizer);
280280
this.configurationProperties = configurationProperties;
281-
String txId = configurationProperties.getTransaction().getTransactionIdPrefix();
282-
if (StringUtils.hasText(txId)) {
283-
this.transactionManager = new KafkaTransactionManager<>(getProducerFactory(
284-
txId, new ExtendedProducerProperties<>(configurationProperties
285-
.getTransaction().getProducer().getExtension()), txId + ".producer", null));
286-
this.transactionTemplate = new TransactionTemplate(this.transactionManager);
287-
}
288-
else {
289-
this.transactionManager = null;
290-
this.transactionTemplate = null;
291-
}
292281
this.rebalanceListener = rebalanceListener;
293282
this.dlqPartitionFunction = dlqPartitionFunction;
294283
this.dlqDestinationResolver = dlqDestinationResolver;
295284
this.kafkaAdmin = new KafkaAdmin(new HashMap<>(provisioningProvider.getAdminClientProperties()));
296285
}
297286

287+
@Override
288+
protected void onInit() throws Exception {
289+
super.onInit();
290+
String txId = this.configurationProperties.getTransaction().getTransactionIdPrefix();
291+
if (StringUtils.hasText(txId)) {
292+
this.transactionManager = new KafkaTransactionManager<>(getProducerFactory(
293+
txId, new ExtendedProducerProperties<>(configurationProperties
294+
.getTransaction().getProducer().getExtension()), txId + ".producer", null));
295+
this.transactionTemplate = new TransactionTemplate(this.transactionManager);
296+
}
297+
}
298+
298299
private static String[] headersToMap(
299300
KafkaBinderConfigurationProperties configurationProperties) {
300301
String[] headersToMap;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
/*
2+
* Copyright 2025-2025 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.cloud.stream.binder.kafka;
18+
19+
import java.lang.reflect.Field;
20+
import java.util.ArrayList;
21+
import java.util.List;
22+
import java.util.Map;
23+
24+
import org.junit.jupiter.api.Test;
25+
26+
import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;
27+
import org.springframework.boot.test.context.SpringBootTest;
28+
import org.springframework.boot.test.context.runner.ApplicationContextRunner;
29+
import org.springframework.cloud.stream.binder.kafka.config.ClientFactoryCustomizer;
30+
import org.springframework.cloud.stream.binder.kafka.config.KafkaBinderConfiguration;
31+
import org.springframework.context.annotation.Bean;
32+
import org.springframework.context.annotation.Configuration;
33+
import org.springframework.kafka.core.ProducerFactory;
34+
import org.springframework.kafka.transaction.KafkaTransactionManager;
35+
import org.springframework.util.ReflectionUtils;
36+
37+
import static org.assertj.core.api.Assertions.assertThat;
38+
39+
/**
40+
* @author Soby Chacko
41+
*/
42+
@SpringBootTest(classes = { KafkaBinderConfiguration.class })
43+
public class KafkaBinderConfigurationWithTransactionsTest {
44+
45+
private final ApplicationContextRunner contextRunner = new ApplicationContextRunner()
46+
.withUserConfiguration(KafkaBinderConfiguration.class, KafkaAutoConfiguration.class)
47+
.withPropertyValues(
48+
"spring.cloud.stream.kafka.binder.transaction.transaction-id-prefix=test-tx-",
49+
"spring.kafka.bootstrap-servers=localhost:9092");
50+
51+
@Test
52+
public void clientFactoryCustomizersAppliedToTransactionManager() {
53+
contextRunner.withUserConfiguration(TransactionClientFactoryCustomizerConfig.class)
54+
.run(context -> {
55+
assertThat(context).hasSingleBean(KafkaMessageChannelBinder.class);
56+
KafkaMessageChannelBinder kafkaMessageChannelBinder =
57+
context.getBean(KafkaMessageChannelBinder.class);
58+
59+
Map<String, ClientFactoryCustomizer> customizers =
60+
context.getBeansOfType(ClientFactoryCustomizer.class);
61+
assertThat(customizers).hasSize(1);
62+
63+
Field transactionManagerField = ReflectionUtils.findField(
64+
KafkaMessageChannelBinder.class, "transactionManager",
65+
KafkaTransactionManager.class);
66+
assertThat(transactionManagerField).isNotNull();
67+
ReflectionUtils.makeAccessible(transactionManagerField);
68+
KafkaTransactionManager<?, ?> transactionManager =
69+
(KafkaTransactionManager<?, ?>) ReflectionUtils.getField(
70+
transactionManagerField, kafkaMessageChannelBinder);
71+
72+
assertThat(transactionManager).isNotNull();
73+
74+
ProducerFactory<?, ?> producerFactory = transactionManager.getProducerFactory();
75+
assertThat(producerFactory).isNotNull();
76+
77+
// Verify customizer was applied - check if our flag was set
78+
TransactionClientFactoryCustomizerConfig config =
79+
context.getBean(TransactionClientFactoryCustomizerConfig.class);
80+
assertThat(config.wasCustomizerApplied()).isTrue();
81+
assertThat(config.getCustomizedFactories()).contains(producerFactory);
82+
});
83+
}
84+
85+
@Configuration
86+
static class TransactionClientFactoryCustomizerConfig {
87+
private final List<ProducerFactory<?, ?>> customizedFactories = new ArrayList<>();
88+
private boolean customizerApplied = false;
89+
90+
@Bean
91+
ClientFactoryCustomizer testClientFactoryCustomizer() {
92+
return new ClientFactoryCustomizer() {
93+
@Override
94+
public void configure(ProducerFactory<?, ?> pf) {
95+
customizerApplied = true;
96+
customizedFactories.add(pf);
97+
}
98+
};
99+
}
100+
101+
public boolean wasCustomizerApplied() {
102+
return customizerApplied;
103+
}
104+
105+
public List<ProducerFactory<?, ?>> getCustomizedFactories() {
106+
return customizedFactories;
107+
}
108+
}
109+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,184 @@
1+
/*
2+
* Copyright 2025-2025 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.cloud.stream.binder.kafka;
18+
19+
import java.util.ArrayList;
20+
import java.util.Collections;
21+
import java.util.List;
22+
23+
import org.junit.jupiter.api.BeforeAll;
24+
import org.junit.jupiter.api.Test;
25+
26+
import org.springframework.beans.factory.ObjectProvider;
27+
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
28+
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
29+
import org.springframework.cloud.stream.binder.TestUtils;
30+
import org.springframework.cloud.stream.binder.kafka.config.ClientFactoryCustomizer;
31+
import org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties;
32+
import org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner;
33+
import org.springframework.context.support.GenericApplicationContext;
34+
import org.springframework.kafka.core.ProducerFactory;
35+
import org.springframework.kafka.test.EmbeddedKafkaBroker;
36+
import org.springframework.kafka.test.condition.EmbeddedKafkaCondition;
37+
import org.springframework.kafka.test.context.EmbeddedKafka;
38+
import org.springframework.kafka.transaction.KafkaTransactionManager;
39+
import org.springframework.retry.support.RetryTemplate;
40+
41+
import static org.assertj.core.api.Assertions.assertThat;
42+
import static org.mockito.ArgumentMatchers.any;
43+
import static org.mockito.ArgumentMatchers.eq;
44+
import static org.mockito.ArgumentMatchers.isNull;
45+
import static org.mockito.Mockito.mock;
46+
import static org.mockito.Mockito.spy;
47+
import static org.mockito.Mockito.verify;
48+
49+
/**
50+
* @author Soby Chacko
51+
*/
52+
@EmbeddedKafka(count = 1, controlledShutdown = true, brokerProperties = {"transaction.state.log.replication.factor=1",
53+
"transaction.state.log.min.isr=1"})
54+
class KafkaBinderTransactionCustomizerTest {
55+
56+
private static EmbeddedKafkaBroker embeddedKafka;
57+
58+
@BeforeAll
59+
public static void setup() {
60+
embeddedKafka = EmbeddedKafkaCondition.getBroker();
61+
}
62+
63+
@SuppressWarnings("unchecked")
64+
@Test
65+
void clientFactoryCustomizerAppliedBeforeTransactionManager() throws Exception {
66+
KafkaProperties kafkaProperties = new TestKafkaProperties();
67+
kafkaProperties.setBootstrapServers(Collections
68+
.singletonList(embeddedKafka.getBrokersAsString()));
69+
70+
KafkaBinderConfigurationProperties configurationProperties = new KafkaBinderConfigurationProperties(
71+
kafkaProperties, mock(ObjectProvider.class));
72+
configurationProperties.getTransaction().setTransactionIdPrefix("custom-tx-");
73+
74+
KafkaTopicProvisioner provisioningProvider = new KafkaTopicProvisioner(
75+
configurationProperties, kafkaProperties, prop -> {
76+
});
77+
provisioningProvider.setMetadataRetryOperations(new RetryTemplate());
78+
79+
// Create a tracking list for customized factories
80+
List<ProducerFactory<?, ?>> customizedFactories = new ArrayList<>();
81+
82+
// Create a customizer that we'll register after the binder is created
83+
ClientFactoryCustomizer customizer = new ClientFactoryCustomizer() {
84+
@Override
85+
public void configure(ProducerFactory<?, ?> pf) {
86+
customizedFactories.add(pf);
87+
}
88+
};
89+
90+
KafkaMessageChannelBinder binder = spy(new KafkaMessageChannelBinder(
91+
configurationProperties, provisioningProvider));
92+
93+
GenericApplicationContext applicationContext = new GenericApplicationContext();
94+
applicationContext.refresh();
95+
binder.setApplicationContext(applicationContext);
96+
97+
// Add the customizer AFTER binder creation but BEFORE afterPropertiesSet
98+
binder.addClientFactoryCustomizer(customizer);
99+
100+
// Now initialize the binder (this triggers onInit)
101+
binder.afterPropertiesSet();
102+
103+
// Verify KafkaMessageChannelBinder.getProducerFactory was called from onInit
104+
verify(binder).getProducerFactory(
105+
eq("custom-tx-"),
106+
any(ExtendedProducerProperties.class),
107+
eq("custom-tx-.producer"),
108+
isNull());
109+
110+
// Verify customizer was applied
111+
assertThat(customizedFactories).isNotEmpty();
112+
113+
// Verify that the producer factory from the transaction manager is in our list of customized factories
114+
KafkaTransactionManager<?, ?> txManager = (KafkaTransactionManager<?, ?>)
115+
TestUtils.getPropertyValue(binder, "transactionManager");
116+
assertThat(txManager).isNotNull();
117+
ProducerFactory<?, ?> producerFactory = txManager.getProducerFactory();
118+
// This verifies that the same producer factory that was customized is used for the transaction manager
119+
assertThat(customizedFactories).contains(producerFactory);
120+
}
121+
122+
@SuppressWarnings("unchecked")
123+
@Test
124+
void multipleCustomizersAppliedInOrder() throws Exception {
125+
KafkaProperties kafkaProperties = new TestKafkaProperties();
126+
kafkaProperties.setBootstrapServers(Collections
127+
.singletonList(embeddedKafka.getBrokersAsString()));
128+
129+
KafkaBinderConfigurationProperties configurationProperties = new KafkaBinderConfigurationProperties(
130+
kafkaProperties, mock(ObjectProvider.class));
131+
configurationProperties.getTransaction().setTransactionIdPrefix("multi-tx-");
132+
133+
KafkaTopicProvisioner provisioningProvider = new KafkaTopicProvisioner(
134+
configurationProperties, kafkaProperties, prop -> {
135+
});
136+
provisioningProvider.setMetadataRetryOperations(new RetryTemplate());
137+
138+
// Track order of customizers and customized factories
139+
List<String> customizationOrder = new ArrayList<>();
140+
List<ProducerFactory<?, ?>> customizedFactories = new ArrayList<>();
141+
142+
KafkaMessageChannelBinder binder = spy(new KafkaMessageChannelBinder(
143+
configurationProperties, provisioningProvider));
144+
145+
GenericApplicationContext applicationContext = new GenericApplicationContext();
146+
applicationContext.refresh();
147+
binder.setApplicationContext(applicationContext);
148+
149+
// Add multiple customizers
150+
binder.addClientFactoryCustomizer(new ClientFactoryCustomizer() {
151+
@Override
152+
public void configure(ProducerFactory<?, ?> pf) {
153+
customizationOrder.add("customizer1");
154+
customizedFactories.add(pf);
155+
}
156+
});
157+
158+
binder.addClientFactoryCustomizer(new ClientFactoryCustomizer() {
159+
@Override
160+
public void configure(ProducerFactory<?, ?> pf) {
161+
customizationOrder.add("customizer2");
162+
}
163+
});
164+
165+
binder.addClientFactoryCustomizer(new ClientFactoryCustomizer() {
166+
@Override
167+
public void configure(ProducerFactory<?, ?> pf) {
168+
customizationOrder.add("customizer3");
169+
}
170+
});
171+
172+
binder.afterPropertiesSet();
173+
174+
assertThat(customizationOrder).containsExactly("customizer1", "customizer2", "customizer3");
175+
176+
KafkaTransactionManager<?, ?> txManager = (KafkaTransactionManager<?, ?>)
177+
TestUtils.getPropertyValue(binder, "transactionManager");
178+
assertThat(txManager).isNotNull();
179+
ProducerFactory<?, ?> producerFactory = txManager.getProducerFactory();
180+
// Verify that the producer factory used in transaction manager is one that was customized
181+
assertThat(customizedFactories).contains(producerFactory);
182+
}
183+
184+
}

binders/kafka-binder/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaTransactionTests.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ public static void setup() {
7171

7272
@SuppressWarnings({ "rawtypes", "unchecked" })
7373
@Test
74-
void producerRunsInTx() {
74+
void producerRunsInTx() throws Exception {
7575
KafkaProperties kafkaProperties = new TestKafkaProperties();
7676
kafkaProperties.setBootstrapServers(Collections
7777
.singletonList(embeddedKafka.getBrokersAsString()));
@@ -111,6 +111,10 @@ protected DefaultKafkaProducerFactory<byte[], byte[]> getProducerFactory(
111111
GenericApplicationContext applicationContext = new GenericApplicationContext();
112112
applicationContext.refresh();
113113
binder.setApplicationContext(applicationContext);
114+
115+
// Important: Initialize the binder to trigger onInit()
116+
binder.afterPropertiesSet();
117+
114118
DirectChannel channel = new DirectChannel();
115119
KafkaProducerProperties extension = new KafkaProducerProperties();
116120
ExtendedProducerProperties<KafkaProducerProperties> properties = new ExtendedProducerProperties<>(

0 commit comments

Comments
 (0)