Skip to content

Commit f672fe7

Browse files
committed
Address PR review feedback for ShareAckMode
- Simplify whats-new.adoc and move detailed migration guide to ref docs - Remove try-catch around `getConfigurationProperties()` - let it fail if not implemented Signed-off-by: Soby Chacko <soby.chacko@broadcom.com>
1 parent aae9b8f commit f672fe7

File tree

3 files changed

+43
-68
lines changed

3 files changed

+43
-68
lines changed

spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/kafka-queues.adoc

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -524,6 +524,37 @@ Use this mode only when per-record delivery guarantees are not required.
524524
|Broker auto-ACCEPTs (no recovery)
525525
|===
526526

527+
[[share-ack-mode-migration]]
528+
=== Migration from 4.0
529+
530+
In 4.0, share consumer acknowledgment was controlled by `setExplicitShareAcknowledgment(boolean)`.
531+
In 4.1, this is replaced by `setShareAckMode(ShareAckMode)`.
532+
The deprecated method still works: `true` maps to `MANUAL`, `false` maps to `EXPLICIT`.
533+
534+
**Default behavior is unchanged.**
535+
The 4.0 default (`setExplicitShareAcknowledgment(false)`) was container-managed acknowledgment, which is exactly what `ShareAckMode.EXPLICIT` does.
536+
No action is required for applications using the default.
537+
538+
**If you used `setExplicitShareAcknowledgment(true)`**, replace it:
539+
540+
[source,java]
541+
----
542+
// Before (4.0)
543+
factory.getContainerProperties().setExplicitShareAcknowledgment(true);
544+
545+
// After (4.1)
546+
factory.getContainerProperties().setShareAckMode(ContainerProperties.ShareAckMode.MANUAL);
547+
----
548+
549+
**If you set `share.acknowledgement.mode=implicit` in the factory configuration** (via `ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG`), note that in 4.0 this setting had no effect because the container always called `consumer.acknowledge()` regardless.
550+
In 4.1, the container detects this conflict, logs a warning, and overrides the factory setting with explicit mode.
551+
To use Kafka client implicit mode, set it explicitly on the container:
552+
553+
[source,java]
554+
----
555+
factory.getContainerProperties().setShareAckMode(ContainerProperties.ShareAckMode.IMPLICIT);
556+
----
557+
527558
[[share-acknowledgment-types]]
528559
=== Acknowledgment Types
529560

spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc

Lines changed: 4 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -17,59 +17,10 @@ See xref:kafka/receiving-messages/listener-annotation.adoc[`@KafkaListener` Anno
1717
[[x41-share-ack-mode]]
1818
=== Share Consumer Acknowledgment Modes
1919

20-
The boolean `setExplicitShareAcknowledgment(boolean)` property on `ContainerProperties` has been replaced by the `ShareAckMode` enum, which clearly names the three distinct acknowledgment modes:
21-
22-
* `EXPLICIT` (default) — Container-managed.
23-
The container sends `ACCEPT` after successful processing and delegates error handling to the `ShareConsumerRecordRecoverer` (default: `REJECT`).
24-
This is the analogue of disabling `auto.commit` on a regular consumer.
25-
* `MANUAL` — Listener-managed.
26-
The listener must acknowledge each record via the provided `ShareAcknowledgment`.
27-
Subsequent polls are blocked until all records from the previous poll are acknowledged.
28-
* `IMPLICIT` — Kafka client implicit mode.
29-
The broker auto-accepts all records regardless of processing outcome.
30-
31-
Configure the mode on the factory:
32-
33-
[source,java]
34-
----
35-
factory.getContainerProperties().setShareAckMode(ContainerProperties.ShareAckMode.MANUAL);
36-
----
37-
38-
The deprecated `setExplicitShareAcknowledgment(true)` maps to `MANUAL`; `setExplicitShareAcknowledgment(false)` maps to `EXPLICIT`.
39-
See xref:kafka/kafka-queues.adoc#share-record-acknowledgment[Record Acknowledgment] for the full reference.
40-
41-
==== Migration Guide
42-
43-
**Default behavior is unchanged.**
44-
The old `setExplicitShareAcknowledgment(false)` default was already container-managed acknowledgment (the container sent `ACCEPT` on success), which is exactly what `ShareAckMode.EXPLICIT` does.
45-
No action is required for applications using the default.
46-
47-
**If you used `setExplicitShareAcknowledgment(true)`**, replace it:
48-
49-
[source,java]
50-
----
51-
// Before (4.0)
52-
factory.getContainerProperties().setExplicitShareAcknowledgment(true);
53-
54-
// After (4.1)
55-
factory.getContainerProperties().setShareAckMode(ContainerProperties.ShareAckMode.MANUAL);
56-
----
57-
58-
**If you set `share.acknowledgement.mode=implicit` in the factory configuration** (via `ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG`), this is a breaking change.
59-
In 4.0 this setting had no effect because the container always called `consumer.acknowledge()` regardless, which would have thrown `IllegalStateException` in true Kafka implicit mode.
60-
In 4.1, the container detects this conflict and logs a warning, then overrides the factory setting with explicit mode.
61-
To genuinely use Kafka client implicit mode — where the broker auto-accepts all records regardless of processing outcome — you must now opt in explicitly:
62-
63-
[source,java]
64-
----
65-
factory.getContainerProperties().setShareAckMode(ContainerProperties.ShareAckMode.IMPLICIT);
66-
----
67-
68-
[WARNING]
69-
====
70-
In `ShareAckMode.IMPLICIT`, the `ShareConsumerRecordRecoverer` is not consulted and processing errors do not influence acknowledgment.
71-
Records are always ACCEPTed by the broker.
72-
====
20+
The boolean `setExplicitShareAcknowledgment(boolean)` property on `ContainerProperties` has been replaced by the `ShareAckMode` enum (`EXPLICIT`, `MANUAL`, `IMPLICIT`).
21+
The deprecated method still works: `true` maps to `MANUAL`, `false` maps to `EXPLICIT`.
22+
Default behavior is unchanged — applications using the default require no migration.
23+
See xref:kafka/kafka-queues.adoc#share-record-acknowledgment[Record Acknowledgment] for mode descriptions and xref:kafka/kafka-queues.adoc#share-ack-mode-migration[Migration from 4.0] for upgrade details.
7324

7425
[[x41-share-consumer-error-handling]]
7526
=== Share Consumer Error Handling

spring-kafka/src/main/java/org/springframework/kafka/listener/ShareKafkaMessageListenerContainer.java

Lines changed: 8 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -310,7 +310,6 @@ private class ShareListenerConsumer implements Runnable {
310310
this.ackTimeoutMs = containerProperties.getShareAcknowledgmentTimeout().toMillis();
311311

312312
if (shareAckMode == ContainerProperties.ShareAckMode.IMPLICIT) {
313-
// Kafka implicit mode: the broker auto-ACCEPTs all records; acknowledge() must not be called.
314313
ShareConsumerRecordRecoverer recoverer =
315314
ShareKafkaMessageListenerContainer.this.getShareConsumerRecordRecoverer();
316315
if (recoverer != ShareConsumerRecordRecoverer.REJECTING) {
@@ -325,20 +324,14 @@ private class ShareListenerConsumer implements Runnable {
325324
else {
326325
// EXPLICIT and MANUAL both use Kafka client explicit mode so the container
327326
// has full per-record acknowledgment control.
328-
try {
329-
Object configured = ShareKafkaMessageListenerContainer.this.shareConsumerFactory
330-
.getConfigurationProperties()
331-
.get(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG);
332-
if (configured != null && "implicit".equalsIgnoreCase(configured.toString())) {
333-
this.logger.warn("Factory configuration has share.acknowledgement.mode=implicit "
334-
+ "but ShareAckMode." + shareAckMode + " is active; the container will "
335-
+ "override it with explicit mode. To use implicit mode, set "
336-
+ "ShareAckMode.IMPLICIT on the container properties instead.");
337-
}
338-
}
339-
catch (UnsupportedOperationException ex) {
340-
this.logger.debug(ex, "ShareConsumerFactory does not expose configuration properties; "
341-
+ "skipping implicit acknowledgment mode conflict check.");
327+
Object configured = ShareKafkaMessageListenerContainer.this.shareConsumerFactory
328+
.getConfigurationProperties()
329+
.get(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG);
330+
if (configured != null && "implicit".equalsIgnoreCase(configured.toString())) {
331+
this.logger.warn("Factory configuration has share.acknowledgement.mode=implicit "
332+
+ "but ShareAckMode." + shareAckMode + " is active; the container will "
333+
+ "override it with explicit mode. To use implicit mode, set "
334+
+ "ShareAckMode.IMPLICIT on the container properties instead.");
342335
}
343336
this.consumer = ShareKafkaMessageListenerContainer.this.shareConsumerFactory.createShareConsumer(
344337
ShareKafkaMessageListenerContainer.this.getGroupId(),

0 commit comments

Comments
 (0)