Filip Allberg

Reputation: 5271

Kafka - Consume until empty

I have a use case where it is essential not to continue until all consumer records available to a KafkaConsumer have been fetched. In this use case nothing new will be going into the pipeline. What is the proper way to ensure that there is absolutely nothing left to fetch?

Upvotes: 2

Views: 2788

Answers (2)

cakraww

Reputation: 2758

If you are using kafka-console-consumer, you can pass the --timeout-ms argument to set how long it waits without receiving a message before it decides that no more messages are coming and exits.

--timeout-ms <Integer: timeout_ms>      If specified, exit if no message is    
                                          available for consumption for the    
                                          specified interval.  

Upvotes: 1

Alex Ott

Reputation: 87359

Kafka is designed to handle infinite streams of data, so "consume all" can only mean that nobody has sent any data for some period of time (1 minute, 1 hour, etc.); how long is up to you.

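You can use something like this (Java, assuming consumer is a KafkaConsumer<String, String> that is already subscribed, with java.time.Duration and org.apache.kafka.clients.consumer.ConsumerRecords imported):

int emptyCount = 0;
while (true) {
   // Wait up to 500 ms for new records.
   ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
   if (records.isEmpty()) {
      emptyCount++;
      // 100 consecutive empty polls of 500 ms each is about 50 seconds of silence.
      if (emptyCount >= 100) {
         break;
      }
      continue;
   }
   // Data arrived, so restart the count of empty polls.
   emptyCount = 0;
   // ...process records...
}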

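You can tune the poll timeout and the number of empty cycles to reach the wait period you need. With a 500 ms timeout and 100 empty cycles, the loop exits after roughly 50 seconds without data.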
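
If you want to try the counting logic without a broker, here is a minimal Java sketch of the same loop. It is not part of the Kafka API: DrainLoop, drain and fakePoll are hypothetical names I made up for illustration, and the Supplier only stands in for a real call such as consumer.poll(Duration.ofMillis(500)).

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;
import java.util.function.Supplier;

public class DrainLoop {

    /**
     * Calls poll until maxEmptyPolls consecutive calls return no records.
     * Returns the number of records handed to the handler.
     * The poll supplier is a stand-in (assumption) for a real consumer poll.
     */
    public static <T> int drain(Supplier<List<T>> poll, int maxEmptyPolls, Consumer<T> handler) {
        int emptyCount = 0;
        int processed = 0;
        while (emptyCount < maxEmptyPolls) {
            List<T> records = poll.get();
            if (records.isEmpty()) {
                emptyCount++;
                continue;
            }
            // Data arrived, so restart the count of empty polls.
            emptyCount = 0;
            for (T record : records) {
                handler.accept(record);
                processed++;
            }
        }
        return processed;
    }

    public static void main(String[] args) {
        // Fake source: two batches separated by one empty poll, then nothing.
        Queue<List<String>> batches = new ArrayDeque<>();
        batches.add(List.of("a", "b"));
        batches.add(List.of());
        batches.add(List.of("c"));
        Supplier<List<String>> fakePoll =
                () -> batches.isEmpty() ? List.<String>of() : batches.remove();

        List<String> seen = new ArrayList<>();
        int count = drain(fakePoll, 3, seen::add);
        System.out.println(count + " records: " + seen); // prints "3 records: [a, b, c]"
    }
}
```

In real code the supplier would wrap the actual poll call, and maxEmptyPolls multiplied by the poll timeout gives the length of the silence window. Keep in mind this only shows that no data arrived during that window, not that the topic can never receive more.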

Upvotes: 3
