Skip to content

Commit aae9b8f

Browse files
committed
Introduce ShareAckMode enum for share consumers
The 4.0 boolean `explicitShareAcknowledgment` only controlled whether the listener received a non-null acknowledgment object. It had no effect on Kafka's `share.acknowledgement.mode`, so `acknowledge()` would throw `IllegalStateException` when the factory used the Kafka default (implicit mode). `ContainerProperties.ShareAckMode` (EXPLICIT, MANUAL, IMPLICIT) makes the modes explicit, with EXPLICIT as the default — the same philosophy as regular containers disabling auto.commit and owning the commit lifecycle themselves. For EXPLICIT and MANUAL the container now enforces `share.acknowledgement.mode=explicit` via consumer override properties, regardless of factory configuration. Other related fixes: - `RecordDeserializationException` handler now guards `acknowledge()` against IMPLICIT mode - `createShareConsumer` variant added to `ShareConsumerFactory` for consumer-level overrides - 4.0 dead code and naming issues cleaned up Deprecated methods keep the 4.0 API intact. Reference docs and What's New include a migration guide. Adding tests to verify the changes. Signed-off-by: Soby Chacko <soby.chacko@broadcom.com>
1 parent 30d242b commit aae9b8f

File tree

12 files changed

+714
-385
lines changed

12 files changed

+714
-385
lines changed

spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/kafka-queues.adoc

Lines changed: 179 additions & 132 deletions
Large diffs are not rendered by default.

spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,63 @@ The `@KafkaListener` annotation now supports an `ackMode` attribute, allowing in
1414
The attribute also supports SpEL expressions and property placeholders.
1515
See xref:kafka/receiving-messages/listener-annotation.adoc[`@KafkaListener` Annotation] for more information.
1616

17+
[[x41-share-ack-mode]]
18+
=== Share Consumer Acknowledgment Modes
19+
20+
The boolean `setExplicitShareAcknowledgment(boolean)` property on `ContainerProperties` has been replaced by the `ShareAckMode` enum, which clearly names the three distinct acknowledgment modes:
21+
22+
* `EXPLICIT` (default) — Container-managed.
23+
The container sends `ACCEPT` after successful processing and delegates error handling to the `ShareConsumerRecordRecoverer` (default: `REJECT`).
24+
This is the analogue of disabling `auto.commit` on a regular consumer.
25+
* `MANUAL` — Listener-managed.
26+
The listener must acknowledge each record via the provided `ShareAcknowledgment`.
27+
Subsequent polls are blocked until all records from the previous poll are acknowledged.
28+
* `IMPLICIT` — Kafka client implicit mode.
29+
The broker auto-accepts all records regardless of processing outcome.
30+
31+
Configure the mode on the factory:
32+
33+
[source,java]
34+
----
35+
factory.getContainerProperties().setShareAckMode(ContainerProperties.ShareAckMode.MANUAL);
36+
----
37+
38+
The deprecated `setExplicitShareAcknowledgment(true)` maps to `MANUAL`; `setExplicitShareAcknowledgment(false)` maps to `EXPLICIT`.
39+
See xref:kafka/kafka-queues.adoc#share-record-acknowledgment[Record Acknowledgment] for the full reference.
40+
41+
==== Migration Guide
42+
43+
**Default behavior is unchanged.**
44+
The old `setExplicitShareAcknowledgment(false)` default was already container-managed acknowledgment (the container sent `ACCEPT` on success), which is exactly what `ShareAckMode.EXPLICIT` does.
45+
No action is required for applications using the default.
46+
47+
**If you used `setExplicitShareAcknowledgment(true)`**, replace it:
48+
49+
[source,java]
50+
----
51+
// Before (4.0)
52+
factory.getContainerProperties().setExplicitShareAcknowledgment(true);
53+
54+
// After (4.1)
55+
factory.getContainerProperties().setShareAckMode(ContainerProperties.ShareAckMode.MANUAL);
56+
----
57+
58+
**If you set `share.acknowledgement.mode=implicit` in the factory configuration** (via `ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG`), this is a breaking change.
59+
In 4.0 this setting had no effect because the container always called `consumer.acknowledge()` regardless, which would have thrown `IllegalStateException` in true Kafka implicit mode.
60+
In 4.1, the container detects this conflict and logs a warning, then overrides the factory setting with explicit mode.
61+
To genuinely use Kafka client implicit mode — where the broker auto-accepts all records regardless of processing outcome — you must now opt in explicitly:
62+
63+
[source,java]
64+
----
65+
factory.getContainerProperties().setShareAckMode(ContainerProperties.ShareAckMode.IMPLICIT);
66+
----
67+
68+
[WARNING]
69+
====
70+
In `ShareAckMode.IMPLICIT`, the `ShareConsumerRecordRecoverer` is not consulted and processing errors do not influence acknowledgment.
71+
Records are always ACCEPTed by the broker.
72+
====
73+
1774
[[x41-share-consumer-error-handling]]
1875
=== Share Consumer Error Handling
1976

spring-kafka/src/main/java/org/springframework/kafka/config/ShareKafkaListenerContainerFactory.java

Lines changed: 0 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,6 @@
2020
import java.util.Collection;
2121
import java.util.regex.Pattern;
2222

23-
import org.apache.kafka.clients.consumer.ConsumerConfig;
24-
import org.apache.kafka.clients.consumer.internals.ShareAcknowledgementMode;
2523
import org.jspecify.annotations.Nullable;
2624

2725
import org.springframework.beans.BeanUtils;
@@ -183,11 +181,6 @@ protected void initializeContainer(ShareKafkaMessageListenerContainer<K, V> inst
183181
BeanUtils.copyProperties(this.containerProperties, properties, "topics", "topicPartitions", "topicPattern",
184182
"messageListener", "ackCount", "ackTime", "subBatchPerPartition", "kafkaConsumerProperties");
185183

186-
// Determine acknowledgment mode following Spring Kafka's configuration precedence patterns
187-
// Check factory-level properties first, then consumer factory config
188-
boolean explicitAck = determineExplicitAcknowledgment(properties);
189-
properties.setExplicitShareAcknowledgment(explicitAck);
190-
191184
// Set concurrency - endpoint setting takes precedence over factory setting
192185
Integer conc = endpoint.getConcurrency();
193186
if (conc != null) {
@@ -208,39 +201,6 @@ protected void initializeContainer(ShareKafkaMessageListenerContainer<K, V> inst
208201
.acceptIfNotNull(endpoint.getClientIdPrefix(), properties::setClientId);
209202
}
210203

211-
/**
212-
* Determine whether explicit acknowledgment is required following Spring Kafka's configuration precedence patterns.
213-
* <p>
214-
* Configuration precedence (highest to lowest):
215-
* <ol>
216-
* <li>Container Properties: {@code containerProperties.isExplicitShareAcknowledgment()} (if explicitly set via factory-level properties)</li>
217-
* <li>Consumer Config: {@code ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG}</li>
218-
* <li>Default: {@code false} (implicit acknowledgment)</li>
219-
* </ol>
220-
* @param containerProperties the container properties to check
221-
* @return true if explicit acknowledgment is required, false for implicit
222-
* @throws IllegalArgumentException if an invalid acknowledgment mode is configured
223-
*/
224-
private boolean determineExplicitAcknowledgment(ContainerProperties containerProperties) {
225-
// Check factory-level properties first
226-
// If explicitly set to true (non-default), use it with highest precedence
227-
if (this.containerProperties.isExplicitShareAcknowledgment()) {
228-
return true;
229-
}
230-
231-
// Check Kafka client configuration as fallback
232-
Object clientAckMode = this.shareConsumerFactory.getConfigurationProperties()
233-
.get(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG);
234-
235-
if (clientAckMode != null) {
236-
ShareAcknowledgementMode mode = ShareAcknowledgementMode.fromString(clientAckMode.toString());
237-
return mode == ShareAcknowledgementMode.EXPLICIT;
238-
}
239-
240-
// Default to implicit acknowledgment (false)
241-
return false;
242-
}
243-
244204
private static void validateShareConfiguration(KafkaListenerEndpoint endpoint) {
245205
// Validate that batch listeners aren't used with share consumers
246206
if (Boolean.TRUE.equals(endpoint.getBatchListener())) {

spring-kafka/src/main/java/org/springframework/kafka/core/DefaultShareConsumerFactory.java

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import java.util.concurrent.ConcurrentHashMap;
2727
import java.util.function.Supplier;
2828

29+
import org.apache.kafka.clients.consumer.ConsumerConfig;
2930
import org.apache.kafka.clients.consumer.KafkaShareConsumer;
3031
import org.apache.kafka.clients.consumer.ShareConsumer;
3132
import org.apache.kafka.common.MetricName;
@@ -141,19 +142,42 @@ public ShareConsumer<K, V> createShareConsumer(@Nullable String groupId, @Nullab
141142
return createRawConsumer(groupId, clientId);
142143
}
143144

145+
@Override
146+
public ShareConsumer<K, V> createShareConsumer(@Nullable String groupId, @Nullable String clientId,
147+
Map<String, Object> overrideProperties) {
148+
return createRawConsumer(groupId, clientId, overrideProperties);
149+
}
150+
144151
/**
145-
* Actually create the consumer.
152+
* Create the consumer with no override properties.
153+
* Subclasses may override to customize consumer creation.
146154
* @param groupId the group id (maybe null).
147-
* @param clientId the client id.
155+
* @param clientId the client id (maybe null).
148156
* @return the share consumer.
149157
*/
150158
protected ShareConsumer<K, V> createRawConsumer(@Nullable String groupId, @Nullable String clientId) {
159+
return createRawConsumer(groupId, clientId, Collections.emptyMap());
160+
}
161+
162+
/**
163+
* Actually create the consumer, applying override properties on top of the
164+
* factory configuration. Override properties take precedence over factory
165+
* configuration; {@code groupId} and {@code clientId} are applied last.
166+
* @param groupId the group id (maybe null).
167+
* @param clientId the client id (maybe null).
168+
* @param overrideProperties properties to apply on top of the factory configuration.
169+
* @return the share consumer.
170+
* @since 4.1
171+
*/
172+
protected ShareConsumer<K, V> createRawConsumer(@Nullable String groupId, @Nullable String clientId,
173+
Map<String, Object> overrideProperties) {
151174
Map<String, Object> consumerProperties = new HashMap<>(this.configs);
175+
consumerProperties.putAll(overrideProperties);
152176
if (groupId != null) {
153-
consumerProperties.put("group.id", groupId);
177+
consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
154178
}
155179
if (clientId != null) {
156-
consumerProperties.put("client.id", clientId);
180+
consumerProperties.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
157181
}
158182
return new ExtendedShareConsumer(consumerProperties);
159183
}

spring-kafka/src/main/java/org/springframework/kafka/core/ShareConsumerFactory.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,25 @@ public interface ShareConsumerFactory<K, V> {
4343
*/
4444
ShareConsumer<K, V> createShareConsumer(@Nullable String groupId, @Nullable String clientId);
4545

46+
/**
47+
* Create a share consumer with the provided group id, client id, and
48+
* additional properties that override the factory's configuration.
49+
* The container uses this to enforce internal configuration (e.g. acknowledgement
50+
* mode) without mutating the factory's configuration.
51+
* Implementations that do not override this method will fall back to
52+
* {@link #createShareConsumer(String, String)}, and the override properties
53+
* will be ignored.
54+
* @param groupId the group id (maybe null).
55+
* @param clientId the client id (maybe null).
56+
* @param overrideProperties properties to apply on top of the factory configuration.
57+
* @return the share consumer.
58+
* @since 4.1
59+
*/
60+
default ShareConsumer<K, V> createShareConsumer(@Nullable String groupId, @Nullable String clientId,
61+
Map<String, Object> overrideProperties) {
62+
return createShareConsumer(groupId, clientId);
63+
}
64+
4665
/**
4766
* Return an unmodifiable reference to the configuration map for this factory.
4867
* Useful for cloning to make a similar factory.

spring-kafka/src/main/java/org/springframework/kafka/listener/AcknowledgingShareConsumerAwareMessageListener.java

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -27,16 +27,20 @@
2727
* <p>
2828
* This interface provides access to both the {@link ShareConsumer} instance and acknowledgment
2929
* capabilities. The acknowledgment parameter behavior depends on the container's
30-
* acknowledgment mode:
30+
* {@link ContainerProperties.ShareAckMode}:
3131
* <ul>
32-
* <li><strong>Explicit mode</strong>: The acknowledgment parameter is non-null and must
33-
* be used to acknowledge each record</li>
34-
* <li><strong>Implicit mode</strong>: The acknowledgment parameter is null and records
35-
* are automatically acknowledged</li>
32+
* <li><strong>MANUAL</strong>: The acknowledgment is non-null; the listener must call
33+
* {@link org.springframework.kafka.support.ShareAcknowledgment#acknowledge()},
34+
* {@link org.springframework.kafka.support.ShareAcknowledgment#release()}, or
35+
* {@link org.springframework.kafka.support.ShareAcknowledgment#reject()} for every record</li>
36+
* <li><strong>EXPLICIT</strong>: The acknowledgment is null; the container sends ACCEPT
37+
* automatically on success and delegates errors to the {@link ShareConsumerRecordRecoverer}</li>
38+
* <li><strong>IMPLICIT</strong>: The acknowledgment is null; the broker auto-accepts all records</li>
3639
* </ul>
3740
* <p>
3841
* This is the primary listener interface for share consumers when you need access
39-
* to the ShareConsumer instance or need explicit acknowledgment control.
42+
* to the {@link ShareConsumer} instance or need listener-managed acknowledgment
43+
* control ({@code ShareAckMode.MANUAL}).
4044
*
4145
* @param <K> the key type
4246
* @param <V> the value type
@@ -52,12 +56,11 @@
5256
public interface AcknowledgingShareConsumerAwareMessageListener<K, V> extends GenericMessageListener<ConsumerRecord<K, V>> {
5357

5458
/**
55-
* Invoked with data from kafka, an acknowledgment, and provides access to the consumer.
56-
* When explicit acknowledgment mode is used, the acknowledgment parameter will be non-null
57-
* and must be used to acknowledge the record. When implicit acknowledgment mode is used,
58-
* the acknowledgment parameter will be null.
59+
* Invoked with data from Kafka, an acknowledgment, and provides access to the consumer.
60+
* The acknowledgment is non-null only in {@link ContainerProperties.ShareAckMode#MANUAL} mode;
61+
* it is null in {@code EXPLICIT} and {@code IMPLICIT} modes.
5962
* @param data the data to be processed.
60-
* @param acknowledgment the acknowledgment (nullable in implicit mode).
63+
* @param acknowledgment the acknowledgment, or {@code null} if not in MANUAL mode.
6164
* @param consumer the consumer.
6265
*/
6366
void onShareRecord(ConsumerRecord<K, V> data, @Nullable ShareAcknowledgment acknowledgment, ShareConsumer<?, ?> consumer);

spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerProperties.java

Lines changed: 70 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@
3535
import org.springframework.aop.framework.ProxyFactory;
3636
import org.springframework.aop.support.AopUtils;
3737
import org.springframework.core.task.AsyncTaskExecutor;
38-
import org.springframework.kafka.support.ShareAcknowledgment;
3938
import org.springframework.kafka.support.TopicPartitionOffset;
4039
import org.springframework.kafka.support.micrometer.KafkaListenerObservationConvention;
4140
import org.springframework.kafka.transaction.KafkaAwareTransactionManager;
@@ -117,6 +116,40 @@ public enum AckMode {
117116

118117
}
119118

119+
/**
120+
* The acknowledgment mode for share consumer containers.
121+
* @since 4.1
122+
* @see #setShareAckMode(ShareAckMode)
123+
*/
124+
public enum ShareAckMode {
125+
126+
/**
127+
* Kafka client implicit mode. All records are automatically acknowledged
128+
* as ACCEPT by the broker regardless of processing outcome. No per-record
129+
* acknowledgment control is available in this mode. Equivalent to setting
130+
* {@code share.acknowledgement.mode=implicit} on the Kafka client.
131+
*/
132+
IMPLICIT,
133+
134+
/**
135+
* Kafka client explicit mode, container-managed. The container sends ACCEPT
136+
* after successful processing and delegates to the
137+
* {@link ShareConsumerRecordRecoverer} (default: REJECT) on error.
138+
* This is the default.
139+
*/
140+
EXPLICIT,
141+
142+
/**
143+
* Kafka client explicit mode, listener-managed. The listener must acknowledge
144+
* each record manually via the provided
145+
* {@link org.springframework.kafka.support.ShareAcknowledgment}.
146+
* Subsequent polls are blocked until all records from the previous poll
147+
* are acknowledged.
148+
*/
149+
MANUAL
150+
151+
}
152+
120153
/**
121154
* Offset commit behavior during assignment.
122155
* @since 2.3.6
@@ -314,7 +347,7 @@ public enum EOSMode {
314347

315348
private boolean recordObservationsInBatch;
316349

317-
private boolean explicitShareAcknowledgment = false;
350+
private ShareAckMode shareAckMode = ShareAckMode.EXPLICIT; // default: container-managed explicit mode
318351

319352
private Duration shareAcknowledgmentTimeout = Duration.ofSeconds(30); // Align with Kafka's share.record.lock.duration.ms default
320353

@@ -1121,38 +1154,54 @@ public void setRecordObservationsInBatch(boolean recordObservationsInBatch) {
11211154
}
11221155

11231156
/**
1124-
* Set whether explicit acknowledgment is required for share consumer containers.
1157+
* Set the acknowledgment mode for share consumer containers.
11251158
* <p>
11261159
* This setting only applies to share consumer containers and is ignored
11271160
* by regular consumer containers.
1128-
* <p>
1129-
* When set to {@code false} (default), records are automatically acknowledged
1130-
* as ACCEPT when the next poll occurs or when commitSync/commitAsync is called.
1131-
* <p>
1132-
* When set to {@code true}, the application must explicitly acknowledge each
1133-
* record using the provided {@link ShareAcknowledgment}.
1134-
* @param explicitShareAcknowledgment true for explicit acknowledgment, false for implicit
1135-
* @since 4.0
1136-
* @see ShareAcknowledgment
1161+
* @param shareAckMode the acknowledgment mode; default {@link ShareAckMode#EXPLICIT}.
1162+
* @since 4.1
1163+
* @see ShareAckMode
1164+
*/
1165+
public void setShareAckMode(ShareAckMode shareAckMode) {
1166+
this.shareAckMode = shareAckMode;
1167+
}
1168+
1169+
/**
1170+
* Return the acknowledgment mode for share consumer containers.
1171+
* @return the acknowledgment mode.
1172+
* @since 4.1
1173+
*/
1174+
public ShareAckMode getShareAckMode() {
1175+
return this.shareAckMode;
1176+
}
1177+
1178+
/**
1179+
* Set whether to use explicit share acknowledgment mode.
1180+
* @param explicitShareAcknowledgment {@code true} to use {@link ShareAckMode#MANUAL},
1181+
* {@code false} to use {@link ShareAckMode#EXPLICIT}.
1182+
* @deprecated in favor of {@link #setShareAckMode(ShareAckMode)} with
1183+
* {@link ShareAckMode#MANUAL}.
11371184
*/
1185+
@Deprecated(since = "4.1", forRemoval = false)
11381186
public void setExplicitShareAcknowledgment(boolean explicitShareAcknowledgment) {
1139-
this.explicitShareAcknowledgment = explicitShareAcknowledgment;
1187+
this.shareAckMode = explicitShareAcknowledgment ? ShareAckMode.MANUAL : ShareAckMode.EXPLICIT;
11401188
}
11411189

11421190
/**
1143-
* Check whether explicit acknowledgment is required for share consumer containers.
1144-
* @return true if explicit acknowledgment is required, false for implicit acknowledgment
1191+
* Return whether the current mode is {@link ShareAckMode#MANUAL}.
1192+
* @return {@code true} if the current {@link ShareAckMode} is {@link ShareAckMode#MANUAL}.
1193+
* @deprecated in favor of {@link #getShareAckMode()}.
11451194
*/
1195+
@Deprecated(since = "4.1", forRemoval = false)
11461196
public boolean isExplicitShareAcknowledgment() {
1147-
return this.explicitShareAcknowledgment;
1197+
return this.shareAckMode == ShareAckMode.MANUAL;
11481198
}
11491199

11501200
/**
1151-
* Set the timeout for share acknowledgments in explicit mode.
1201+
* Set the timeout for share acknowledgments in {@link ShareAckMode#MANUAL} mode.
11521202
* <p>
1153-
* When a record is not acknowledged within this timeout, a warning
1154-
* will be logged to help identify missing acknowledgment calls.
1155-
* This only applies when using explicit acknowledgment mode.
1203+
* When a record is not acknowledged within this timeout, a warning will be logged
1204+
* to help identify missing acknowledgment calls. Only applies to {@code MANUAL} mode.
11561205
* <p>
11571206
* Default is 30 seconds.
11581207
* @param shareAcknowledgmentTimeout the timeout duration
@@ -1163,7 +1212,7 @@ public void setShareAcknowledgmentTimeout(Duration shareAcknowledgmentTimeout) {
11631212
}
11641213

11651214
/**
1166-
* Get the timeout for share acknowledgments in explicit mode.
1215+
* Get the timeout for share acknowledgments in {@link ShareAckMode#MANUAL} mode.
11671216
* @return the acknowledgment timeout
11681217
* @since 4.0
11691218
*/

0 commit comments

Comments
 (0)