
Commit 8ee8d0f

GH-4321: Add error handling to share consumer container
Fixes: #4321

- A single record that failed deserialization or had a corrupt batch was stopping the whole consumer thread. Catch `RecordDeserializationException` and `CorruptRecordException` from `poll()`, `REJECT` or log and continue, so the thread keeps running and later records are still processed.
- Every listener exception always resulted in REJECTing the record, with no way to RELEASE for transient failures. Introduce `ShareConsumerRecordRecoverer` (default: `DefaultShareConsumerRecordRecoverer`, `REJECT`) so callers can plug in `ACCEPT/RELEASE/REJECT` per failure; wire it on `AbstractShareKafkaMessageListenerContainer` and `ShareKafkaListenerContainerFactory`.
- In explicit mode, failed records were only removed from pending acks and not from `acknowledgmentTimestamps`, which could leak. Clear both on error.
- Wire the record recoverer in the `JavaUtils` chain with the other optional config for consistent style.
- Drop the null check from the setter; rely on the signature and nullability annotations.
- Make the recoverer getter protected so subclasses in other packages can use it without expanding the public API.
- Put the default behavior on the interface as `REJECTING` and `RELEASING`: one extension point, no extra class, and a built-in for log-and-RELEASE; use `LogAccessor(Class)` instead of `LogFactory`.
- Use a plain string for the `CorruptRecordException` log; the message is fixed, so a supplier adds nothing.
- Test changes.

Signed-off-by: Soby Chacko <soby.chacko@broadcom.com>
1 parent 19070c3 commit 8ee8d0f

9 files changed: +447 -10 lines changed

spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/kafka-queues.adoc

Lines changed: 70 additions & 1 deletion
@@ -573,7 +573,7 @@ In explicit acknowledgment mode, the container enforces important constraints:
 
 Poll Blocking: Subsequent polls are blocked until all records from the previous poll are acknowledged.
 One-time Acknowledgment: Each record can only be acknowledged once.
-Error Handling: If processing throws an exception, the record is automatically acknowledged as `REJECT`.
+Error Handling: If processing throws an exception, the outcome is determined by the <<share-error-handling,ShareConsumerRecordRecoverer>> (default: `REJECT`).
 
 [WARNING]
 In explicit mode, failing to acknowledge records will block further message processing.
@@ -609,6 +609,75 @@ or ack.reject() for every record.
 
 This feature helps developers quickly identify when acknowledgment calls are missing from their code, preventing the common issue of "Spring Kafka does not consume new records any more" due to forgotten acknowledgments.
 
+[[share-error-handling]]
+=== Error Handling
+
+The share container handles errors at two levels: during `poll()` (poll-level) and when the listener throws (listener-level).
+
+==== Poll-Level Error Handling
+
+If `consumer.poll()` throws an exception, the container handles it so the consumer thread does not stop:
+
+* **RecordDeserializationException**: Thrown when a record cannot be deserialized (for example, invalid UTF-8 for `StringDeserializer`).
+The container catches it, REJECTs the affected record using the topic, partition, and offset from the exception, logs a warning, and continues polling.
+Subsequent records are processed normally.
+* **CorruptRecordException**: Thrown when CRC validation fails (for example, with `check.crcs` enabled).
+The Kafka client automatically rejects the corrupt batch.
+The container catches the exception, logs it, and continues polling.
+
+Without this handling, a single bad record would terminate the consumer thread.
+
+==== Listener-Level Error Handling: ShareConsumerRecordRecoverer
+
+When the listener throws an exception, the container delegates to a `ShareConsumerRecordRecoverer` to decide whether to ACCEPT, RELEASE, or REJECT the failed record.
+The default is `ShareConsumerRecordRecoverer.REJECTING` (log and REJECT); `ShareConsumerRecordRecoverer.RELEASING` is also available (log and RELEASE for redelivery).
+
+You can provide a custom recoverer to RELEASE records for transient failures (for example, downstream timeouts) so they can be redelivered to another consumer, while REJECTing permanent failures.
+
+===== Configuring a Custom Recoverer
+
+Set the recoverer on the container, or on the factory so it applies to all containers created by that factory:
+
+[source,java]
+----
+@Bean
+public ShareKafkaListenerContainerFactory<String, String> shareKafkaListenerContainerFactory(
+		ShareConsumerFactory<String, String> shareConsumerFactory) {
+
+	ShareKafkaListenerContainerFactory<String, String> factory =
+			new ShareKafkaListenerContainerFactory<>(shareConsumerFactory);
+
+	factory.setShareConsumerRecordRecoverer((record, ex) -> {
+		if (ex instanceof TransientException || ex.getCause() instanceof TimeoutException) {
+			return AcknowledgeType.RELEASE;
+		}
+		return AcknowledgeType.REJECT;
+	});
+
+	return factory;
+}
+----
+
+You can also set the recoverer on an individual container via `container.setShareConsumerRecordRecoverer(recoverer)`.
+
+===== Recoverer Behavior
+
+* The recoverer must not return `RENEW`; the container treats it as REJECT.
+* If the recoverer itself throws, the container falls back to REJECT for that record.
+* The default recoverer logs at `ERROR` and returns `REJECT` for every exception.
+
+===== Relation to Poison Message Protection
+
+Broker-side delivery count (see <<share-poison-message-protection>>) limits how many times a record can be acquired.
+Even if your recoverer always returns `RELEASE`, the broker will eventually archive the record after the configured limit (default 5), so poison messages cannot loop forever.
+
+==== Difference from Traditional Container Error Handling
+
+Share consumers do not use `CommonErrorHandler`.
+That interface is designed for partition-based consumers (seek, remaining records, offset commit).
+Share consumers use acknowledgment-based recovery: the recoverer only decides ACCEPT, RELEASE, or REJECT; the container performs the acknowledgment.
+There is no seek or dead-letter topic integration in the share container.
+
 [[share-acknowledgment-examples]]
 === Acknowledgment Examples
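The documentation above classifies exceptions into transient (RELEASE) and permanent (REJECT) outcomes. The sketch below shows that classification logic in isolation; since the real spring-kafka and kafka-clients types are not reproduced here, `AckType` and `Recoverer` are local stand-ins for `AcknowledgeType` and `ShareConsumerRecordRecoverer`, and the cause-chain walk is one plausible way to detect a wrapped timeout, not the library's own logic.

```java
import java.util.concurrent.TimeoutException;

public class RecovererDecisionSketch {

    // Local stand-ins for kafka-clients' AcknowledgeType and the
    // ShareConsumerRecordRecoverer contract, so this compiles without Kafka.
    enum AckType { ACCEPT, RELEASE, REJECT }

    @FunctionalInterface
    interface Recoverer {
        AckType recover(String recordDescription, Exception exception);
    }

    // Transient failures (here: a TimeoutException anywhere in the cause chain)
    // are RELEASEd for redelivery; everything else is REJECTed.
    static final Recoverer CLASSIFYING = (record, ex) -> {
        for (Throwable t = ex; t != null; t = t.getCause()) {
            if (t instanceof TimeoutException) {
                return AckType.RELEASE;
            }
        }
        return AckType.REJECT;
    };

    public static void main(String[] args) {
        // A wrapped timeout is treated as transient.
        System.out.println(CLASSIFYING.recover("orders-0@42",
                new RuntimeException(new TimeoutException("downstream timed out"))));
        // Anything else is treated as permanent.
        System.out.println(CLASSIFYING.recover("orders-0@43",
                new IllegalArgumentException("bad payload")));
    }
}
```

Walking the cause chain (rather than checking only `ex.getCause()`, as in the docs example) also catches timeouts wrapped more than one level deep.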

spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc

Lines changed: 11 additions & 0 deletions
@@ -13,3 +13,14 @@ For changes in earlier versions, see xref:appendix/change-history.adoc[Change Hi
 The `@KafkaListener` annotation now supports an `ackMode` attribute, allowing individual listeners to override the container factory's default acknowledgment mode without creating separate container factory beans.
 The attribute also supports SpEL expressions and property placeholders.
 See xref:kafka/receiving-messages/listener-annotation.adoc[`@KafkaListener` Annotation] for more information.
+
+[[x41-share-consumer-error-handling]]
+=== Share Consumer Error Handling
+
+Share consumer containers now provide configurable error handling:
+
+* **Poll-level**: `RecordDeserializationException` and `CorruptRecordException` from `poll()` are caught so the consumer thread continues; undeserializable records are REJECTed and the next poll proceeds.
+* **Listener-level**: A `ShareConsumerRecordRecoverer` interface decides whether to ACCEPT, RELEASE, or REJECT when the listener throws.
+The default is `ShareConsumerRecordRecoverer.REJECTING`; `RELEASING` is also available.
+You can set a custom recoverer on the factory or container.
+* See xref:kafka/kafka-queues.adoc#share-error-handling[Share consumer error handling] in the Kafka Queues documentation.

spring-kafka/src/main/java/org/springframework/kafka/config/ShareKafkaListenerContainerFactory.java

Lines changed: 16 additions & 0 deletions
@@ -22,6 +22,7 @@
 
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.internals.ShareAcknowledgementMode;
+import org.jspecify.annotations.Nullable;
 
 import org.springframework.beans.BeanUtils;
 import org.springframework.context.ApplicationContext;
@@ -30,6 +31,7 @@
 import org.springframework.context.ApplicationEventPublisherAware;
 import org.springframework.kafka.core.ShareConsumerFactory;
 import org.springframework.kafka.listener.ContainerProperties;
+import org.springframework.kafka.listener.ShareConsumerRecordRecoverer;
 import org.springframework.kafka.listener.ShareKafkaMessageListenerContainer;
 import org.springframework.kafka.support.JavaUtils;
 import org.springframework.kafka.support.TopicPartitionOffset;
@@ -68,6 +70,8 @@ public class ShareKafkaListenerContainerFactory<K, V>
 
	private int concurrency = 1;
 
+	private @Nullable ShareConsumerRecordRecoverer recordRecoverer;
+
	@SuppressWarnings("NullAway.Init")
	private ApplicationEventPublisher applicationEventPublisher;
 
@@ -119,6 +123,17 @@ public void setConcurrency(int concurrency) {
		this.concurrency = concurrency;
	}
 
+	/**
+	 * Set a {@link ShareConsumerRecordRecoverer} to use for all containers created
+	 * by this factory. If not set, the container's default
+	 * ({@link org.springframework.kafka.listener.ShareConsumerRecordRecoverer#REJECTING}) is used.
+	 * @param recordRecoverer the recoverer
+	 * @since 4.1
+	 */
+	public void setShareConsumerRecordRecoverer(ShareConsumerRecordRecoverer recordRecoverer) {
+		this.recordRecoverer = recordRecoverer;
+	}
+
	/**
	 * Obtain the factory-level container properties - set properties as needed
	 * and they will be copied to each listener container instance created by this factory.
@@ -188,6 +203,7 @@ protected void initializeContainer(ShareKafkaMessageListenerContainer<K, V> inst
		instance.setApplicationEventPublisher(this.applicationEventPublisher);
 
		JavaUtils.INSTANCE
+				.acceptIfNotNull(this.recordRecoverer, instance::setShareConsumerRecordRecoverer)
				.acceptIfNotNull(endpoint.getGroupId(), properties::setGroupId)
				.acceptIfNotNull(endpoint.getClientIdPrefix(), properties::setClientId);
	}
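The factory wires the optional recoverer through a `JavaUtils.acceptIfNotNull` chain, which applies each setter only when a value was actually configured. The sketch below is a minimal stand-in for that fluent pattern (the real `JavaUtils` lives in `org.springframework.kafka.support` and has more variants); `JavaUtilsSketch` and its method are illustrative, not the library class.

```java
import java.util.function.Consumer;

public class JavaUtilsSketch {

    static final JavaUtilsSketch INSTANCE = new JavaUtilsSketch();

    // Invoke the setter only when the value is non-null; return this for chaining.
    <T> JavaUtilsSketch acceptIfNotNull(T value, Consumer<T> setter) {
        if (value != null) {
            setter.accept(value);
        }
        return this;
    }

    public static void main(String[] args) {
        StringBuilder applied = new StringBuilder();
        INSTANCE
                // Null value: the setter is skipped, no stray defaults are pushed.
                .acceptIfNotNull(null, v -> applied.append("recoverer,"))
                // Present value: the setter runs.
                .acceptIfNotNull("group-1", v -> applied.append("groupId=" + v));
        System.out.println(applied); // groupId=group-1
    }
}
```

The chain keeps `initializeContainer` free of repetitive `if (x != null)` blocks, which is the "consistent style" the commit message refers to.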

spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractShareKafkaMessageListenerContainer.java

Lines changed: 23 additions & 0 deletions
@@ -83,6 +83,8 @@ public abstract class AbstractShareKafkaMessageListenerContainer<K, V>
 
	private volatile boolean running = false;
 
+	private ShareConsumerRecordRecoverer recordRecoverer = ShareConsumerRecordRecoverer.REJECTING;
+
	/**
	 * Construct an instance with the provided factory and properties.
	 * @param shareConsumerFactory the factory.
@@ -239,6 +241,27 @@ public void setupMessageListener(Object messageListener) {
		this.containerProperties.setMessageListener(messageListener);
	}
 
+	/**
+	 * Set the {@link ShareConsumerRecordRecoverer} to use when a listener throws
+	 * an exception. The recoverer determines whether to ACCEPT, RELEASE, or
+	 * REJECT the failed record.
+	 * @param recoverer the recoverer
+	 * @since 4.1
+	 * @see ShareConsumerRecordRecoverer
+	 */
+	public void setShareConsumerRecordRecoverer(ShareConsumerRecordRecoverer recoverer) {
+		this.recordRecoverer = recoverer;
+	}
+
+	/**
+	 * Return the configured {@link ShareConsumerRecordRecoverer}.
+	 * @return the recoverer
+	 * @since 4.1
+	 */
+	protected ShareConsumerRecordRecoverer getShareConsumerRecordRecoverer() {
+		return this.recordRecoverer;
+	}
+
	protected abstract void doStart();
 
	protected abstract void doStop();
spring-kafka/src/main/java/org/springframework/kafka/listener/ShareConsumerRecordRecoverer.java

Lines changed: 75 additions & 0 deletions
@@ -0,0 +1,75 @@
+/*
+ * Copyright 2025-present the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.kafka.listener;
+
+import org.apache.kafka.clients.consumer.AcknowledgeType;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+import org.springframework.core.log.LogAccessor;
+
+/**
+ * A strategy interface for determining the acknowledgment action when a share
+ * consumer record fails processing. Implementations decide whether to
+ * {@link AcknowledgeType#ACCEPT ACCEPT}, {@link AcknowledgeType#RELEASE RELEASE},
+ * or {@link AcknowledgeType#REJECT REJECT} the failed record.
+ *
+ * <p>Built-in implementations: {@link #REJECTING} (log and REJECT, the default) and
+ * {@link #RELEASING} (log and RELEASE for redelivery). Users can provide custom
+ * implementations or use a lambda.
+ *
+ * @author Soby Chacko
+ *
+ * @since 4.1
+ *
+ * @see ShareKafkaMessageListenerContainer
+ */
+@FunctionalInterface
+public interface ShareConsumerRecordRecoverer {
+
+	/**
+	 * Logger used by built-in implementations.
+	 */
+	LogAccessor LOGGER = new LogAccessor(ShareConsumerRecordRecoverer.class);
+
+	/**
+	 * Recoverer that logs the failure and REJECTs the record (archived, no redelivery).
+	 * This is the default when none is configured.
+	 */
+	ShareConsumerRecordRecoverer REJECTING = (record, ex) -> {
+		LOGGER.error(ex, () -> "Share consumer record processing failed; rejecting record from "
+				+ record.topic() + "-" + record.partition() + "@" + record.offset());
+		return AcknowledgeType.REJECT;
+	};
+
+	/**
+	 * Recoverer that logs the failure and RELEASEs the record (available for redelivery).
+	 */
+	ShareConsumerRecordRecoverer RELEASING = (record, ex) -> {
+		LOGGER.error(ex, () -> "Share consumer record processing failed; releasing record from "
+				+ record.topic() + "-" + record.partition() + "@" + record.offset());
+		return AcknowledgeType.RELEASE;
+	};
+
+	/**
+	 * Determine the acknowledgment action for a failed record.
+	 * @param record the record that failed processing
+	 * @param exception the exception thrown during processing
+	 * @return the {@link AcknowledgeType} to use - typically REJECT or RELEASE
+	 */
+	AcknowledgeType recover(ConsumerRecord<?, ?> record, Exception exception);
+
+}
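The new interface carries its built-in implementations as constants on the interface itself, which is why no `DefaultShareConsumerRecordRecoverer` class is needed. The stand-alone sketch below reproduces just that pattern with local stand-in types (`AckType` and `Recoverer` mimic `AcknowledgeType` and `ShareConsumerRecordRecoverer`; the logging is omitted so the sketch stays dependency-free).

```java
public class InterfaceDefaultsSketch {

    // Stand-in for kafka-clients' AcknowledgeType.
    enum AckType { ACCEPT, RELEASE, REJECT, RENEW }

    // A @FunctionalInterface carrying its own built-in implementations as
    // constants: one extension point, no extra default class required.
    @FunctionalInterface
    interface Recoverer {

        // Interface fields are implicitly public static final.
        Recoverer REJECTING = (record, ex) -> AckType.REJECT;
        Recoverer RELEASING = (record, ex) -> AckType.RELEASE;

        AckType recover(String record, Exception exception);
    }

    public static void main(String[] args) {
        Exception boom = new RuntimeException("boom");
        System.out.println(Recoverer.REJECTING.recover("t-0@7", boom)); // REJECT
        System.out.println(Recoverer.RELEASING.recover("t-0@7", boom)); // RELEASE

        // Callers can still supply a lambda for custom policies.
        Recoverer custom = (record, ex) -> AckType.ACCEPT;
        System.out.println(custom.recover("t-0@8", boom)); // ACCEPT
    }
}
```

Because the constants are lambdas targeting the interface's single abstract method, configuration code can pass `REJECTING`, `RELEASING`, or any lambda interchangeably.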

spring-kafka/src/main/java/org/springframework/kafka/listener/ShareKafkaMessageListenerContainer.java

Lines changed: 62 additions & 8 deletions
@@ -34,6 +34,9 @@
 import org.apache.kafka.clients.consumer.ShareConsumer;
 import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.CorruptRecordException;
+import org.apache.kafka.common.errors.RecordDeserializationException;
 import org.jspecify.annotations.Nullable;
 
 import org.springframework.context.ApplicationEventPublisher;
@@ -352,8 +355,31 @@ public void run() {
				try {
					records = this.consumer.poll(java.time.Duration.ofMillis(POLL_TIMEOUT));
				}
+				catch (RecordDeserializationException e) {
+					// poll() throws when a record can't be deserialized. Override the client's
+					// auto-release with REJECT so the record is archived and the consumer thread continues.
+					TopicPartition tp = e.topicPartition();
+					long offset = e.offset();
+					this.logger.warn(e, () -> "RecordDeserializationException at "
+							+ tp + " offset " + offset + "; rejecting record and continuing");
+					try {
+						this.consumer.acknowledge(tp.topic(), tp.partition(), offset,
+								AcknowledgeType.REJECT);
+					}
+					catch (Exception ackEx) {
+						this.logger.error(ackEx, () -> "Failed to reject undeserializable record at "
+								+ tp + " offset " + offset);
+					}
+					continue;
+				}
+				catch (CorruptRecordException e) {
+					// CRC check failure. The client automatically rejects the corrupt batch.
+					this.logger.error(e, "CorruptRecordException during poll; "
+							+ "Kafka client has auto-rejected the corrupt batch");
+					continue;
+				}
				catch (IllegalStateException e) {
-					// KIP-932: In explicit mode, poll() throws if unacknowledged records exist
+					// In explicit mode, poll() throws if unacknowledged records exist
					if (this.isExplicitMode && !this.pendingAcknowledgments.isEmpty()) {
						this.logger.trace(() -> "Poll blocked waiting for " + this.pendingAcknowledgments.size() +
								" acknowledgments");
@@ -432,25 +458,53 @@ private void processRecords(ConsumerRecords<K, V> records) {
 
		private void handleProcessingError(ConsumerRecord<K, V> record,
				@Nullable ShareConsumerAcknowledgment acknowledgment, Exception e) {
-			this.logger.error(e, "Error processing record: " + record);
+
+			// Delegate to the recoverer to decide ACCEPT, RELEASE, or REJECT
+			AcknowledgeType action;
+			try {
+				action = ShareKafkaMessageListenerContainer.this.getShareConsumerRecordRecoverer().recover(record, e);
+			}
+			catch (Exception recovererEx) {
+				// If the recoverer itself throws, fall back to REJECT
+				this.logger.error(recovererEx, () -> "ShareConsumerRecordRecoverer threw an exception; "
+						+ "falling back to REJECT for record from "
+						+ record.topic() + "-" + record.partition() + "@" + record.offset());
+				action = AcknowledgeType.REJECT;
+			}
+
+			// RENEW is not valid for error recovery (it extends the lock during processing, not after failure)
+			if (action == AcknowledgeType.RENEW) {
+				this.logger.warn(() -> "ShareConsumerRecordRecoverer returned RENEW for record from "
+						+ record.topic() + "-" + record.partition() + "@" + record.offset()
+						+ "; RENEW is not valid for error recovery, using REJECT instead");
+				action = AcknowledgeType.REJECT;
+			}
+
+			final AcknowledgeType actionToLog = action;
 
			if (this.isExplicitMode && acknowledgment != null) {
-				// Remove from pending and auto-reject on error
+				// Remove from pending and from timestamp tracking so the timeout checker doesn't hold stale entries
				this.pendingAcknowledgments.remove(record);
+				this.acknowledgmentTimestamps.remove(record);
				try {
-					acknowledgment.reject();
+					switch (action) {
+						case ACCEPT -> acknowledgment.acknowledge();
+						case RELEASE -> acknowledgment.release();
+						case REJECT -> acknowledgment.reject();
+						default -> acknowledgment.reject();
+					}
				}
				catch (Exception ackEx) {
-					this.logger.error(ackEx, "Failed to reject record after processing error");
+					this.logger.error(ackEx, () -> "Failed to " + actionToLog + " record after processing error");
				}
			}
			else {
-				// In implicit mode, auto-reject on error
+				// Implicit mode: apply the recoverer's decision via consumer.acknowledge
				try {
-					this.consumer.acknowledge(record, AcknowledgeType.REJECT);
+					this.consumer.acknowledge(record, action);
				}
				catch (Exception ackEx) {
-					this.logger.error(ackEx, () -> "Failed to " + actionToLog + " record after processing error");
				}
			}
		}
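The `handleProcessingError` change guards the recoverer twice: if it throws, or if it returns the invalid `RENEW`, the container degrades to REJECT so it always has a usable action. The stand-alone sketch below isolates that decision logic with local stand-in types (`AckType`, `Recoverer`, and `decide` mimic the container's behavior; they are not the spring-kafka API).

```java
public class RecovererGuardSketch {

    enum AckType { ACCEPT, RELEASE, REJECT, RENEW }

    @FunctionalInterface
    interface Recoverer {
        AckType recover(String record, Exception exception);
    }

    // Mirrors the container's guards: a throwing recoverer and a RENEW result
    // both degrade to REJECT, so the caller always gets a valid acknowledgment.
    static AckType decide(Recoverer recoverer, String record, Exception e) {
        AckType action;
        try {
            action = recoverer.recover(record, e);
        }
        catch (Exception recovererEx) {
            action = AckType.REJECT; // the recoverer itself failed
        }
        if (action == AckType.RENEW) {
            action = AckType.REJECT; // RENEW is not valid for error recovery
        }
        return action;
    }

    public static void main(String[] args) {
        Exception e = new RuntimeException("listener failed");
        System.out.println(decide((r, ex) -> AckType.RELEASE, "t-0@1", e)); // RELEASE
        System.out.println(decide((r, ex) -> AckType.RENEW, "t-0@2", e));   // REJECT
        System.out.println(decide((r, ex) -> { throw new IllegalStateException(); }, "t-0@3", e)); // REJECT
    }
}
```

Separating the decision from the acknowledgment call, as the container does, keeps the explicit-mode and implicit-mode branches free of duplicate fallback logic.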
