Reputation: 2581
I'm currently working with spring-boot 2.0.4 and spring-kafka 2.1.8.RELEASE. I wanted to simplify the interchange a bit by sending objects through the KafkaTemplate, using JSON as the format. However, some of the messages that need to be deserialized contain java.time.LocalDateTime fields. My setup is as follows.
Config (application.yml):
spring:
  jackson:
    serialization:
      write_dates_as_timestamps: false
  kafka:
    consumer:
      group-id: foo
      enable-auto-commit: true
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      properties:
        spring.json.trusted.packages: my.package
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      properties:
        spring.json.trusted.packages: my.package
      retries: 3
      acks: all
As for the Jackson dependencies that are supposed to be needed for this to work, my dependency tree is:
[INFO] | | +- com.fasterxml.jackson.core:jackson-databind:jar:2.9.6:compile
[INFO] | | | +- com.fasterxml.jackson.core:jackson-annotations:jar:2.9.0:compile
[INFO] | | | \- com.fasterxml.jackson.core:jackson-core:jar:2.9.6:compile
[INFO] | | \- com.fasterxml.jackson.datatype:jackson-datatype-jsr310:jar:2.9.6:compile
[INFO] | | +- com.fasterxml.jackson.datatype:jackson-datatype-jdk8:jar:2.9.6:compile
[INFO] | | \- com.fasterxml.jackson.module:jackson-module-parameter-names:jar:2.9.6:compile
This however produces the following error:
org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition Foo-0 at offset 4. If needed, please seek past the record to continue consumption.
Caused by: org.apache.kafka.common.errors.SerializationException: Can't deserialize data [[123, 34, 105, 100, 34, 58, 34, 97, 50, 99, 50, 56, 99, 99, 101, 97, 49, 98, 98, 52, 51, 97, 97, 56, 53, 50, 49, 53, 99, 101, 49, 54, 57, 48, 52, 51, 51, 98, 51, 45, 50, 34, 44, 34, 97, 117, 116, 104, 111, 114, 34, 58, 34, 97, 110, 116, 111, 110, 105, 111, 34, 44, 34, 99, 114, 101, 97, 116, 101, 100, 34, 58, 123, 34, 104, 111, 117, 114, 34, 58, 49, 56, 44, 34, 109, 105, 110, 117, 116, 101, 34, 58, 52, 48, 44, 34, 115, 101, 99, 111, 110, 100, 34, 58, 53, 49, 44, 34, 110, 97, 110, 111, 34, 58, 51, 50, 53, 48, 48, 48, 48, 48, 48, 44, 34, 100, 97, 121, 79, 102, 89, 101, 97, 114, 34, 58, 50, 52, 48, 44, 34, 100, 97, 121, 79, 102, 87, 101, 101, 107, 34, 58, 34, 84, 85, 69, 83, 68, 65, 89, 34, 44, 34, 109, 111, 110, 116, 104, 34, 58, 34, 65, 85, 71, 85, 83, 84, 34, 44, 34, 100, 97, 121, 79, 102, 77, 111, 110, 116, 104, 34, 58, 50, 56, 44, 34, 121, 101, 97, 114, 34, 58, 50, 48, 49, 56, 44, 34, 109, 111, 110, 116, 104, 86, 97, 108, 117, 101, 34, 58, 56, 44, 34, 99, 104, 114, 111, 110, 111, 108, 111, 103, 121, 34, 58, 123, 34, 99, 97, 108, 101, 110, 100, 97, 114, 84, 121, 112, 101, 34, 58, 34, 105, 115, 111, 56, 54, 48, 49, 34, 44, 34, 105, 100, 34, 58, 34, 73, 83, 79, 34, 125, 125, 44, 34, 97, 103, 103, 114, 101, 103, 97, 116, 101, 73, 100, 34, 58, 34, 97, 50, 99, 50, 56, 99, 99, 101, 97, 49, 98, 98, 52, 51, 97, 97, 56, 53, 50, 49, 53, 99, 101, 49, 54, 57, 48, 52, 51, 51, 98, 51, 34, 44, 34, 118, 101, 114, 115, 105, 111, 110, 34, 58, 48, 44, 34, 112, 114, 105, 122, 101, 73, 110, 102, 111, 34, 58, 123, 34, 110, 117, 109, 98, 101, 114, 79, 102, 87, 105, 110, 110, 101, 114, 115, 34, 58, 49, 44, 34, 112, 114, 105, 122, 101, 80, 111, 111, 108, 34, 58, 49, 48, 44, 34, 112, 114, 105, 122, 101, 84, 97, 98, 108, 101, 34, 58, 91, 49, 48, 93, 125, 125]] from topic [Foo]
Caused by: com.fasterxml.jackson.databind.exc.MismatchedInputException: Expected array or string.
at [Source: (byte[])"{"id":"a2c28ccea1bb43aa85215ce1690433b3-2","author":"foo","created":{"hour":18,"minute":40,"second":51,"nano":325000000,"dayOfYear":240,"dayOfWeek":"TUESDAY","month":"AUGUST","dayOfMonth":28,"year":2018,"monthValue":8,"chronology":{"calendarType":"iso8601","id":"ISO"}},"aggregateId":"a2c28ccea1bb43aa85215ce1690433b3","version":0,"prizeInfo":{"numberOfWinners":1,"prizePool":10,"prizeTable":[10]}}"; line: 1, column: 73] (through reference chain: my.package.Foo["created"])
at com.fasterxml.jackson.databind.exc.MismatchedInputException.from( ~[jackson-databind-2.9.6.jar:2.9.6]
at com.fasterxml.jackson.databind.DeserializationContext.reportInputMismatch( ~[jackson-databind-2.9.6.jar:2.9.6]
at com.fasterxml.jackson.databind.DeserializationContext.handleUnexpectedToken( ~[jackson-databind-2.9.6.jar:2.9.6]
at com.fasterxml.jackson.datatype.jsr310.deser.JSR310DeserializerBase._handleUnexpectedToken( ~[jackson-datatype-jsr310-2.9.6.jar:2.9.6]
at com.fasterxml.jackson.datatype.jsr310.deser.LocalDateTimeDeserializer.deserialize( ~[jackson-datatype-jsr310-2.9.6.jar:2.9.6]
at com.fasterxml.jackson.datatype.jsr310.deser.LocalDateTimeDeserializer.deserialize( ~[jackson-datatype-jsr310-2.9.6.jar:2.9.6]
at com.fasterxml.jackson.databind.deser.impl.FieldProperty.deserializeAndSet( ~[jackson-databind-2.9.6.jar:2.9.6]
at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserializeFromObject( ~[jackson-databind-2.9.6.jar:2.9.6]
at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize( ~[jackson-databind-2.9.6.jar:2.9.6]
at com.fasterxml.jackson.databind.ObjectReader._bindAndClose( ~[jackson-databind-2.9.6.jar:2.9.6]
at com.fasterxml.jackson.databind.ObjectReader.readValue( ~[jackson-databind-2.9.6.jar:2.9.6]
at ~[spring-kafka-2.1.8.RELEASE.jar:2.1.8.RELEASE]
at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord( ~[kafka-clients-1.0.2.jar:na]
at org.apache.kafka.clients.consumer.internals.Fetcher.access$2600( ~[kafka-clients-1.0.2.jar:na]
at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords( ~[kafka-clients-1.0.2.jar:na]
at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1200( ~[kafka-clients-1.0.2.jar:na]
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords( ~[kafka-clients-1.0.2.jar:na]
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords( ~[kafka-clients-1.0.2.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce( ~[kafka-clients-1.0.2.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.poll( ~[kafka-clients-1.0.2.jar:na]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ ~[spring-kafka-2.1.8.RELEASE.jar:2.1.8.RELEASE]
at java.util.concurrent.Executors$ [na:1.8.0_131]
at [na:1.8.0_131]
at [na:1.8.0_131]
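The byte array in the second exception is just the UTF-8 encoded JSON payload. The following is a minimal sketch, using only JDK classes, for turning such a dump back into text; the class and method names (PayloadDump, decode) are illustrative and not part of Kafka or Spring. Decoding the bytes around the failing field shows that "created" was written as a nested object, which is how Jackson serializes a LocalDateTime when no Java time module is registered on the writing side, while the LocalDateTimeDeserializer in the trace expects an array or a string.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper for reading byte dumps like the one in the exception message.
final class PayloadDump {

    // Parses text such as "[123, 34, 105]" or "[[123, 34, 105]]" into a UTF-8 string.
    static String decode(String dump) {
        String[] parts = dump.replace("[", "").replace("]", "").trim().split("\\s*,\\s*");
        byte[] bytes = new byte[parts.length];
        for (int i = 0; i < parts.length; i++) {
            bytes[i] = Byte.parseByte(parts[i]);
        }
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // A slice of the dump above, covering the start of the "created" field.
        String slice = "[34, 99, 114, 101, 97, 116, 101, 100, 34, 58, 123, "
                + "34, 104, 111, 117, 114, 34, 58, 49, 56]";
        System.out.println(decode(slice));
    }
}
```

Pasting the whole array from the log into decode gives the same JSON that Jackson prints after "Source: (byte[])" in the MismatchedInputException.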
Because of this I have tried the following, but none of it has worked so far:
1. A custom ObjectMapper declared as a bean
@Bean
public ObjectMapper objectMapper() {
    ObjectMapper objectMapper = new ObjectMapper();
    objectMapper.registerModule(new JavaTimeModule());
    return objectMapper;
}
2. Serializer annotations on the LocalDateTime fields
To be sure that I have the correct ObjectMapper settings and the necessary dependencies, I created a REST controller whose endpoint returns objects with date-time fields as JSON. This returns the dates correctly; sample:
[
  {
    "playerId": "foo",
    "points": 10,
    "entryDateTime": "2018-08-19T09:30:20.051"
  },
  {
    "playerId": "bar",
    "points": 3,
    "entryDateTime": "2018-08-27T09:30:20.051"
  }
]
Upvotes: 12
Views: 12169
Reputation: 486
Using the Json(De)Serializer constructor that takes an ObjectMapper parameter worked for me. I was having trouble (de)serializing a POJO that had a java.time.Instant field, and after hours of troubleshooting this same org.apache.kafka.common.errors.SerializationException***, I finally realized (with the help of answers such as those on here) that the issue is not Spring but Kafka's own serialization. Given the ObjectMapper bean I had, I resolved it by autowiring that bean into my Kafka producer and consumer configurations and passing it to the JsonSerializer and JsonDeserializer constructors.
@Configuration
public class JacksonConfig {
    @Bean
    public ObjectMapper objectMapper(Jackson2ObjectMapperBuilder builder) {
        ObjectMapper objectMapper = builder.build();
        objectMapper.registerModule(new JavaTimeModule());
        return objectMapper;
    }
}
@Configuration
public class KafkaProducerConfig {
    @Value("${spring.kafka.bootstrap-servers}") // property name is an assumption
    private String bootstrapAddress;
    @Autowired
    private ObjectMapper objectMapper;

    @Bean
    public KafkaTemplate<String, Order> orderKafkaTemplate() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        // Pass the Spring-managed ObjectMapper to the serializer instead of letting Kafka create one
        ProducerFactory<String, Order> producerFactory = new DefaultKafkaProducerFactory<>(props, new StringSerializer(), new JsonSerializer<Order>(objectMapper));
        return new KafkaTemplate<>(producerFactory);
    }
}
@Configuration
public class KafkaConsumerConfig {
    @Value("${spring.kafka.bootstrap-servers}") // property name is an assumption
    private String bootstrapAddress;
    @Value("${spring.kafka.consumer.group-id}") // property name is an assumption
    private String groupId;
    @Autowired
    private ObjectMapper objectMapper;

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Order> orderKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Order> factory = new ConcurrentKafkaListenerContainerFactory<>();
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        ConsumerFactory<String, Order> consumerFactory = new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new JsonDeserializer<>(Order.class, objectMapper));
        // The container factory needs the consumer factory, otherwise the custom deserializer is never used
        factory.setConsumerFactory(consumerFactory);
        return factory;
    }
}
(Pojo shown for further clarity)
public class Order {
    private long accountId;
    private long assetId;
    private long quantity;
    private long price;
    private Instant createdOn = Instant.now();
    // no args constructor, constructor with params for all fields except createdOn, and getters/setters for all fields omitted
}
*** Often the cause was: com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Cannot construct instance of 'java.time.Instant' (no Creators, like default construct, exist): cannot deserialize from object value (no delegate- or property-based Creator) at [Source: (byte[])"{"accountId":1,"assetId":2,"quantity":100,"price":1000,"createdOn":{"epochSecond":1558570217,"nano":728000000}}"...
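The payload in that exception shows createdOn written as an object with epochSecond and nano properties, which is what a mapper without the Java time module produces for an Instant. As a minimal sketch using only java.time (the class and method names are made up for illustration), those two numbers fully determine the Instant, and its ISO-8601 text form is the kind of value a mapper with JavaTimeModule registered and write_dates_as_timestamps disabled would be expected to write and read back:

```java
import java.time.Instant;

// Illustrative only: rebuilds the Instant from the two fields seen in the broken payload.
final class InstantShape {

    static Instant fromFields(long epochSecond, int nano) {
        return Instant.ofEpochSecond(epochSecond, nano);
    }

    public static void main(String[] args) {
        // Values copied from the "createdOn" object in the exception message.
        Instant createdOn = fromFields(1558570217L, 728000000);

        // ISO-8601 text form of the same point in time.
        String iso = createdOn.toString();
        System.out.println(iso);

        // Parsing the text form yields an equal Instant, so no information is lost.
        System.out.println(Instant.parse(iso).equals(createdOn));
    }
}
```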
Upvotes: 13
Reputation: 103
You can extend Spring Kafka's JsonSerializer:
public class JsonSerializerWithJTM<T> extends JsonSerializer<T> {
    public JsonSerializerWithJTM() {
        super();
        objectMapper.registerModule(new JavaTimeModule());
        //whatever you want to configure here
    }
}
Use this class in Kafka's configuration in place of the original JsonSerializer.
Upvotes: 3
Reputation: 174689
When you set the serializers/deserializers using properties, Kafka instantiates them, not Spring. Kafka knows nothing about Spring or the customized ObjectMapper.
You need to override Boot's default producer/consumer factories and use the alternate constructors (or setters) to add the serializers/deserializers.
See the documentation.
Only simple configuration can be performed with properties; for more advanced configuration (such as using a custom ObjectMapper in the serializer/deserializer), you should use the producer/consumer factory constructors that accept a pre-built serializer and deserializer. For example, with Spring Boot, to override the default factories:
@Bean
public ConsumerFactory<Foo, Bar> kafkaConsumerFactory(KafkaProperties properties,
        JsonDeserializer customDeserializer) {
    return new DefaultKafkaConsumerFactory<>(properties.buildConsumerProperties(),
            customDeserializer, customDeserializer);
}

@Bean
public ProducerFactory<Foo, Bar> kafkaProducerFactory(KafkaProperties properties,
        JsonSerializer customSerializer) {
    return new DefaultKafkaProducerFactory<>(properties.buildProducerProperties(),
            customSerializer, customSerializer);
}
Setters are also provided, as an alternative to using these constructors.
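To illustrate why the property-based route cannot see a customized ObjectMapper, here is a rough sketch in plain Java. It does not use Kafka or Spring; DemoSerializer and InstantiationDemo are hypothetical stand-ins. When only a class is supplied through configuration, the instance has to be created reflectively through the no-arg constructor and so carries default settings, whereas a pre-built instance keeps whatever it was constructed with.

```java
// Hypothetical stand-in for a serializer whose behaviour depends on how it was built.
class DemoSerializer {
    final boolean javaTimeSupport;

    // The only constructor available when a framework is given just the class.
    public DemoSerializer() {
        this(false);
    }

    public DemoSerializer(boolean javaTimeSupport) {
        this.javaTimeSupport = javaTimeSupport;
    }
}

final class InstantiationDemo {

    // Mimics property-based configuration: only the type is known, so the
    // instance is created reflectively via the no-arg constructor.
    static DemoSerializer fromClass(Class<? extends DemoSerializer> type) {
        try {
            return type.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Could not instantiate " + type, e);
        }
    }

    public static void main(String[] args) {
        // Pre-built instance, as handed to a factory constructor or setter.
        DemoSerializer preBuilt = new DemoSerializer(true);
        // Instance created from the class alone, as with serializer properties.
        DemoSerializer reflective = fromClass(DemoSerializer.class);

        System.out.println("pre-built keeps customization: " + preBuilt.javaTimeSupport);
        System.out.println("reflective keeps customization: " + reflective.javaTimeSupport);
    }
}
```

This is also why a subclass that configures itself inside its own no-arg constructor still works when referenced from properties.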
Upvotes: 7