3636import org .apache .kafka .common .config .ConfigResource ;
3737import org .apache .kafka .common .serialization .StringDeserializer ;
3838import org .apache .kafka .common .serialization .StringSerializer ;
39+ import org .awaitility .Awaitility ;
3940import org .jspecify .annotations .Nullable ;
4041import org .junit .jupiter .api .Test ;
4142
4243import org .springframework .beans .factory .annotation .Autowired ;
44+ import org .springframework .beans .factory .annotation .Qualifier ;
4345import org .springframework .context .annotation .Bean ;
4446import org .springframework .context .annotation .Configuration ;
4547import org .springframework .kafka .annotation .EnableKafka ;
4648import org .springframework .kafka .annotation .KafkaListener ;
49+ import org .springframework .kafka .config .KafkaListenerEndpointRegistry ;
4750import org .springframework .kafka .config .ShareKafkaListenerContainerFactory ;
4851import org .springframework .kafka .core .DefaultKafkaProducerFactory ;
4952import org .springframework .kafka .core .DefaultShareConsumerFactory ;
7578 "share-listener-error-handling-test" ,
7679 "share-listener-factory-props-test"
7780},
81+ partitions = 1 ,
7882 brokerProperties = {
7983 "share.coordinator.state.topic.replication.factor=1" ,
8084 "share.coordinator.state.topic.min.isr=1"
@@ -87,6 +91,9 @@ class ShareKafkaListenerIntegrationTests {
8791 @ Autowired
8892 KafkaTemplate <String , String > kafkaTemplate ;
8993
94+ @ Autowired
95+ KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry ;
96+
9097 @ Test
9198 void shouldSupportBasicShareKafkaListener () throws Exception {
9299 final String topic = "share-listener-basic-test" ;
@@ -95,6 +102,7 @@ void shouldSupportBasicShareKafkaListener() throws Exception {
95102
96103 // Send test message
97104 kafkaTemplate .send (topic , "basic-test-message" );
105+ kafkaTemplate .flush ();
98106
99107 // Wait for processing
100108 assertThat (BasicTestListener .latch .await (10 , TimeUnit .SECONDS )).isTrue ();
@@ -111,6 +119,7 @@ void shouldSupportExplicitAcknowledgmentWithShareAcknowledgment() throws Excepti
111119 kafkaTemplate .send (topic , "accept" , "accept-message" );
112120 kafkaTemplate .send (topic , "release" , "release-message" );
113121 kafkaTemplate .send (topic , "reject" , "reject-message" );
122+ kafkaTemplate .flush ();
114123
115124 // Wait for processing
116125 assertThat (ExplicitAckTestListener .latch .await (15 , TimeUnit .SECONDS )).isTrue ();
@@ -134,6 +143,7 @@ void shouldSupportShareConsumerAwareListener() throws Exception {
134143
135144 // Send test message
136145 kafkaTemplate .send (topic , "consumer-aware-message" );
146+ kafkaTemplate .flush ();
137147
138148 // Wait for processing
139149 assertThat (ShareConsumerAwareTestListener .latch .await (10 , TimeUnit .SECONDS )).isTrue ();
@@ -147,8 +157,12 @@ void shouldSupportAcknowledgingShareConsumerAwareListener() throws Exception {
147157 final String groupId = "share-ack-consumer-aware-group" ;
148158 setShareAutoOffsetResetEarliest (this .broker .getBrokersAsString (), groupId );
149159
160+ // This test can run first (JUnit 6); wait for containers before producing so share consumers are subscribed.
161+ awaitRunningShareListenerContainers ();
162+
150163 // Send test message
151164 kafkaTemplate .send (topic , "ack-consumer-aware-message" );
165+ kafkaTemplate .flush ();
152166
153167 // Wait for processing
154168 assertThat (AckShareConsumerAwareTestListener .latch .await (30 , TimeUnit .SECONDS )).isTrue ();
@@ -168,6 +182,7 @@ void shouldHandleMixedAcknowledgmentScenarios() throws Exception {
168182 kafkaTemplate .send (topic , "success1" , "success-message-1" );
169183 kafkaTemplate .send (topic , "success2" , "success-message-2" );
170184 kafkaTemplate .send (topic , "retry" , "retry-message" );
185+ kafkaTemplate .flush ();
171186
172187 // Wait for processing
173188 assertThat (MixedAckTestListener .processedLatch .await (15 , TimeUnit .SECONDS )).isTrue ();
@@ -189,6 +204,7 @@ void shouldHandleProcessingErrorsCorrectly() throws Exception {
189204 kafkaTemplate .send (topic , "success" , "success-message" );
190205 kafkaTemplate .send (topic , "error" , "error-message" );
191206 kafkaTemplate .send (topic , "success2" , "success-message-2" );
207+ kafkaTemplate .flush ();
192208
193209 // Wait for processing
194210 assertThat (ErrorHandlingTestListener .latch .await (10 , TimeUnit .SECONDS )).isTrue ();
@@ -206,6 +222,7 @@ void shouldSupportExplicitAcknowledgmentViaFactoryContainerProperties() throws E
206222
207223 // Send test message
208224 kafkaTemplate .send (topic , "factory-test" , "factory-props-message" );
225+ kafkaTemplate .flush ();
209226
210227 // Wait for processing
211228 assertThat (FactoryPropsTestListener .latch .await (10 , TimeUnit .SECONDS )).isTrue ();
@@ -243,6 +260,20 @@ private boolean isAcknowledgedInternal(ShareAcknowledgment ack) {
243260 }
244261 }
245262
263+ /**
264+ * Wait until all listener containers have started (needed when a test runs before others).
265+ */
266+ private void awaitRunningShareListenerContainers () {
267+ Awaitility .await ()
268+ .atMost (30 , TimeUnit .SECONDS )
269+ .pollInterval (100 , TimeUnit .MILLISECONDS )
270+ .untilAsserted (() -> {
271+ assertThat (this .kafkaListenerEndpointRegistry .getListenerContainerIds ()).isNotEmpty ();
272+ this .kafkaListenerEndpointRegistry .getListenerContainers ().forEach (container ->
273+ assertThat (container .isRunning ()).isTrue ());
274+ });
275+ }
276+
246277 @ Configuration
247278 @ EnableKafka
248279 static class TestConfig {
@@ -262,7 +293,7 @@ public ShareConsumerFactory<String, String> explicitShareConsumerFactory(Embedde
262293 configs .put (ConsumerConfig .BOOTSTRAP_SERVERS_CONFIG , broker .getBrokersAsString ());
263294 configs .put (ConsumerConfig .KEY_DESERIALIZER_CLASS_CONFIG , StringDeserializer .class );
264295 configs .put (ConsumerConfig .VALUE_DESERIALIZER_CLASS_CONFIG , StringDeserializer .class );
265- configs .put ("share.acknowledgement.mode" , "explicit" );
296+ configs .put (ConsumerConfig . SHARE_ACKNOWLEDGEMENT_MODE_CONFIG , "explicit" );
266297 return new DefaultShareConsumerFactory <>(configs );
267298 }
268299
@@ -280,10 +311,10 @@ public ShareKafkaListenerContainerFactory<String, String> explicitShareKafkaList
280311
281312 @ Bean
282313 public ShareKafkaListenerContainerFactory <String , String > factoryPropsShareKafkaListenerContainerFactory (
283- ShareConsumerFactory <String , String > shareConsumerFactory ) {
314+ @ Qualifier ( "explicitShareConsumerFactory" ) ShareConsumerFactory <String , String > explicitShareConsumerFactory ) {
284315 ShareKafkaListenerContainerFactory <String , String > factory =
285- new ShareKafkaListenerContainerFactory <>(shareConsumerFactory );
286- // Configure explicit acknowledgment via factory's container properties
316+ new ShareKafkaListenerContainerFactory <>(explicitShareConsumerFactory );
317+ // Configure explicit acknowledgment via factory's container properties (consumer must use explicit mode too)
287318 factory .getContainerProperties ().setExplicitShareAcknowledgment (true );
288319 return factory ;
289320 }
0 commit comments