Reputation: 696
Below is My method defination for kafka listener and if receive null or empty string for payload i guess I'm getting below error... Can you please help.
@KafkaListener(topics = "${kafka.consumer-topic-name.reservation}", groupId = "${kafka.consumer-group-id.test}",
containerFactory = "kafkaListenerContainerFactory",autoStartup = "${kafka.auto-start.consumer.tets}")
public void consumeReservation(String payload, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String kafkaKey) {}
[org.springframework.kafka.KafkaListenerEndpointContainer #0-0-C-1] ERROR o.s.k.l.SeekToCurrentErrorHandler - Backoff none exhausted for ConsumerRecord(topic = test_topic, partition = 0, leaderEpoch = 2, offset = 453473, CreateTime = 1601962346576, serialized key size = 41, serialized value size = -1, headers = RecordHeaders(headers = [RecordHeader(key = OPERATION, value = [68, 69, 76, 69, 84, 69]), RecordHeader(key = __Key_TypeId__, value = [99, 108, 75, 101, 121])], isReadOnly = false), key = {
"orgId": "1",
"orderId": "U4000024004"}, value = null)
org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message Endpoint handler details: Method[public void com.demo.test.analytics.testanalytics.consumer.FLReservationKafkaConsumer.consumeReservation(java.lang.String, java.lang.String, java.lang.String)] Bean[com.demo.test.analytics.testanalytics.consumer.FLReservationKafkaConsumer @731702d1]; nested exception is org.springframework.messaging.handler.annotation.support.MethodArgumentNotValidException: Could not resolve method parameter at index 0 in public void com.demo.test.analytics.testanalytics.consumer.FLReservationKafkaConsumer.consumeReservation(java.lang.String, java.lang.String, java.lang.String): 1 error(s): [Error in object 'payload': codes[];arguments[]; default message[Payload value must not be empty] ], failedMessage = GenericMessage[payload = org.springframework.kafka.support.KafkaNull @2d99561c, headers = { Key_TypeId = [B @19a2dc5f, kafka_offset = 453473, OPERATION = [B @7d75c01a, kafka_consumer = org.apache.kafka.clients.consumer.KafkaConsumer @363f44ef, kafka_timestampType = CREATE_TIME, kafka_receivedPartitionId = 0, kafka_receivedMessageKey = { "orgId": "1", "orderId": "U4000024004" }, kafka_receivedTopic = test_1order, kafka_receivedTimestamp = 1601962346576, kafka_groupId = reservation_group_id }];nested exception is org.springframework.messaging.handler.annotation.support.MethodArgumentNotValidException: Could not resolve method parameter at index 0 in public void com.demo.test.analytics.testanalytics.consumer.FLReservationKafkaConsumer.consumeReservation(java.lang.String, java.lang.String, java.lang.String): 1 error(s): [Error in object 'payload': codes[];arguments[]; default message[Payload value must not be empty] ], failedMessage = GenericMessage[payload = org.springframework.kafka.support.KafkaNull @2d99561c, headers = { Key_TypeId = [B @19a2dc5f, kafka_offset = 453473, OPERATION = [B @7d75c01a, kafka_consumer = org.apache.kafka.clients.consumer.KafkaConsumer @363f44ef, kafka_timestampType = CREATE_TIME, kafka_receivedPartitionId = 0, kafka_receivedMessageKey = { "orgId": "1", "orderId": "U4000024004" }, kafka_receivedTopic = test_1order, kafka_receivedTimestamp = 1601962346576, kafka_groupId = reservation_group_id }] at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java: 1925) at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeErrorHandler(KafkaMessageListenerContainer.java: 1913) at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java: 1812) at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java: 1739) at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java: 1636) at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java: 1366) at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java: 1082) at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java: 990) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java: 511) at java.util.concurrent.FutureTask.run(FutureTask.java: 266) at java.lang.Thread.run(Thread.java: 748) Caused by: org.springframework.messaging.handler.annotation.support.MethodArgumentNotValidException: Could not resolve method parameter at index 0 in public void com.demo.test.analytics.testanalytics.consumer.FLReservationKafkaConsumer.consumeReservation(java.lang.String, java.lang.String, java.lang.String): 1 error(s): [Error in object 'payload': codes[];arguments[]; default message[Payload value must not be empty] ] at org.springframework.messaging.handler.annotation.support.PayloadMethodArgumentResolver.resolveArgument(PayloadMethodArgumentResolver.java: 122) at org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor$KafkaNullAwarePayloadArgumentResolver.resolveArgument(KafkaListenerAnnotationBeanPostProcessor.java: 901) at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java: 117) at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java: 148) at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java: 116) at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java: 48) at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java: 329) at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java: 86) at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java: 51) at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java: 1880) at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java: 1862) at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java: 1799) ...8 common frames omitt
Upvotes: 2
Views: 9461
Reputation: 1
I met the same issue here are some solutions I searched.
Set the spring.listener.ack-mode as manual/manua-immediate in yaml
If the solution above is invalid,try to add containerFactory in annotation @KafkaListener like "@KafkaListener(topics = {"xxxx"},groupId = "xxxx",containerFactory = "batchFactory")" and create a same name bean.
public KafkaListenerContainerFactory<?> batchFactory(){
ConcurrentKafkaListenerContainerFactory<String,String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(batchConsumerConfigs()));
factory.setBatchListener(true);
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
return factory;
}
Upvotes: 0
Reputation: 174729
You need to specify that the payload is not required.
@Payload(required = false) String payload, ...
Upvotes: 4