Hasan Isleem

Reputation: 21

Kafka Listener method could not be invoked with the incoming message Endpoint handler details

I'm sending a JSON object from a Spring Boot producer, but when I try to receive the message in the consumer I get the following error:

Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot convert from [java.lang.String] to [com.***.***.kafka.messages.Message] for GenericMessage [payload={"type":"HHH","actorId":1,"entity":null,"entityType":"ssss"}, headers={kafka_offset=45, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@789f2579, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedTopic=isomantopictest, kafka_receivedTimestamp=1671973825381, __TypeId__=[B@500ac626, kafka_groupId=isoman5}]
    at org.springframework.messaging.handler.annotation.support.PayloadMethodArgumentResolver.resolveArgument(PayloadMethodArgumentResolver.java:144) ~[spring-messaging-6.0.2.jar:6.0.2]
    at org.springframework.kafka.annotation.KafkaNullAwarePayloadArgumentResolver.resolveArgument(KafkaNullAwarePayloadArgumentResolver.java:46) ~[spring-kafka-3.0.0.jar:3.0.0]
    at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:118) ~[spring-messaging-6.0.2.jar:6.0.2]
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:147) ~[spring-messaging-6.0.2.jar:6.0.2]
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:115) ~[spring-messaging-6.0.2.jar:6.0.2]
    at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:56) ~[spring-kafka-3.0.0.jar:3.0.0]
    at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:366) ~[spring-kafka-3.0.0.jar:3.0.0]
    ... 15 common frames omitted

KafkaConfig:

    @Bean
    public ProducerFactory<String, Message> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(
                ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                bootstrapAddress);
        configProps.put(
                ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class);
//        configProps.put(
//                ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
//                StringSerializer.class);
        // Values are serialized as JSON, so the value type is Message rather than String.
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, Message> kafkaTemplate() {
        KafkaTemplate<String, Message> kafkaTemplate = new KafkaTemplate<>(producerFactory());
        kafkaTemplate.setConsumerFactory(messageConsumerFactory());
        return kafkaTemplate;
    }

    public ConsumerFactory<String, Message> messageConsumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        return new DefaultKafkaConsumerFactory<>(
                props,
                new StringDeserializer(),
                new JsonDeserializer<>(Message.class));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Message> messageKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Message> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(messageConsumerFactory());
//        factory.setMessageConverter(new StringJsonMessageConverter());
        return factory;
    }

Receiver class:

@KafkaListener(idIsGroup = false, groupId = "${kafka.group-id}",
        topics = "#{'${kafka.topics}'.split(',')}")
void hearing(@Payload Message message) {
    if (verbose) {
        log.trace("New message received : type = {}, payload = {}", message.getType(), message);
        log.trace("Calling processMessage()");
    }
}

Upvotes: 0

Views: 805

Answers (1)

Hasan Isleem

Reputation: 21

I solved it by adding this statement to messageKafkaListenerContainerFactory:

factory.setMessageConverter(new StringJsonMessageConverter());

I also changed the listener parameter from @Payload Message message to @Payload String message.
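
A possible alternative, offered as an untested sketch: the listener in the question does not set containerFactory, and when that attribute is omitted @KafkaListener looks up the bean named kafkaListenerContainerFactory rather than messageKafkaListenerContainerFactory. If that default factory delivers String values (an assumption; the String payload in the exception is consistent with it), pointing the listener at the custom factory may let JsonDeserializer produce a Message directly, so the parameter can stay typed. The class name MessageReceiver is hypothetical, and the import for Message is omitted because its package is elided in the question.

```java
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

// Hypothetical receiver class; Message is the payload class from the question.
@Component
public class MessageReceiver {

    // containerFactory names the factory bean defined in KafkaConfig.
    // Without it, the default bean "kafkaListenerContainerFactory" is used.
    @KafkaListener(idIsGroup = false, groupId = "${kafka.group-id}",
            topics = "#{'${kafka.topics}'.split(',')}",
            containerFactory = "messageKafkaListenerContainerFactory")
    void hearing(@Payload Message message) {
        // Assumption: the value was already deserialized by the
        // JsonDeserializer<Message> configured in messageConsumerFactory().
    }
}
```

With this variant the StringJsonMessageConverter line and the change to a String parameter should not be needed, though that depends on the rest of the application's configuration.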

Upvotes: 1
