ericdemo07
ericdemo07

Reputation: 481

Kafka consumer goes into an unending loop

I am using Kafka queue to hold some objects that is to be retreived by consumer app and perform some operation on it.

Problem : If the processing by consumer takes more than ~2 hrs kafka seems to give back the same object again and again

Code :

  private static Queue queue = new LinkedList();
  while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("\n\n\n Kafka has :[" + record.offset());
                queue.add(record.value());
            }
            System.out.println("\n\n\n Kafka has :[" + records.count());
            if (queue != null) {
                maintainQueue();
            }
        }

Upvotes: 2

Views: 2880

Answers (1)

ericdemo07
ericdemo07

Reputation: 481

I am using kafka version 0.10.1.0

We have resolved this issue by updating ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to "false" and adding consumer.commitSync(); after consumer.poll(Long.MAX_VALUE);

Reference :

Kafka - How to commit offset after every message using High-Level consumer?

http://www.slideshare.net/jjkoshy/offset-management-in-kafka

Upvotes: 2

Related Questions