nitinsridar
nitinsridar

Reputation: 696

Listener method could not be invoked with the incoming message and Backoff none exhausted for ConsumerRecord

Below is My method defination for kafka listener and if receive null or empty string for payload i guess I'm getting below error... Can you please help.

 @KafkaListener(topics = "${kafka.consumer-topic-name.reservation}", groupId = "${kafka.consumer-group-id.test}",
        containerFactory = "kafkaListenerContainerFactory",autoStartup = "${kafka.auto-start.consumer.tets}")
public void consumeReservation(String payload, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                               @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String kafkaKey) {}

[org.springframework.kafka.KafkaListenerEndpointContainer #0-0-C-1] ERROR o.s.k.l.SeekToCurrentErrorHandler - Backoff none exhausted for ConsumerRecord(topic = test_topic, partition = 0, leaderEpoch = 2, offset = 453473, CreateTime = 1601962346576, serialized key size = 41, serialized value size = -1, headers = RecordHeaders(headers = [RecordHeader(key = OPERATION, value = [68, 69, 76, 69, 84, 69]), RecordHeader(key = __Key_TypeId__, value = [99, 108, 75, 101, 121])], isReadOnly = false), key = {
"orgId": "1",
"orderId": "U4000024004"}, value = null)

org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message Endpoint handler details: Method[public void com.demo.test.analytics.testanalytics.consumer.FLReservationKafkaConsumer.consumeReservation(java.lang.String, java.lang.String, java.lang.String)] Bean[com.demo.test.analytics.testanalytics.consumer.FLReservationKafkaConsumer @731702d1]; nested exception is org.springframework.messaging.handler.annotation.support.MethodArgumentNotValidException: Could not resolve method parameter at index 0 in public void com.demo.test.analytics.testanalytics.consumer.FLReservationKafkaConsumer.consumeReservation(java.lang.String, java.lang.String, java.lang.String): 1 error(s): [Error in object 'payload': codes[];arguments[]; default message[Payload value must not be empty] ], failedMessage = GenericMessage[payload = org.springframework.kafka.support.KafkaNull @2d99561c, headers = { Key_TypeId = [B @19a2dc5f, kafka_offset = 453473, OPERATION = [B @7d75c01a, kafka_consumer = org.apache.kafka.clients.consumer.KafkaConsumer @363f44ef, kafka_timestampType = CREATE_TIME, kafka_receivedPartitionId = 0, kafka_receivedMessageKey = { "orgId": "1", "orderId": "U4000024004" }, kafka_receivedTopic = test_1order, kafka_receivedTimestamp = 1601962346576, kafka_groupId = reservation_group_id }];nested exception is org.springframework.messaging.handler.annotation.support.MethodArgumentNotValidException: Could not resolve method parameter at index 0 in public void com.demo.test.analytics.testanalytics.consumer.FLReservationKafkaConsumer.consumeReservation(java.lang.String, java.lang.String, java.lang.String): 1 error(s): [Error in object 'payload': codes[];arguments[]; default message[Payload value must not be empty] ], failedMessage = GenericMessage[payload = org.springframework.kafka.support.KafkaNull @2d99561c, headers = { Key_TypeId = [B @19a2dc5f, kafka_offset = 453473, OPERATION = [B @7d75c01a, kafka_consumer = org.apache.kafka.clients.consumer.KafkaConsumer @363f44ef, kafka_timestampType = CREATE_TIME, kafka_receivedPartitionId = 0, kafka_receivedMessageKey = { "orgId": "1", "orderId": "U4000024004" }, kafka_receivedTopic = test_1order, kafka_receivedTimestamp = 1601962346576, kafka_groupId = reservation_group_id }] at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java: 1925) at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeErrorHandler(KafkaMessageListenerContainer.java: 1913) at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java: 1812) at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java: 1739) at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java: 1636) at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java: 1366) at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java: 1082) at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java: 990) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java: 511) at java.util.concurrent.FutureTask.run(FutureTask.java: 266) at java.lang.Thread.run(Thread.java: 748) Caused by: org.springframework.messaging.handler.annotation.support.MethodArgumentNotValidException: Could not resolve method parameter at index 0 in public void com.demo.test.analytics.testanalytics.consumer.FLReservationKafkaConsumer.consumeReservation(java.lang.String, java.lang.String, java.lang.String): 1 error(s): [Error in object 'payload': codes[];arguments[]; default message[Payload value must not be empty] ] at org.springframework.messaging.handler.annotation.support.PayloadMethodArgumentResolver.resolveArgument(PayloadMethodArgumentResolver.java: 122) at org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor$KafkaNullAwarePayloadArgumentResolver.resolveArgument(KafkaListenerAnnotationBeanPostProcessor.java: 901) at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java: 117) at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java: 148) at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java: 116) at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java: 48) at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java: 329) at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java: 86) at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java: 51) at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java: 1880) at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java: 1862) at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java: 1799) ...8 common frames omitt

Upvotes: 2

Views: 9461

Answers (2)

Ssssst
Ssssst

Reputation: 1

I met the same issue here are some solutions I searched.

  1. Set the spring.listener.ack-mode as manual/manua-immediate in yaml

  2. If the solution above is invalid,try to add containerFactory in annotation @KafkaListener like "@KafkaListener(topics = {"xxxx"},groupId = "xxxx",containerFactory = "batchFactory")" and create a same name bean.

    public KafkaListenerContainerFactory<?> batchFactory(){
        ConcurrentKafkaListenerContainerFactory<String,String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(batchConsumerConfigs()));
        factory.setBatchListener(true); 
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
        return factory;
    }
    

Upvotes: 0

Gary Russell
Gary Russell

Reputation: 174729

You need to specify that the payload is not required.

@Payload(required = false) String payload, ...

Upvotes: 4

Related Questions