Reputation: 173
I am using a Kafka producer and a Spring Kafka consumer, with a JSON serializer and deserializer. Whenever I try to read messages from the topic in the consumer, I get the following error:
org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition fan_topic-0 at offset 154. If needed, please seek past the record to continue consumption.
Caused by: java.lang.IllegalStateException: No type information in headers and no default type provided
I have not configured anything about headers in either the producer or the consumer. What am I missing here?
Upvotes: 6
Views: 22043
Reputation: 1058
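Just adding to the other answer: the changes below solved it for me.
First, disable the type-info headers (this is a JsonSerializer property, so it takes effect in the producer configuration):
config.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
Then create the consumer factory with explicit deserializers, using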
return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(), new JsonDeserializer<>(String.class));
instead of
return new DefaultKafkaConsumerFactory<String, String>(config);
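For reference, the deserialize method below (from JsonDeserializer) looks for type information in the headers. When there is none and no default target type is set, the Assert.state(...) call throws the IllegalStateException: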
    @Override
    public T deserialize(String topic, Headers headers, byte[] data) {
        JavaType javaType = this.typeMapper.toJavaType(headers);
        if (javaType == null) {
            Assert.state(this.targetType != null, "No type information in headers and no default type provided");
            return deserialize(topic, data);
        }
        else {
            try {
                return this.objectMapper.readerFor(javaType).readValue(data);
            }
            catch (IOException e) {
                throw new SerializationException("Can't deserialize data [" + Arrays.toString(data) +
                        "] from topic [" + topic + "]", e);
            }
        }
    }
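To make the fallback order in that method easier to see, here is a minimal, dependency-free sketch of the same decision: use the type named in the headers if there is one, otherwise fall back to the default type, otherwise fail. This is a simplified stand-in and not the Spring Kafka implementation. The class name, the resolveTargetType helper, and the use of a plain Map for headers are all made up for the illustration, and the "__TypeId__" header name is my assumption about what the default type mapper uses.

```java
import java.util.Map;

// Simplified stand-in for the type-resolution step; NOT the Spring Kafka code.
class TypeResolutionSketch {

    // Assumed header name, used here only for illustration.
    static final String TYPE_HEADER = "__TypeId__";

    // Hypothetical helper: picks the class to deserialize into.
    static Class<?> resolveTargetType(Map<String, String> headers, Class<?> defaultType) {
        String typeName = headers.get(TYPE_HEADER);
        if (typeName != null) {
            // Type information is present in the headers, so it wins.
            try {
                return Class.forName(typeName);
            } catch (ClassNotFoundException e) {
                throw new IllegalStateException("Unknown type in headers: " + typeName, e);
            }
        }
        // No header: a default type is required, otherwise we fail like the question's error.
        if (defaultType == null) {
            throw new IllegalStateException("No type information in headers and no default type provided");
        }
        return defaultType;
    }

    public static void main(String[] args) {
        // Header present: the default type is not needed.
        System.out.println(resolveTargetType(Map.of(TYPE_HEADER, "java.lang.Integer"), null));
        // No header, default type supplied: the default is used.
        System.out.println(resolveTargetType(Map.of(), String.class));
        // No header and no default: this is the failing case from the question.
        try {
            resolveTargetType(Map.of(), null);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

A producer that does not write type headers puts the consumer in the second or third case, which is why passing a target type to the deserializer makes the error go away.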
Upvotes: 1
Reputation: 121552
I believe what you are missing is that the JsonDeserializer has to be configured on the ConsumerFactory itself, with an appropriate default type to deserialize into, rather than through the Kafka properties.
All the info is presented in the Docs: https://docs.spring.io/spring-kafka/docs/2.1.7.RELEASE/reference/html/_reference.html#serdes
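As a rough sketch of what that configuration could look like: the JsonDeserializer instance is created with a target type and passed to the DefaultKafkaConsumerFactory constructor. The Fan payload class, the FanConsumerConfig class name, the broker address, and the group id are placeholders I made up for the example; adjust them to your setup.

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;

@EnableKafka
@Configuration
public class FanConsumerConfig {

    // Hypothetical payload type; replace with the class your JSON maps to.
    public static class Fan {
        private String name;

        public String getName() { return name; }
        public void setName(String name) { this.name = name; }
    }

    @Bean
    public ConsumerFactory<String, Fan> consumerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker address
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "fan-group");               // assumed group id

        // The deserializer instances are handed to the factory directly,
        // and the value deserializer carries the default target type.
        return new DefaultKafkaConsumerFactory<>(
                config,
                new StringDeserializer(),
                new JsonDeserializer<>(Fan.class));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Fan> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Fan> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}
```

With the target type supplied this way, the deserializer no longer depends on type headers, so records from a producer that does not write them can still be read.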
Upvotes: 5