Reputation: 527
We just updated our spring boot project from 3.2.4
to 3.3.0
and we faced some issues regarding the awesome lib spring-kafka
. Our kafka listeners that are ack-mode=record
and expecting as parameter a kafka json string message converted to a POJO stopped working as soon as we updated the version.
While debugging the issue we noticed that the message comes as string if we use ConsumerRecord<>
but where is expected the serialized POJO is null
.
Download demo project: https://uploadnow.io/f/r4XXkw9
org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message
Endpoint handler details:
Method [public void com.example.demo.TestListener.consume(com.example.demo.PojoMessage)]
Bean [com.example.demo.TestListener@2d270181]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2869) ~[spring-kafka-3.2.0.jar:3.2.0]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2814) ~[spring-kafka-3.2.0.jar:3.2.0]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:2778) ~[spring-kafka-3.2.0.jar:3.2.0]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.lambda$doInvokeRecordListener$53(KafkaMessageListenerContainer.java:2701) ~[spring-kafka-3.2.0.jar:3.2.0]
at io.micrometer.observation.Observation.observe(Observation.java:565) ~[micrometer-observation-1.13.0.jar:1.13.0]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2699) ~[spring-kafka-3.2.0.jar:3.2.0]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:2541) ~[spring-kafka-3.2.0.jar:3.2.0]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2430) ~[spring-kafka-3.2.0.jar:3.2.0]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:2085) ~[spring-kafka-3.2.0.jar:3.2.0]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1461) ~[spring-kafka-3.2.0.jar:3.2.0]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1426) ~[spring-kafka-3.2.0.jar:3.2.0]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1296) ~[spring-kafka-3.2.0.jar:3.2.0]
at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:1589) ~[na:na]
Suppressed: org.springframework.kafka.listener.ListenerExecutionFailedException: Restored Stack Trace
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.checkAckArg(MessagingMessageListenerAdapter.java:446) ~[spring-kafka-3.2.0.jar:3.2.0]
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:427) ~[spring-kafka-3.2.0.jar:3.2.0]
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invoke(MessagingMessageListenerAdapter.java:384) ~[spring-kafka-3.2.0.jar:3.2.0]
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:85) ~[spring-kafka-3.2.0.jar:3.2.0]
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:50) ~[spring-kafka-3.2.0.jar:3.2.0]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2800) ~[spring-kafka-3.2.0.jar:3.2.0]
Caused by: org.springframework.messaging.handler.annotation.support.MethodArgumentNotValidException: Could not resolve method parameter at index 0 in public void com.example.demo.TestListener.consume(com.example.demo.PojoMessage): 1 error(s): [Error in object 'message': codes []; arguments []; default message [Payload value must not be empty]]
at org.springframework.messaging.handler.annotation.support.PayloadMethodArgumentResolver.resolveArgument(PayloadMethodArgumentResolver.java:128) ~[spring-messaging-6.1.8.jar:6.1.8]
at org.springframework.kafka.listener.adapter.KafkaNullAwarePayloadArgumentResolver.resolveArgument(KafkaNullAwarePayloadArgumentResolver.java:48) ~[spring-kafka-3.2.0.jar:3.2.0]
at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:118) ~[spring-messaging-6.1.8.jar:6.1.8]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:147) ~[spring-messaging-6.1.8.jar:6.1.8]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:115) ~[spring-messaging-6.1.8.jar:6.1.8]
at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:70) ~[spring-kafka-3.2.0.jar:3.2.0]
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:420) ~[spring-kafka-3.2.0.jar:3.2.0]
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invoke(MessagingMessageListenerAdapter.java:384) ~[spring-kafka-3.2.0.jar:3.2.0]
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:85) ~[spring-kafka-3.2.0.jar:3.2.0]
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:50) ~[spring-kafka-3.2.0.jar:3.2.0]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2800) ~[spring-kafka-3.2.0.jar:3.2.0]
... 12 common frames omitted
My best guess is that we're making some mistake on the deserialization configuration that we miraculously working until now because we use this lib in a non-tipical way. In our infra we have "many kafka brokers" and some applications needs to publish/consume from different brokers, so we have to create configuration for each one.
@Component
class KafkaBrokerPropertiesConfiguration {
@Primary
@Bean("privateKafkaProperties")
@ConfigurationProperties("spring.kafka.private")
fun private() = KafkaProperties()
}
@Configuration
class PrivateKafkaConfiguration(
@Qualifier("privateKafkaProperties")
private val kafkaProperties: KafkaProperties
) {
@Primary
@Bean("privateKafkaTemplate")
fun privateKafkaTemplate(
@Qualifier("privateKafkaAdmin")
kafkaAdmin: KafkaAdmin
): KafkaTemplate<String, String> =
KafkaTemplate(privateProducerFactory()).apply {
setKafkaAdmin(kafkaAdmin)
setObservationEnabled(true)
}
@Primary
@Bean("privateProducerFactory")
fun privateProducerFactory(): ProducerFactory<String, String> = DefaultKafkaProducerFactory(
kafkaProperties.buildProducerProperties(null)
)
@Bean("privateKafkaAdmin")
fun privateKafkaAdmin(): KafkaAdmin = KafkaAdmin(kafkaProperties.buildAdminProperties(null))
.apply { setClusterId("admin-private") }
@Primary
@Bean("privateConsumerFactory")
fun privateConsumerFactory(): ConsumerFactory<String, String> = DefaultKafkaConsumerFactory(
kafkaProperties.buildConsumerProperties(null)
)
@Primary
@Bean("privateListenerContainerFactory")
fun privateListenerContainerFactory(
@Qualifier("privateKafkaTemplate")
kafkaTemplate: KafkaTemplate<String, String>,
objectMapper: ObjectMapper
): KafkaListenerContainerFactory<*> {
val containerFactory = ConcurrentKafkaListenerContainerFactory<String, String>()
val jsonMessageConverter = JsonMessageConverter(objectMapper)
containerFactory.consumerFactory = privateConsumerFactory()
containerFactory.containerProperties.ackMode = kafkaProperties.listener.ackMode
containerFactory.setBatchMessageConverter(BatchMessagingMessageConverter(jsonMessageConverter))
containerFactory.setRecordMessageConverter(jsonMessageConverter)
containerFactory.containerProperties.isObservationEnabled = true
return containerFactory
}
}
@Component
class TestListener {
@KafkaListener(
topics = ["test-topic"],
containerFactory = "privateListenerContainerFactory"
)
fun consume(@Payload message: PojoMessage) {
println(message)
}
}
spring:
application:
name: demo
config:
import:
- classpath:kafka-private-config.yaml
spring:
kafka:
private:
bootstrap-servers: localhost:29091
properties:
security:
protocol: PLAINTEXT
producer:
retries: 3
acks: all
properties:
linger.ms: 50
max.block.ms: 120000
listener:
concurrency: 1
type: single
ack-mode: record
consumer:
group-id: demo-consumer
auto-offset-reset: latest
enable-auto-commit: false
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
properties:
max.poll.interval.ms: 3600000
session.timeout.ms: 60000
heartbeat.interval.ms: 3000
allow.auto.create.topics: true
max-poll-records: 5
retry:
topic:
enabled: false
isConversionNeeded()
to be true
but it seems that it tries to convert but also failsorg.springframework.kafka.listener.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message
Endpoint handler details:
Method [public java.lang.String com.example.demo.TestListener.consume(com.example.demo.PojoMessage)]
Bean [com.example.demo.TestListener@7718ecad]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2869) ~[spring-kafka-3.2.0.jar:3.2.0]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2814) ~[spring-kafka-3.2.0.jar:3.2.0]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:2778) ~[spring-kafka-3.2.0.jar:3.2.0]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.lambda$doInvokeRecordListener$53(KafkaMessageListenerContainer.java:2701) ~[spring-kafka-3.2.0.jar:3.2.0]
at io.micrometer.observation.Observation.observe(Observation.java:565) ~[micrometer-observation-1.13.0.jar:1.13.0]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2699) ~[spring-kafka-3.2.0.jar:3.2.0]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:2541) ~[spring-kafka-3.2.0.jar:3.2.0]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2430) ~[spring-kafka-3.2.0.jar:3.2.0]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:2085) ~[spring-kafka-3.2.0.jar:3.2.0]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1461) ~[spring-kafka-3.2.0.jar:3.2.0]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1426) ~[spring-kafka-3.2.0.jar:3.2.0]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1296) ~[spring-kafka-3.2.0.jar:3.2.0]
at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:1589) ~[na:na]
Suppressed: org.springframework.kafka.listener.ListenerExecutionFailedException: Restored Stack Trace
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.checkAckArg(MessagingMessageListenerAdapter.java:446) ~[spring-kafka-3.2.0.jar:3.2.0]
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:424) ~[spring-kafka-3.2.0.jar:3.2.0]
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invoke(MessagingMessageListenerAdapter.java:384) ~[spring-kafka-3.2.0.jar:3.2.0]
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:85) ~[spring-kafka-3.2.0.jar:3.2.0]
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:50) ~[spring-kafka-3.2.0.jar:3.2.0]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2800) ~[spring-kafka-3.2.0.jar:3.2.0]
Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot handle message
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:424) ~[spring-kafka-3.2.0.jar:3.2.0]
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invoke(MessagingMessageListenerAdapter.java:384) ~[spring-kafka-3.2.0.jar:3.2.0]
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:85) ~[spring-kafka-3.2.0.jar:3.2.0]
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:50) ~[spring-kafka-3.2.0.jar:3.2.0]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2800) ~[spring-kafka-3.2.0.jar:3.2.0]
... 12 common frames omitted
Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot convert from [java.util.LinkedHashMap] to [com.example.demo.PojoMessage] for GenericMessage [payload={name=sdasd asdasd, age=1}, headers={kafka_offset=3, kafka_consumer=org.springframework.kafka.core.DefaultKafkaConsumerFactory$ExtendedKafkaConsumer@4148e62f, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedMessageKey=asdadas, kafka_receivedTopic=test-topic, kafka_receivedTimestamp=1718203444770, kafka_groupId=demo-consumer}]
at org.springframework.messaging.handler.annotation.support.PayloadMethodArgumentResolver.resolveArgument(PayloadMethodArgumentResolver.java:151) ~[spring-messaging-6.1.8.jar:6.1.8]
at org.springframework.kafka.listener.adapter.KafkaNullAwarePayloadArgumentResolver.resolveArgument(KafkaNullAwarePayloadArgumentResolver.java:48) ~[spring-kafka-3.2.0.jar:3.2.0]
at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:118) ~[spring-messaging-6.1.8.jar:6.1.8]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:147) ~[spring-messaging-6.1.8.jar:6.1.8]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:115) ~[spring-messaging-6.1.8.jar:6.1.8]
at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:70) ~[spring-kafka-3.2.0.jar:3.2.0]
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:420) ~[spring-kafka-3.2.0.jar:3.2.0]
... 16 common frames omitted
Upvotes: 2
Views: 879
Reputation: 527
Answered here: https://github.com/spring-projects/spring-kafka/discussions/3296#discussioncomment-9755724
Basically, for kotlin projects, there is a bug on the lib where is checked it the parameter is a kotlin class.
Upvotes: 2