Reputation: 481
I am using a Kafka queue to hold objects that a consumer app retrieves and then performs some operation on.
Problem: if the consumer's processing takes more than ~2 hours, Kafka seems to deliver the same object again and again.
Code:
    private static Queue<String> queue = new LinkedList<>();

    // Poll loop; maintainQueue() is defined elsewhere (not shown).
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
        for (ConsumerRecord<String, String> record : records) {
            System.out.println("\n\n\n Kafka has :[" + record.offset() + "]");
            queue.add(record.value());
        }
        System.out.println("\n\n\n Kafka has :[" + records.count() + "]");
        if (queue != null) {
            maintainQueue();
        }
    }
Upvotes: 2
Views: 2880
Reputation: 481
I am using Kafka version 0.10.1.0.
We resolved this issue by setting ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to "false" and calling consumer.commitSync() right after consumer.poll(Long.MAX_VALUE).
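For anyone who wants to see the configuration side of this, here is a minimal sketch of the consumer properties with auto-commit turned off. It uses only java.util.Properties and the string keys, where "enable.auto.commit" is the string value of ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG. The class name ManualCommitConfig, the broker address, and the group id are placeholders I made up for illustration, and the String deserializers are assumed from the ConsumerRecords<String, String> type in the question.

```java
import java.util.Properties;

// Sketch only: builds consumer settings for manual offset commits.
// ManualCommitConfig is a hypothetical helper, not part of the Kafka API.
final class ManualCommitConfig {

    // bootstrapServers and groupId are placeholder inputs, not values from the post.
    static Properties build(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        // String form of ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG:
        // the consumer no longer commits offsets on its own.
        props.setProperty("enable.auto.commit", "false");
        // Assumed from the <String, String> record type in the question.
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("localhost:9092", "demo-group");
        System.out.println("enable.auto.commit=" + props.getProperty("enable.auto.commit"));
    }
}
```

These properties would be passed to new KafkaConsumer<String, String>(props), with consumer.commitSync() called after each consumer.poll(...) as described above. One thing to keep in mind: committing immediately after poll marks the batch as consumed before it has been processed, so if a lost message is worse than a repeated one, it may be safer to call commitSync() after the processing step instead.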
References:
Kafka - How to commit offset after every message using High-Level consumer?
http://www.slideshare.net/jjkoshy/offset-management-in-kafka
Upvotes: 2