aran

Reputation: 11900

Kafka Consumer - Poll behaviour

I'm facing serious problems trying to implement a solution for my needs regarding KafkaConsumer (>= 0.9).

Let's imagine I have a function that has to read just n messages from a Kafka topic.

For example: getMsgs(5) --> gets the next 5 Kafka messages in the topic.

So, I have a loop that looks like this (edited with the actual parameters). In this case, the consumer's max.poll.records param was set to 1, so the inner loop only iterated once. Several different consumers (some of which iterated through many messages) shared an abstract parent class (this one), which is why it's coded this way. The numMss part is ad-hoc for this consumer.

for (boolean exit = false; !exit;)
{
    // poll() may return anywhere between 0 and max.poll.records messages
    ConsumerRecords<String, String> records = consumer.poll(config.pollTime);
    for (ConsumerRecord<String, String> r : records)
    {
        processRecord(r); // do my things
        numMss++;
        if (numMss == maximum) // maximum = 5
        {
            exit = true;
            break;
        }
    }
}

Taking this into account, the problem is that the poll() method could get more than 5 messages. For example, if it gets 10 messages, my code will forget about those other 5 messages forever, since Kafka will consider them already consumed.

I tried committing the offset, but it doesn't seem to work:

    consumer.commitSync(Collections.singletonMap(
        new TopicPartition(r.topic(), r.partition()),
        new OffsetAndMetadata(r.offset() + 1)));

Even with that offset commit in place, whenever I launch the consumer again, it won't start from the 6th message (remember, I just wanted 5 messages), but from the 11th (since the first poll consumed 10 messages).
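What I'd need, I guess, is a way to rewind the consumer to the first unprocessed message before the next poll(), something like this sketch (keeping track of the last processed record is an assumption on my part):

// 'r' is the last record processed in the loop above; rewind the partition
// so the next poll() starts at the first message that was NOT processed
TopicPartition tp = new TopicPartition(r.topic(), r.partition());
consumer.seek(tp, r.offset() + 1);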

Is there any solution for this, or maybe (most surely) am I missing something?

Thanks in advance!!

Upvotes: 19

Views: 70322

Answers (4)

Hussain

Reputation: 1015

Set the auto.offset.reset property to "earliest". Then try consuming; you will get the records starting from the committed offset.

Or you can use the consumer.seek(TopicPartition, offset) API before calling poll(), as in the sketch below.
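A minimal sketch of the seek() approach (broker address, group id, topic name and target offset are placeholders, not from the question):

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
props.put("group.id", "my-group");                // placeholder group id
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
TopicPartition tp = new TopicPartition("my-topic", 0); // placeholder topic/partition
consumer.assign(Collections.singletonList(tp));        // assign() so seek() works right away
consumer.seek(tp, 5);                                  // offset 5 = the 6th message
ConsumerRecords<String, String> records = consumer.poll(100);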

Upvotes: 0

user1870400

Reputation: 6364

You can set max.poll.records to whatever number you like, so that you will get at most that many records on each poll.

For the use case you stated in this problem, you don't have to commit offsets explicitly yourself. You can just set enable.auto.commit to true and set auto.offset.reset to earliest, so that it kicks in when there is no committed offset for your consumer group.id (in other words, when you are about to start reading from a partition for the very first time). Once you have a group.id and some consumer offsets stored in Kafka, if your Kafka consumer process dies it will continue from the last committed offset, since that is the default behavior: when a consumer starts, it first checks whether there are any committed offsets, and if so, continues from the last committed one; auto.offset.reset won't kick in.
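A sketch of the corresponding consumer configuration (broker address and group id are placeholders, not from the question):

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
props.put("group.id", "my-group");                // placeholder group id
props.put("enable.auto.commit", "true");          // commit offsets automatically
props.put("auto.offset.reset", "earliest");       // only kicks in when no committed offset exists
props.put("max.poll.records", "5");               // at most 5 records per poll()
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);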

Upvotes: 21

Amit_Hora

Reputation: 708

Since Kafka 0.9, the auto.offset.reset parameter values have changed:

What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server (e.g. because that data has been deleted):

earliest: automatically reset the offset to the earliest offset

latest: automatically reset the offset to the latest offset

none: throw exception to the consumer if no previous offset is found for the consumer's group

anything else: throw exception to the consumer.
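For example, to select the first behaviour in Java consumer code:

props.put("auto.offset.reset", "earliest");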

Upvotes: 0

Deeps

Reputation: 2079

Have you disabled auto commit by setting enable.auto.commit to false? You need to disable that if you want to commit the offset manually. Without that, the next call to poll() will automatically commit the latest offset of the messages you received from the previous poll().
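A minimal sketch of the manual-commit pattern (processRecord is the method from the question; props and consumer are set up as usual):

props.put("enable.auto.commit", "false"); // take over commit responsibility

ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> r : records) {
    processRecord(r);
    consumer.commitSync(Collections.singletonMap(
        new TopicPartition(r.topic(), r.partition()),
        new OffsetAndMetadata(r.offset() + 1))); // commit exactly what was processed
}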

Upvotes: 1
