Prabhakar D

Reputation: 1134

How to ensure messages are not lost using the Spring KafkaMessageListenerContainer

Once I receive a message from Kafka, I need to run a long-running process (which takes at most 20 seconds). I need to consider a message successful only if this process completes.

Also I need to ensure that each message is processed at least once.

I'm thinking of using the KafkaMessageListenerContainer with the following properties:

  1. ThreadPoolTaskExecutor for the listenerTaskExecutor

  2. Using a MessageListener of type AcknowledgingMessageListener

  3. Setting the acknowledge mode (AckMode) to MANUAL_IMMEDIATE.

The only question I have is what happens if a message with offset, say, 15 is processed successfully first while the message with offset 14 is still being processed. In this case my offset will be updated to 15, even though 14 has not yet been processed.

How should I handle this kind of situation?

Upvotes: 1

Views: 2141

Answers (1)

Gary Russell

Reputation: 174709

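You can't do that; the higher offset will be committed. A committed offset is a single position per partition, so committing for offset 15 implicitly marks 14 as consumed too.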

If you are using a single partition, you either need to process each request on the same thread or manage the state in your application to avoid committing an offset when there's a gap.

That's the way Kafka works.
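If you do want to manage that state yourself, here is a minimal sketch of the idea, not a Spring Kafka API. The class name `ContiguousOffsetTracker` and its `markDone` method are made up for illustration. It tracks completions for a single partition and only advances the commit position while there is no gap, following the Kafka convention that the committed offset is the next offset to read. Wiring the returned value into an actual commit is left out.

```java
import java.util.TreeSet;

// Hypothetical helper: one instance per partition, shared by the worker threads.
final class ContiguousOffsetTracker {
    // Offsets that finished processing but sit above a gap.
    private final TreeSet<Long> completed = new TreeSet<>();
    // Kafka convention: the committed offset is the NEXT offset to read.
    private long nextToCommit;

    ContiguousOffsetTracker(long firstOffset) {
        this.nextToCommit = firstOffset;
    }

    // Record that 'offset' was processed; returns the offset that is safe to commit.
    synchronized long markDone(long offset) {
        if (offset >= nextToCommit) {
            completed.add(offset);
        }
        // Advance only while the next expected offset has completed (no gap).
        while (completed.remove(nextToCommit)) {
            nextToCommit++;
        }
        return nextToCommit;
    }

    public static void main(String[] args) {
        ContiguousOffsetTracker tracker = new ContiguousOffsetTracker(14);
        System.out.println(tracker.markDone(15)); // prints 14: offset 14 is still in flight
        System.out.println(tracker.markDone(14)); // prints 16: 14 and 15 are both done
    }
}
```

With this approach, a crash while 14 is still in flight means both 14 and 15 are redelivered, which is consistent with at-least-once processing.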

A simpler solution is to partition your data; offsets are maintained per partition. Use a ConcurrentMessageListenerContainer and the partitions will be distributed across its threads. You must not use an executor in the listener. That way, the container can commit the offset for each partition as each record is processed (AckMode.RECORD).

Simply create your topic with at least as many partitions as you need to satisfy your concurrency requirements, although it's generally better to over-partition the topic.

If you use broker partition assignment (group management), be sure to set the consumer's session timeout property safely above your expected 20-second maximum, to avoid a partition rebalance. However, as long as you don't use auto commit, the container will pause the consumer if your listener is taking too long.
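As a rough sketch of the consumer settings involved: the keys `session.timeout.ms` and `enable.auto.commit` are standard Kafka consumer properties, while the class name and the 30-second value are assumptions chosen only to sit above the 20-second processing time. A map like this would typically be handed to your consumer factory.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical holder for the consumer settings discussed above.
final class ConsumerSettingsSketch {
    static Map<String, Object> consumerProps() {
        Map<String, Object> props = new HashMap<>();
        // Let the listener container commit offsets, not the Kafka client.
        props.put("enable.auto.commit", false);
        // Assumed value: comfortably above the 20 s worst-case processing time.
        props.put("session.timeout.ms", 30000);
        return props;
    }

    public static void main(String[] args) {
        Map<String, Object> props = consumerProps();
        System.out.println("enable.auto.commit=" + props.get("enable.auto.commit"));
        System.out.println("session.timeout.ms=" + props.get("session.timeout.ms"));
    }
}
```

The broker caps the session timeout it will accept, so check that the value you pick is within the broker's allowed range.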

Upvotes: 2
