Reputation: 723
I am using Kafka 0.10.2 and to receive records from Kafka, I have a consumer poll loop like below:
while (true) {
ConsumerRecords<Long, String> records = consumer.poll(2000);
int count= 0;
long lasttimestamp = 0;
long lastOffset = 0;
for (ConsumerRecord<Long, String> record : records) {
if (record.timestamp() >= end_time_Stamp) {
reachedEnd = true;
break;
}
result.add(record);
}
if (reachedEnd) break;
if (records == null || records.isEmpty()) break; // dont wait for records
}
Here before the poll loop we seek to start time stamp by finding the offset using the "offsetsForTimes" API. We get the records till we get to the end timestamp.
Records are fetched using the consumer.poll API. How to know what is the vale that should be passed for consumer poll timeout?. Currently we just do it via trial and error, seeing which would work. I think there should be a better way.
Questions:
Upvotes: 0
Views: 8416
Reputation: 87319
int counter=0;
while(true) {
consumer.poll(1000);
for (...) {
}
if (records == null || records.isEmpty()) {
counter++;
if (counter > 10)
break;
} else {
counter = 0;
}
}
Upvotes: 1