Wall-E
Wall-E

Reputation: 527

Spring Kafka message conversion failing on Spring Boot 3.3

We just updated our spring boot project from 3.2.4 to 3.3.0 and we faced some issues regarding the awesome lib spring-kafka. Our kafka listeners that are ack-mode=record and expecting as parameter a kafka json string message converted to a POJO stopped working as soon as we updated the version. While debugging the issue we noticed that the message comes as string if we use ConsumerRecord<> but where is expected the serialized POJO is null.

Download demo project: https://uploadnow.io/f/r4XXkw9

Current stack when a message is published to the topic:

org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message
Endpoint handler details:
Method [public void com.example.demo.TestListener.consume(com.example.demo.PojoMessage)]
Bean [com.example.demo.TestListener@2d270181]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2869) ~[spring-kafka-3.2.0.jar:3.2.0]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2814) ~[spring-kafka-3.2.0.jar:3.2.0]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:2778) ~[spring-kafka-3.2.0.jar:3.2.0]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.lambda$doInvokeRecordListener$53(KafkaMessageListenerContainer.java:2701) ~[spring-kafka-3.2.0.jar:3.2.0]
    at io.micrometer.observation.Observation.observe(Observation.java:565) ~[micrometer-observation-1.13.0.jar:1.13.0]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2699) ~[spring-kafka-3.2.0.jar:3.2.0]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:2541) ~[spring-kafka-3.2.0.jar:3.2.0]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2430) ~[spring-kafka-3.2.0.jar:3.2.0]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:2085) ~[spring-kafka-3.2.0.jar:3.2.0]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1461) ~[spring-kafka-3.2.0.jar:3.2.0]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1426) ~[spring-kafka-3.2.0.jar:3.2.0]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1296) ~[spring-kafka-3.2.0.jar:3.2.0]
    at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804) ~[na:na]
    at java.base/java.lang.Thread.run(Thread.java:1589) ~[na:na]
    Suppressed: org.springframework.kafka.listener.ListenerExecutionFailedException: Restored Stack Trace
        at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.checkAckArg(MessagingMessageListenerAdapter.java:446) ~[spring-kafka-3.2.0.jar:3.2.0]
        at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:427) ~[spring-kafka-3.2.0.jar:3.2.0]
        at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invoke(MessagingMessageListenerAdapter.java:384) ~[spring-kafka-3.2.0.jar:3.2.0]
        at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:85) ~[spring-kafka-3.2.0.jar:3.2.0]
        at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:50) ~[spring-kafka-3.2.0.jar:3.2.0]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2800) ~[spring-kafka-3.2.0.jar:3.2.0]
Caused by: org.springframework.messaging.handler.annotation.support.MethodArgumentNotValidException: Could not resolve method parameter at index 0 in public void com.example.demo.TestListener.consume(com.example.demo.PojoMessage): 1 error(s): [Error in object 'message': codes []; arguments []; default message [Payload value must not be empty]] 
    at org.springframework.messaging.handler.annotation.support.PayloadMethodArgumentResolver.resolveArgument(PayloadMethodArgumentResolver.java:128) ~[spring-messaging-6.1.8.jar:6.1.8]
    at org.springframework.kafka.listener.adapter.KafkaNullAwarePayloadArgumentResolver.resolveArgument(KafkaNullAwarePayloadArgumentResolver.java:48) ~[spring-kafka-3.2.0.jar:3.2.0]
    at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:118) ~[spring-messaging-6.1.8.jar:6.1.8]
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:147) ~[spring-messaging-6.1.8.jar:6.1.8]
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:115) ~[spring-messaging-6.1.8.jar:6.1.8]
    at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:70) ~[spring-kafka-3.2.0.jar:3.2.0]
    at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:420) ~[spring-kafka-3.2.0.jar:3.2.0]
    at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invoke(MessagingMessageListenerAdapter.java:384) ~[spring-kafka-3.2.0.jar:3.2.0]
    at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:85) ~[spring-kafka-3.2.0.jar:3.2.0]
    at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:50) ~[spring-kafka-3.2.0.jar:3.2.0]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2800) ~[spring-kafka-3.2.0.jar:3.2.0]
    ... 12 common frames omitted

Project configuration

My best guess is that we're making some mistake on the deserialization configuration that we miraculously working until now because we use this lib in a non-tipical way. In our infra we have "many kafka brokers" and some applications needs to publish/consume from different brokers, so we have to create configuration for each one.

CUSTOM KAFKA PROPERTIES FOR ONE OF THE BROKERS

@Component
class KafkaBrokerPropertiesConfiguration {

    @Primary
    @Bean("privateKafkaProperties")
    @ConfigurationProperties("spring.kafka.private")
    fun private() = KafkaProperties()
}

CONFIGURATION FOR ONE OF THE BROKERS

@Configuration
class PrivateKafkaConfiguration(
    @Qualifier("privateKafkaProperties")
    private val kafkaProperties: KafkaProperties
) {

    @Primary
    @Bean("privateKafkaTemplate")
    fun privateKafkaTemplate(
        @Qualifier("privateKafkaAdmin")
        kafkaAdmin: KafkaAdmin
    ): KafkaTemplate<String, String> =
        KafkaTemplate(privateProducerFactory()).apply {
            setKafkaAdmin(kafkaAdmin)
            setObservationEnabled(true)
        }

    @Primary
    @Bean("privateProducerFactory")
    fun privateProducerFactory(): ProducerFactory<String, String> = DefaultKafkaProducerFactory(
        kafkaProperties.buildProducerProperties(null)
    )

    @Bean("privateKafkaAdmin")
    fun privateKafkaAdmin(): KafkaAdmin = KafkaAdmin(kafkaProperties.buildAdminProperties(null))
        .apply { setClusterId("admin-private") }

    @Primary
    @Bean("privateConsumerFactory")
    fun privateConsumerFactory(): ConsumerFactory<String, String> = DefaultKafkaConsumerFactory(
        kafkaProperties.buildConsumerProperties(null)
    )

    @Primary
    @Bean("privateListenerContainerFactory")
    fun privateListenerContainerFactory(
        @Qualifier("privateKafkaTemplate")
        kafkaTemplate: KafkaTemplate<String, String>,
        objectMapper: ObjectMapper
    ): KafkaListenerContainerFactory<*> {
        val containerFactory = ConcurrentKafkaListenerContainerFactory<String, String>()
        val jsonMessageConverter = JsonMessageConverter(objectMapper)
        containerFactory.consumerFactory = privateConsumerFactory()
        containerFactory.containerProperties.ackMode = kafkaProperties.listener.ackMode
        containerFactory.setBatchMessageConverter(BatchMessagingMessageConverter(jsonMessageConverter))
        containerFactory.setRecordMessageConverter(jsonMessageConverter)
        containerFactory.containerProperties.isObservationEnabled = true
        return containerFactory
    }
}

TOPIC LISTENER USING A CONTAINERFACTORY FROM ONE OF THE BROKERS

@Component
class TestListener {

    @KafkaListener(
        topics = ["test-topic"],
        containerFactory = "privateListenerContainerFactory"
    )
    fun consume(@Payload message: PojoMessage) {
        println(message)
    }
}

application.yaml

spring:
  application:
    name: demo

  config:
    import:
      - classpath:kafka-private-config.yaml

kafka-private-config.yaml

spring:
  kafka:
    private:
      bootstrap-servers: localhost:29091
      properties:
        security:
          protocol: PLAINTEXT
      producer:
        retries: 3
        acks: all
        properties:
          linger.ms: 50
          max.block.ms: 120000
      listener:
        concurrency: 1
        type: single
        ack-mode: record
      consumer:
        group-id: demo-consumer
        auto-offset-reset: latest
        enable-auto-commit: false
        key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        properties:
          max.poll.interval.ms: 3600000
          session.timeout.ms: 60000
          heartbeat.interval.ms: 3000
          allow.auto.create.topics: true
        max-poll-records: 5
      retry:
        topic:
          enabled: false

What I tried?

org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message
Endpoint handler details:
Method [public java.lang.String com.example.demo.TestListener.consume(com.example.demo.PojoMessage)]
Bean [com.example.demo.TestListener@7718ecad]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2869) ~[spring-kafka-3.2.0.jar:3.2.0]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2814) ~[spring-kafka-3.2.0.jar:3.2.0]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:2778) ~[spring-kafka-3.2.0.jar:3.2.0]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.lambda$doInvokeRecordListener$53(KafkaMessageListenerContainer.java:2701) ~[spring-kafka-3.2.0.jar:3.2.0]
    at io.micrometer.observation.Observation.observe(Observation.java:565) ~[micrometer-observation-1.13.0.jar:1.13.0]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2699) ~[spring-kafka-3.2.0.jar:3.2.0]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:2541) ~[spring-kafka-3.2.0.jar:3.2.0]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2430) ~[spring-kafka-3.2.0.jar:3.2.0]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:2085) ~[spring-kafka-3.2.0.jar:3.2.0]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1461) ~[spring-kafka-3.2.0.jar:3.2.0]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1426) ~[spring-kafka-3.2.0.jar:3.2.0]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1296) ~[spring-kafka-3.2.0.jar:3.2.0]
    at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804) ~[na:na]
    at java.base/java.lang.Thread.run(Thread.java:1589) ~[na:na]
    Suppressed: org.springframework.kafka.listener.ListenerExecutionFailedException: Restored Stack Trace
        at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.checkAckArg(MessagingMessageListenerAdapter.java:446) ~[spring-kafka-3.2.0.jar:3.2.0]
        at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:424) ~[spring-kafka-3.2.0.jar:3.2.0]
        at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invoke(MessagingMessageListenerAdapter.java:384) ~[spring-kafka-3.2.0.jar:3.2.0]
        at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:85) ~[spring-kafka-3.2.0.jar:3.2.0]
        at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:50) ~[spring-kafka-3.2.0.jar:3.2.0]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2800) ~[spring-kafka-3.2.0.jar:3.2.0]
Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot handle message
    at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:424) ~[spring-kafka-3.2.0.jar:3.2.0]
    at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invoke(MessagingMessageListenerAdapter.java:384) ~[spring-kafka-3.2.0.jar:3.2.0]
    at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:85) ~[spring-kafka-3.2.0.jar:3.2.0]
    at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:50) ~[spring-kafka-3.2.0.jar:3.2.0]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2800) ~[spring-kafka-3.2.0.jar:3.2.0]
    ... 12 common frames omitted
Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot convert from [java.util.LinkedHashMap] to [com.example.demo.PojoMessage] for GenericMessage [payload={name=sdasd asdasd, age=1}, headers={kafka_offset=3, kafka_consumer=org.springframework.kafka.core.DefaultKafkaConsumerFactory$ExtendedKafkaConsumer@4148e62f, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedMessageKey=asdadas, kafka_receivedTopic=test-topic, kafka_receivedTimestamp=1718203444770, kafka_groupId=demo-consumer}]
    at org.springframework.messaging.handler.annotation.support.PayloadMethodArgumentResolver.resolveArgument(PayloadMethodArgumentResolver.java:151) ~[spring-messaging-6.1.8.jar:6.1.8]
    at org.springframework.kafka.listener.adapter.KafkaNullAwarePayloadArgumentResolver.resolveArgument(KafkaNullAwarePayloadArgumentResolver.java:48) ~[spring-kafka-3.2.0.jar:3.2.0]
    at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:118) ~[spring-messaging-6.1.8.jar:6.1.8]
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:147) ~[spring-messaging-6.1.8.jar:6.1.8]
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:115) ~[spring-messaging-6.1.8.jar:6.1.8]
    at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:70) ~[spring-kafka-3.2.0.jar:3.2.0]
    at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:420) ~[spring-kafka-3.2.0.jar:3.2.0]
    ... 16 common frames omitted

Upvotes: 2

Views: 879

Answers (1)

Wall-E
Wall-E

Reputation: 527

Answered here: https://github.com/spring-projects/spring-kafka/discussions/3296#discussioncomment-9755724

Basically, for kotlin projects, there is a bug on the lib where is checked it the parameter is a kotlin class.

Upvotes: 2

Related Questions