Skip to content

Commit 1946030

Browse files
garyrussellartembilan
authored andcommitted
Add ErrorHandlingDeserializer
A delegating deserializer that populates the key or value with a `DeserializationException` containing the raw data when deserialization fails. * Polishing - PR Comments
1 parent 82322d4 commit 1946030

File tree

5 files changed

+365
-0
lines changed

5 files changed

+365
-0
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@
6565
import org.springframework.kafka.support.LogIfLevelEnabled;
6666
import org.springframework.kafka.support.TopicPartitionInitialOffset;
6767
import org.springframework.kafka.support.TopicPartitionInitialOffset.SeekPosition;
68+
import org.springframework.kafka.support.serializer.DeserializationException;
6869
import org.springframework.kafka.transaction.KafkaAwareTransactionManager;
6970
import org.springframework.scheduling.SchedulingAwareRunnable;
7071
import org.springframework.scheduling.TaskScheduler;
@@ -1095,6 +1096,12 @@ private RuntimeException doInvokeRecordListener(final ConsumerRecord<K, V> recor
10951096
@SuppressWarnings("rawtypes") Producer producer,
10961097
Iterator<ConsumerRecord<K, V>> iterator) throws Error {
10971098
try {
1099+
if (record.value() instanceof DeserializationException) {
1100+
throw (DeserializationException) record.value();
1101+
}
1102+
if (record.key() instanceof DeserializationException) {
1103+
throw (DeserializationException) record.key();
1104+
}
10981105
switch (this.listenerType) {
10991106
case ACKNOWLEDGING_CONSUMER_AWARE:
11001107
this.listener.onMessage(record,
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/*
2+
* Copyright 2018 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.support.serializer;
18+
19+
import org.springframework.kafka.KafkaException;
20+
21+
/**
22+
* Exception returned in the consumer record value or key when a deserialization failure
23+
* occurs.
24+
*
25+
* @author Gary Russell
26+
* @since 2.2
27+
*
28+
*/
29+
@SuppressWarnings("serial")
30+
public class DeserializationException extends KafkaException {
31+
32+
private final byte[] data;
33+
34+
private final boolean isKey;
35+
36+
public DeserializationException(String message, byte[] data, boolean isKey, Throwable cause) {
37+
super(message, cause);
38+
this.data = data;
39+
this.isKey = isKey;
40+
}
41+
42+
public byte[] getData() {
43+
return this.data;
44+
}
45+
46+
public boolean isKey() {
47+
return this.isKey;
48+
}
49+
50+
}
Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
/*
2+
* Copyright 2018 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.support.serializer;
18+
19+
import java.util.Map;
20+
21+
import org.apache.kafka.common.header.Headers;
22+
import org.apache.kafka.common.serialization.ExtendedDeserializer;
23+
24+
import org.springframework.util.Assert;
25+
import org.springframework.util.ClassUtils;
26+
27+
/**
28+
* Delegating key/value deserializer that catches exceptions, returning them
29+
* in the consumer record.
30+
*
31+
* @author Gary Russell
32+
* @since 2.2
33+
*
34+
*/
35+
public class ErrorHandlingDeserializer implements ExtendedDeserializer<Object> {
36+
37+
/**
38+
* Property name for the delegate key deserializer.
39+
*/
40+
public static final String KEY_DESERIALIZER_CLASS = "spring.deserializer.key.delegate.class";
41+
42+
/**
43+
* Property name for the delegate value deserializer.
44+
*/
45+
public static final String VALUE_DESERIALIZER_CLASS = "spring.deserializer.value.delegate.class";
46+
47+
private ExtendedDeserializer<Object> delegate;
48+
49+
private boolean isKey;
50+
51+
public ErrorHandlingDeserializer() {
52+
super();
53+
}
54+
55+
public ErrorHandlingDeserializer(ExtendedDeserializer<Object> delegate) {
56+
this.delegate = delegate;
57+
}
58+
59+
@SuppressWarnings("unchecked")
60+
@Override
61+
public void configure(Map<String, ?> configs, boolean isKey) {
62+
if (isKey && configs.containsKey(KEY_DESERIALIZER_CLASS)) {
63+
try {
64+
Object value = configs.get(KEY_DESERIALIZER_CLASS);
65+
Class<?> clazz = value instanceof Class ? (Class<?>) value : ClassUtils.forName((String) value, null);
66+
this.delegate = (ExtendedDeserializer<Object>) clazz.newInstance();
67+
}
68+
catch (ClassNotFoundException | LinkageError | InstantiationException | IllegalAccessException e) {
69+
throw new IllegalStateException(e);
70+
}
71+
}
72+
else if (!isKey && configs.containsKey(VALUE_DESERIALIZER_CLASS)) {
73+
try {
74+
Object value = configs.get(VALUE_DESERIALIZER_CLASS);
75+
Class<?> clazz = value instanceof Class ? (Class<?>) value : ClassUtils.forName((String) value, null);
76+
this.delegate = (ExtendedDeserializer<Object>) clazz.newInstance();
77+
}
78+
catch (ClassNotFoundException | LinkageError | InstantiationException | IllegalAccessException e) {
79+
throw new IllegalStateException(e);
80+
}
81+
}
82+
Assert.state(this.delegate != null, "No delegate deserializer configured");
83+
this.delegate.configure(configs, isKey);
84+
this.isKey = isKey;
85+
}
86+
87+
@Override
88+
public Object deserialize(String topic, byte[] data) {
89+
try {
90+
return this.delegate.deserialize(topic, data);
91+
}
92+
catch (Exception e) {
93+
return new DeserializationException("Failed to deserialize", data, this.isKey, e);
94+
}
95+
}
96+
97+
@Override
98+
public void close() {
99+
this.delegate.close();
100+
}
101+
102+
@Override
103+
public Object deserialize(String topic, Headers headers, byte[] data) {
104+
try {
105+
return this.delegate.deserialize(topic, headers, data);
106+
}
107+
catch (Exception e) {
108+
return new DeserializationException("Failed to deserialize", data, this.isKey, e);
109+
}
110+
}
111+
112+
}
Lines changed: 168 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,168 @@
1+
/*
2+
* Copyright 2018 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.listener;
18+
19+
import static org.assertj.core.api.Assertions.assertThat;
20+
21+
import java.util.Map;
22+
import java.util.concurrent.CountDownLatch;
23+
import java.util.concurrent.TimeUnit;
24+
25+
import org.apache.kafka.clients.consumer.ConsumerConfig;
26+
import org.apache.kafka.clients.consumer.ConsumerRecord;
27+
import org.apache.kafka.clients.producer.ProducerConfig;
28+
import org.apache.kafka.common.header.Headers;
29+
import org.apache.kafka.common.serialization.ExtendedDeserializer;
30+
import org.apache.kafka.common.serialization.StringSerializer;
31+
import org.junit.jupiter.api.Test;
32+
33+
import org.springframework.beans.factory.annotation.Autowired;
34+
import org.springframework.context.annotation.Bean;
35+
import org.springframework.context.annotation.Configuration;
36+
import org.springframework.kafka.annotation.EnableKafka;
37+
import org.springframework.kafka.annotation.KafkaListener;
38+
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
39+
import org.springframework.kafka.core.ConsumerFactory;
40+
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
41+
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
42+
import org.springframework.kafka.core.KafkaTemplate;
43+
import org.springframework.kafka.core.ProducerFactory;
44+
import org.springframework.kafka.support.serializer.DeserializationException;
45+
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
46+
import org.springframework.kafka.test.rule.KafkaEmbedded;
47+
import org.springframework.kafka.test.utils.KafkaTestUtils;
48+
import org.springframework.test.annotation.DirtiesContext;
49+
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
50+
51+
/**
52+
* @author Gary Russell
53+
* @since 2.2
54+
*
55+
*/
56+
@SpringJUnitConfig
57+
@DirtiesContext
58+
public class ErrorHandlingDeserializerTests {
59+
60+
private static final String TOPIC = "ehdt";
61+
62+
@Autowired
63+
public Config config;
64+
65+
@Test
66+
public void testBadDeser() throws Exception {
67+
this.config.template().send(TOPIC, "foo", "bar");
68+
this.config.template().send(TOPIC, "fail", "bar");
69+
this.config.template().send(TOPIC, "foo", "fail");
70+
assertThat(this.config.latch.await(10, TimeUnit.SECONDS)).isTrue();
71+
assertThat(this.config.goodCount).isEqualTo(1);
72+
assertThat(this.config.keyErrorCount).isEqualTo(1);
73+
assertThat(this.config.valueErrorCount).isEqualTo(1);
74+
}
75+
76+
@Configuration
77+
@EnableKafka
78+
public static class Config {
79+
80+
private final CountDownLatch latch = new CountDownLatch(3);
81+
82+
private int goodCount;
83+
84+
private int keyErrorCount;
85+
86+
private int valueErrorCount;
87+
88+
@KafkaListener(topics = TOPIC)
89+
public void listen(ConsumerRecord<String, String> record) {
90+
this.goodCount++;
91+
this.latch.countDown();
92+
}
93+
94+
95+
@Bean
96+
public KafkaEmbedded embeddedKafka() {
97+
return new KafkaEmbedded(1, true, 1, TOPIC);
98+
}
99+
100+
@Bean
101+
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
102+
ConcurrentKafkaListenerContainerFactory<String, String> factory =
103+
new ConcurrentKafkaListenerContainerFactory<>();
104+
factory.setConsumerFactory(cf());
105+
factory.setErrorHandler((t, r) -> {
106+
if (r.value() instanceof DeserializationException) {
107+
this.valueErrorCount++;
108+
}
109+
else if (r.key() instanceof DeserializationException) {
110+
this.keyErrorCount++;
111+
}
112+
this.latch.countDown();
113+
});
114+
return factory;
115+
}
116+
117+
@Bean
118+
public ConsumerFactory<String, String> cf() {
119+
Map<String, Object> props = KafkaTestUtils.consumerProps(TOPIC, "false", embeddedKafka());
120+
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
121+
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
122+
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
123+
props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, FailSometimesDeserializer.class);
124+
props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, FailSometimesDeserializer.class.getName());
125+
return new DefaultKafkaConsumerFactory<>(props);
126+
}
127+
128+
@Bean
129+
public ProducerFactory<String, String> pf() {
130+
Map<String, Object> props = KafkaTestUtils.producerProps(embeddedKafka());
131+
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
132+
return new DefaultKafkaProducerFactory<>(props);
133+
}
134+
135+
@Bean
136+
public KafkaTemplate<String, String> template() {
137+
return new KafkaTemplate<>(pf());
138+
}
139+
140+
}
141+
142+
public static class FailSometimesDeserializer implements ExtendedDeserializer<String> {
143+
144+
@Override
145+
public void configure(Map<String, ?> configs, boolean isKey) {
146+
}
147+
148+
@Override
149+
public String deserialize(String topic, byte[] data) {
150+
return new String(data);
151+
}
152+
153+
@Override
154+
public void close() {
155+
}
156+
157+
@Override
158+
public String deserialize(String topic, Headers headers, byte[] data) {
159+
String string = new String(data);
160+
if ("fail".equals(string)) {
161+
throw new RuntimeException("fail");
162+
}
163+
return string;
164+
}
165+
166+
}
167+
168+
}

src/reference/asciidoc/kafka.adoc

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1561,6 +1561,34 @@ NOTE: When using the `StringJsonMessageConverter`, you should use a `StringDeser
15611561
When using the `BytesJsonMessageConverter`, you should use a `BytesDeserializer` in the kafka consumer configuration and `BytesSerializer` in the kafka producer configuration, when using Spring Integration or the `KafkaTemplate.send(Message<?> message)` method.
15621562
Generally, the `BytesJsonMessageConverter` is more efficient because it avoids a `String` to/from `byte[]` conversion.
15631563

1564+
[[error-handling-deserializer]]
1565+
===== ErrorHandlingDeserializer
1566+
1567+
When a deserializer fails to deserialize a message, Spring has no way to handle the problem because it occurs before the `poll()` returns.
1568+
To solve this problem, version 2.2 introduced the `ErrorHandlingDeserializer`.
1569+
This deserializer delegates to a real deserializer (key or value).
1570+
If the delegate fails to deserialize the record content, the `ErrorHandlingDeserializer` returns a `DeserializationException` instead, containing the cause and raw bytes.
1571+
When using a record-level `MessageListener`, if either the key or value contains a `DeserializationException`, the container's `ErrorHandler` is called with the failed `ConsumerRecord`.
1572+
When using a `BatchMessageListener`, the failed record is passed to the application along with the remaining records in the batch, so it is the responsibility of the application listener to check whether the key or value in a particular record is a `DeserializationException`.
1573+
1574+
You can use the `DefaultKafkaConsumerFactory` constructor that takes key and value `Deserializer` objects and wire in appropriate `ErrorHandlingDeserializer` configured with the proper delegates.
1575+
Alternatively, you can use consumer configuration properties which are used by the `ErrorHandlingDeserializer` to instantiate the delegates.
1576+
The property names are `ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS` and `ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS`; the property value can be a class or class name.
1577+
For example:
1578+
1579+
[source, java]
1580+
----
1581+
... // other props
1582+
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
1583+
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
1584+
props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, JsonDeserializer.class);
1585+
props.put(JsonDeserializer.KEY_DEFAULT_TYPE, "com.example.MyKey")
1586+
props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName());
1587+
props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, "com.example.MyValue")
1588+
props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.example")
1589+
return new DefaultKafkaConsumerFactory<>(props);
1590+
----
1591+
15641592
[[payload-conversion-with-batch]]
15651593
===== Payload Conversion with Batch Listeners
15661594

0 commit comments

Comments
 (0)