nYmERioN805
nYmERioN805

Reputation: 155

Acknowledgement.acknowledge() throwing exception in spring-kafka @KafkaListener

When I set enable.auto.commit to false and try to manually commit offset using annotation based spring-kafka @KafkaListener, I get a org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message

I have a very simple code as follows:

@KafkaListener(id = "someid", topics = "${demo.topic}", containerFactory = "someContainerFactory")
public void listenFooGroup(String message, Acknowledgement ack) {
    System.out.println("Received Messasge in group 'foo': " + message);

    // TODO: Do something with the message
}

And when I send a message from the producer, I get the following exception:

org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message.

Endpoint handler details:

Method [public void com.****.*****.*******.KafkaMessageListener.listenFooGroup(java.lang.String,org.springframework.kafka.support.Acknowledgment)]

Bean [com.****.*****.*******.KafkaMessageListener@5856dbe4]; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [java.lang.String] to [org.springframework.kafka.support.Acknowledgment] for GenericMessage [payload=test, headers={kafka_offset=57, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=demotopic}], failedMessage=GenericMessage [payload=test, headers={kafka_offset=57, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=demotopic}]

Please help. TIA.

Upvotes: 15

Views: 28712

Answers (2)

Ajay V
Ajay V

Reputation: 563

Solved this issue in spring boot 2.4.3 by applying the following to application.properties

spring.kafka.consumer.enable-auto-commit=false
spring.kafka.listener.ack-mode=manual

Set ack-mode to manual or manual_immediate, based on your use case.

Upvotes: 2

Gary Russell
Gary Russell

Reputation: 174689

You have to the set the container factory's containerProperties ackMode to MANUAL or MANUAL_IMMEDIATE to get an Acknowledgment object.

With other ack modes, the container is responsible for committing the offset.

factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE)

Or set the ....ackMode property if using Spring Boot

Upvotes: 35

Related Questions