Transform Kafka Payload to Object

I have a question about deserialization in Kafka.

My Kafka application receives this string:

Payload: {"post":{"postId":"5e22fac7f7356803e8784172","tags":["a","lovve","asldkjfsbajfdkjlnzx","z"]},"date":"2020-01-18T16:12:50.833423","user":{"userId":"5dfcfd77367c690edd91b2d9"},"reactionType":"unloved"}

and I have this consumer configuration:

@Bean
public Map<String, Object> consumerConfigs() {
    Map<String, Object> props = new HashMap<>(
            kafkaProperties.buildConsumerProperties());

    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            JsonDeserializer.class);
    props.put(ConsumerConfig.GROUP_ID_CONFIG,
            "magpie-trending");
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    return props;
}

I also have a class that mirrors the structure of the payload. How can I convert the payload into an instance of that class?

I have also tried another approach.

The string is received by one microservice and that part works, but I need to transform the string into a specific object. When I use ObjectMapper to do that, the application throws this exception:

threw exception; nested exception is com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Cannot construct instance of com.avenuecode.magpie.trending.component.kafka.message.PostMessage (no Creators, like default construct, exist): cannot deserialize from Object value (no delegate- or property-based Creator)

My class is:

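import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

public class ReactionMessage {
    private PostMessage post;
    private String date;
    private UserMessage user;
    private String reactionType;

    // Property-based creator that Jackson uses to build the object from JSON.
    @JsonCreator
    public ReactionMessage(@JsonProperty("post") PostMessage post,
                           @JsonProperty("user") UserMessage user,
                           @JsonProperty("date") String date,
                           @JsonProperty("reactionType") String reactionType) {
        this.post = post;
        this.date = date;
        this.user = user;
        this.reactionType = reactionType;
    }

    // Plain no-argument constructor; it takes no properties, so it needs no @JsonCreator.
    public ReactionMessage() {
    }
}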

The mapper is called in this method:

@KafkaListener(topics = "reaction-topic", clientIdPrefix = "string", groupId = "magpie-trending")
public void listenAsObject(ConsumerRecord<String, String> cr,
                           @Payload String payload) throws IOException {
    logger.info("Logger 1 [JSON] received key {}: Type [{}] | Payload: {} | Record: {}", cr.key(),
            typeIdHeader(cr.headers()), payload, cr.toString());

    ObjectMapper mapper = new ObjectMapper();
    ReactionMessage message = mapper.readValue(payload, ReactionMessage.class);
}

private static String typeIdHeader(Headers headers) {
    return StreamSupport.stream(headers.spliterator(), false)
            .filter(header -> header.key().equals("__TypeId__"))
            .findFirst().map(header -> new String(header.value())).orElse("N/A");
}

Upvotes: 0

Views: 2201

Answers (1)

OneCricketeer

Reputation: 191743

You need to make your JsonDeserializer return the object type you expect.

For example, Kafka's built-in JSON deserializer only returns JsonNode.

The Spring JSON deserializer has extra properties for passing in class names.
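As a rough sketch of what passing in a class name can look like, the snippet below builds the consumer properties with plain string keys so that it compiles without any Kafka or Spring dependency. The keys `spring.json.value.default.type` and `spring.json.trusted.packages` are the values behind `JsonDeserializer.VALUE_DEFAULT_TYPE` and `JsonDeserializer.TRUSTED_PACKAGES` in recent spring-kafka versions, so check them against the version you use. The class name `JsonConsumerProps` is made up for illustration, and the package of `ReactionMessage` is an assumption based on the `PostMessage` package shown in the exception.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of Spring or Kafka.
public class JsonConsumerProps {

    // Builds consumer properties that ask Spring's JsonDeserializer
    // to produce a concrete value type.
    public static Map<String, Object> build(String valueType, String trustedPackages) {
        Map<String, Object> props = new HashMap<>();
        // ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG (Spring's JsonDeserializer)
        props.put("value.deserializer",
                "org.springframework.kafka.support.serializer.JsonDeserializer");
        // JsonDeserializer.VALUE_DEFAULT_TYPE: type to create for each record value
        props.put("spring.json.value.default.type", valueType);
        // JsonDeserializer.TRUSTED_PACKAGES: packages allowed for deserialization
        props.put("spring.json.trusted.packages", trustedPackages);
        return props;
    }

    public static void main(String[] args) {
        // Assumed package: taken from the PostMessage class named in the exception.
        String pkg = "com.avenuecode.magpie.trending.component.kafka.message";
        Map<String, Object> props = build(pkg + ".ReactionMessage", pkg);
        props.forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

With properties like these merged into `consumerConfigs()`, the listener could presumably declare `ReactionMessage` as its payload type instead of `String` and drop the manual `ObjectMapper` call. Jackson still has to be able to construct the nested `PostMessage` and `UserMessage` classes, which is what the exception in the question complains about.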

See also: Kafka - Deserializing the object in Consumer

Upvotes: 2
