Reputation: 5271
I have a use case where it is paramount not to continue until all consumer records in a KafkaConsumer have been fetched. In this use case, nothing new will be going into the pipeline. What is the proper way to make sure there is absolutely nothing left to fetch?
Upvotes: 2
Views: 2788
Reputation: 2758
If you are using kafka-console-consumer, you can pass the --timeout-ms argument to set how long it waits without receiving a message before it concludes that no more messages are coming and exits. From the tool's help:
--timeout-ms <Integer: timeout_ms>    If specified, exit if no message is available for consumption for the specified interval.
Upvotes: 1
Reputation: 87359
Kafka is designed to handle infinite streams of data, so "consume all" only means that nobody has sent any data for some period of time (1 minute, 1 hour, etc.), and choosing that period is up to you.
You can use something like (pseudocode):
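// consumer: an already-configured, subscribed KafkaConsumer<String, String>
// (needs java.time.Duration and org.apache.kafka.clients.consumer.* imports)
int emptyCount = 0;
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
    if (records.isEmpty()) {
        emptyCount++;
        // stop after 100 consecutive empty polls
        if (emptyCount >= 100) {
            break;
        }
        continue;
    }
    emptyCount = 0; // data arrived, so restart the idle count
    for (ConsumerRecord<String, String> record : records) {
        // ...process record...
    }
}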
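You can tune the poll timeout and the number of empty cycles to reach the wait period you need. With the values above, the loop exits after roughly 100 × 500 ms = 50 seconds without any new records.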
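The loop above needs a live KafkaConsumer to run. As a minimal sketch, assuming you only want to exercise the idle-counter logic in isolation, the same algorithm can be written against a plain `Supplier<List<T>>` standing in for `poll`. The class and method names (`DrainUntilIdle`, `drainUntilIdle`) are hypothetical, and `Consumer` here is `java.util.function.Consumer`, not Kafka's consumer interface.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer; // java.util.function.Consumer, not Kafka's
import java.util.function.Supplier;

public class DrainUntilIdle {

    /**
     * Hypothetical helper: polls repeatedly and hands every non-empty batch
     * to the processor. Stops after maxEmptyPolls consecutive empty batches;
     * any non-empty batch resets the counter, like emptyCount in the loop above.
     *
     * @return the total number of records processed
     */
    static <T> int drainUntilIdle(Supplier<List<T>> poll,
                                  Consumer<List<T>> processor,
                                  int maxEmptyPolls) {
        int emptyCount = 0;
        int processed = 0;
        while (true) {
            List<T> batch = poll.get();
            if (batch.isEmpty()) {
                emptyCount++;
                if (emptyCount >= maxEmptyPolls) {
                    return processed; // idle long enough: assume nothing is left
                }
                continue;
            }
            emptyCount = 0; // data arrived, so restart the idle window
            processor.accept(batch);
            processed += batch.size();
        }
    }

    public static void main(String[] args) {
        // Scripted stand-in for consumer.poll(...): two records, two empty polls,
        // one more record, then nothing but empty polls.
        List<List<String>> script = List.of(
                List.of("a", "b"), List.of(), List.of(), List.of("c"));
        int[] call = {0};
        Supplier<List<String>> fakePoll = () ->
                call[0] < script.size() ? script.get(call[0]++) : List.of();

        List<String> seen = new ArrayList<>();
        int total = drainUntilIdle(fakePoll, seen::addAll, 3);
        System.out.println(total + " " + seen); // prints "3 [a, b, c]"
    }
}
```

Because the threshold in `main` is 3, the two empty polls in the middle of the script do not end the loop; only the three consecutive empty polls after `"c"` do. That is the same reset behavior that keeps the Kafka loop running while data is still trickling in.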
Upvotes: 3