Reputation: 94
I have a question about deserialization in Kafka.
My Kafka application receives this string:
Payload: {"post":{"postId":"5e22fac7f7356803e8784172","tags":["a","lovve","asldkjfsbajfdkjlnzx","z"]},"date":"2020-01-18T16:12:50.833423","user":{"userId":"5dfcfd77367c690edd91b2d9"},"reactionType":"unloved"}
and this is my Kafka consumer configuration:
@Bean
public Map<String, Object> consumerConfigs() {
    Map<String, Object> props = new HashMap<>(
            kafkaProperties.buildConsumerProperties());
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            JsonDeserializer.class);
    props.put(ConsumerConfig.GROUP_ID_CONFIG,
            "magpie-trending");
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    return props;
}
I have a class with the same structure as the payload. How can I convert the payload into that object?
I also tried another approach:
The string is received correctly by one microservice, but I need to convert it into a specific object. When I use ObjectMapper for the conversion, the application throws this exception:
threw exception; nested exception is com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Cannot construct instance of
com.avenuecode.magpie.trending.component.kafka.message.PostMessage
(no Creators, like default construct, exist): cannot deserialize from Object value (no delegate- or property-based Creator)
My object is:
public class ReactionMessage {
    private PostMessage post;
    private String date;
    private UserMessage user;
    private String reactionType;

    @JsonCreator
    public ReactionMessage(@JsonProperty("post") PostMessage post,
                           @JsonProperty("user") UserMessage user,
                           @JsonProperty("date") String date,
                           @JsonProperty("reactionType") String reactionType) {
        this.post = post;
        this.date = date;
        this.user = user;
        this.reactionType = reactionType;
    }

    @JsonCreator
    public ReactionMessage() {
    }
}
I call the mapper in this method:
@KafkaListener(topics = "reaction-topic", clientIdPrefix = "string", groupId = "magpie-trending")
public void listenAsObject(ConsumerRecord<String, String> cr,
                           @Payload String payload) throws IOException {
    logger.info("Logger 1 [JSON] received key {}: Type [{}] | Payload: {} | Record: {}", cr.key(),
            typeIdHeader(cr.headers()), payload, cr.toString());
    ObjectMapper mapper = new ObjectMapper();
    ReactionMessage message = mapper.readValue(payload, ReactionMessage.class);
}

private static String typeIdHeader(Headers headers) {
    return StreamSupport.stream(headers.spliterator(), false)
            .filter(header -> header.key().equals("__TypeId__"))
            .findFirst().map(header -> new String(header.value())).orElse("N/A");
}
Upvotes: 0
Views: 2201
Reputation: 191743
You need to make your JsonDeserializer return the object type you expect.
For example, Kafka's built-in JSON deserializer only returns JsonNode.
The Spring JSON deserializer has extra properties for passing in class names.
See also: Kafka - Deserializing the object in Consumer
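As a rough sketch of what those extra properties could look like, assuming the deserializer in your config is Spring Kafka's org.springframework.kafka.support.serializer.JsonDeserializer: the snippet below builds the same consumer properties as in the question, written as plain property names so it runs without any dependencies. The two spring.json.* keys are the string values behind JsonDeserializer.VALUE_DEFAULT_TYPE and JsonDeserializer.TRUSTED_PACKAGES. The class name JsonConsumerProps, the build method, and the com.example package are hypothetical; substitute the real package of ReactionMessage.

```java
import java.util.HashMap;
import java.util.Map;

public class JsonConsumerProps {

    // Builds consumer properties that ask Spring Kafka's JsonDeserializer
    // to produce a specific target class.
    // valueType and trustedPackages are supplied by the caller (hypothetical values below).
    public static Map<String, Object> build(String valueType, String trustedPackages) {
        Map<String, Object> props = new HashMap<>();
        // Same settings as in the question, written as plain property names.
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.springframework.kafka.support.serializer.JsonDeserializer");
        props.put("group.id", "magpie-trending");
        props.put("auto.offset.reset", "earliest");
        // Equivalent to JsonDeserializer.VALUE_DEFAULT_TYPE: the class to deserialize into.
        props.put("spring.json.value.default.type", valueType);
        // Equivalent to JsonDeserializer.TRUSTED_PACKAGES: packages allowed as targets.
        props.put("spring.json.trusted.packages", trustedPackages);
        return props;
    }

    public static void main(String[] args) {
        // Assumed package name; replace with the real one.
        Map<String, Object> props = build("com.example.ReactionMessage", "com.example");
        props.forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```

In your consumerConfigs() bean you would add only the last two entries, using the JsonDeserializer constants. With that in place the listener can probably take a ReactionMessage payload instead of a String, so the manual ObjectMapper call is no longer needed. Note that the exception in the question is about PostMessage: Jackson still needs a no-argument constructor or a @JsonCreator constructor on that nested class (and likely on UserMessage too), whichever deserializer does the work.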
Upvotes: 2