user2715182
user2715182

Reputation: 723

Kafka: How to give consumer poll time out value?

I am using Kafka 0.10.2 and to receive records from Kafka, I have a consumer poll loop like below:

while (true) {
    ConsumerRecords<Long, String> records = consumer.poll(2000);
    int count= 0;
    long lasttimestamp = 0;
    long lastOffset = 0;
    for (ConsumerRecord<Long, String> record : records) {
        if (record.timestamp() >= end_time_Stamp) {
            reachedEnd = true;
            break;
        }
        result.add(record);
    }

    if (reachedEnd) break;
    if (records == null || records.isEmpty()) break; // dont wait for records
}

Here before the poll loop we seek to start time stamp by finding the offset using the "offsetsForTimes" API. We get the records till we get to the end timestamp.

Records are fetched using the consumer.poll API. How to know what is the vale that should be passed for consumer poll timeout?. Currently we just do it via trial and error, seeing which would work. I think there should be a better way.

Questions:

  1. how to know what is the ideal timeout value that can be given to the consumer.poll API? what does it depend on? Should it taken as run time parameter?
  2. Sometimes the timeout value needed is larger. what can cause the sudden spike in the timeouts required ? (If the ingestion rates are too high in kafka, does that affect the required consumer poll timeout configuration?)
  3. How to give up? when there are no records we should just break out of the loop. how to reliably know how to break out of the loop and not prematurely?

Upvotes: 0

Views: 8416

Answers (1)

Alex Ott
Alex Ott

Reputation: 87319

  1. timeout in poll depends on your application - you can wait longer for data if you can, but if you need to perform some actions in the meantime, then it makes no sense to wait too long
  2. This could be caused by multiple reasons, including rebalancing, etc.
  3. I suggest that you don't break up at the first moment when the list of records is empty, but instead define poll to 1000ms, and then count how many times straight the list of records was empty, and break out if it was empty 10 times (10s) or something like this:
int counter=0;
while(true) {
   consumer.poll(1000);
   for (...) {
   }
   if (records == null || records.isEmpty()) {
     counter++;
      if (counter > 10)
         break;
   } else {
     counter = 0;
   }
}

Upvotes: 1

Related Questions