Reputation: 21
I'm sending a json object from producer spring boot but when I'm trying to receive the message in consumer I am getting following error :
Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot convert from [java.lang.String] to [com.***.***.kafka.messages.Message] for GenericMessage [payload={"type":"HHH","actorId":1,"entity":null,"entityType":"ssss"}, headers={kafka_offset=45, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@789f2579, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedTopic=isomantopictest, kafka_receivedTimestamp=1671973825381, __TypeId__=[B@500ac626, kafka_groupId=isoman5}]
at org.springframework.messaging.handler.annotation.support.PayloadMethodArgumentResolver.resolveArgument(PayloadMethodArgumentResolver.java:144) ~[spring-messaging-6.0.2.jar:6.0.2]
at org.springframework.kafka.annotation.KafkaNullAwarePayloadArgumentResolver.resolveArgument(KafkaNullAwarePayloadArgumentResolver.java:46) ~[spring-kafka-3.0.0.jar:3.0.0]
at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:118) ~[spring-messaging-6.0.2.jar:6.0.2]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:147) ~[spring-messaging-6.0.2.jar:6.0.2]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:115) ~[spring-messaging-6.0.2.jar:6.0.2]
at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:56) ~[spring-kafka-3.0.0.jar:3.0.0]
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:366) ~[spring-kafka-3.0.0.jar:3.0.0]
... 15 common frames omitted
KafkaConfig :
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
bootstrapAddress);
configProps.put(
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class);
// configProps.put(
// ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
// StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate kafkaTemplate() {
KafkaTemplate<String, Message> kafkaTemplate = new KafkaTemplate(producerFactory());
kafkaTemplate.setConsumerFactory(messageConsumerFactory());
return kafkaTemplate;
}
public ConsumerFactory<String, Message> messageConsumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
return new DefaultKafkaConsumerFactory<>(
props,
new StringDeserializer(),
new JsonDeserializer<>(Message.class));
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Message> messageKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Message> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(messageConsumerFactory());
// factory.setMessageConverter(new StringJsonMessageConverter());
return factory;
}
Receive File :
@KafkaListener(idIsGroup = false, groupId = "${kafka.group-id}",
topics="#{'${kafka.topics}'.split(',')}")
void hearing(@Payload Message message ) {
if (verbose) {
log.trace("New message received : type = {}, payload = {}", message.getType(), message);
log.trace("Calling processMessage()");
}
}
Upvotes: 0
Views: 805
Reputation: 21
I solve it by adding this statement on messageKafkaListenerContainerFactory
:
factory.setMessageConverter(new StringJsonMessageConverter());
And change @Payload Message message
to @Payload String message
.
Upvotes: 1