Jeff Gong

Reputation: 1863

Why does my Kafka Consumer consume messages quickly on the first run, but slow down considerably in future runs?

I am a student researching and experimenting with Kafka. After following the examples in the Apache documentation, I have been working with the examples portion in the trunk of their current GitHub repo.

As of right now, the example implements an 'older' version of their Consumer and does not employ the new KafkaConsumer. Following the documentation, I have written my own consumer on top of KafkaConsumer, thinking that it would be faster.

This is a somewhat vague question, but on each run I produce 5000 simple messages of the form "Message_CurrentMessageNumber" to a topic "test" and then use my consumer to fetch these messages and print them to stdout. When I run the example code with the provided consumer replaced by the newer KafkaConsumer (v 0.8.2 and up), the first run is quick and comparable to the original example, but every run after that is considerably slower.

I notice that my Kafka Server outputs

Rebalancing group group1 generation 3 (kafka.coordinator.ConsumerCoordinator)

or similar messages often, which leads me to believe that Kafka has to do some sort of load balancing that slows things down. Does anyone have insight into what I am doing wrong?
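One rough way to put a number on "often" is to count the matching lines in the server log per consumer group. The sketch below assumes the log line format quoted above; the class name `RebalanceLogCounter` and the sample lines in `main` are made up for illustration and are not part of Kafka.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: counts "Rebalancing group ..." log lines per consumer group.
class RebalanceLogCounter {

    // Assumes the server log format quoted in the question:
    // "Rebalancing group <group> generation <n> (kafka.coordinator.ConsumerCoordinator)"
    private static final Pattern REBALANCE =
            Pattern.compile("Rebalancing group (\\S+) generation (\\d+)");

    // Returns a map from group id to the number of rebalance lines seen for that group.
    static Map<String, Integer> countByGroup(Iterable<String> logLines) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String line : logLines) {
            Matcher m = REBALANCE.matcher(line);
            if (m.find()) {
                counts.merge(m.group(1), 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // Made-up sample lines; in practice these would be read from the server log file.
        List<String> sample = Arrays.asList(
                "Rebalancing group group1 generation 3 (kafka.coordinator.ConsumerCoordinator)",
                "some unrelated log line",
                "Rebalancing group group1 generation 4 (kafka.coordinator.ConsumerCoordinator)");
        System.out.println(countByGroup(sample));
    }
}
```

Comparing the count for a first run against the count for a later run would show whether the slowdown coincides with extra rebalances.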

package kafka.examples;

import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class AlternateConsumer extends Thread {

    private final KafkaConsumer<Integer, String> consumer;
    private final String topic;
    private final boolean isAsync = false;

    public AlternateConsumer(String topic) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("group.id", "newestGroup");
        properties.put("partition.assignment.strategy", "roundrobin");
        // Offsets are committed automatically once per second.
        properties.put("enable.auto.commit", "true");
        properties.put("auto.commit.interval.ms", "1000");
        properties.put("session.timeout.ms", "30000");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumer = new KafkaConsumer<Integer, String>(properties);
        consumer.subscribe(topic);
        this.topic = topic;
    }

    @Override
    public void run() {
        // Poll forever and print every record; the consumer is never closed.
        while (true) {
            ConsumerRecords<Integer, String> records = consumer.poll(100);
            for (ConsumerRecord<Integer, String> record : records) {
                System.out.println("We received message: " + record.value() + " from topic: " + record.topic());
            }
        }

        // Earlier single-poll variant, left commented out:
        // ConsumerRecords<Integer, String> records = consumer.poll(0);
        // for (ConsumerRecord<Integer, String> record : records) {
        //  System.out.println("We received message: " + record.value() + " from topic: " + record.topic());
        // }
        // consumer.close();
    }
}

To start:

package kafka.examples;

public class KafkaConsumerProducerDemo implements KafkaProperties
{
  public static void main(String[] args) {
    final boolean isAsync = args.length > 0 ? !args[0].trim().toLowerCase().equals("sync") : true;

    Producer producerThread = new Producer("test", isAsync);
    producerThread.start();

    AlternateConsumer consumerThread = new AlternateConsumer("test");
    consumerThread.start();
  } 
}

The producer is the default producer located here: https://github.com/apache/kafka/blob/trunk/examples/src/main/java/kafka/examples/Producer.java

Upvotes: 5

Views: 2790

Answers (1)

MSD

Reputation: 239

This should not be the case. If the setup is similar between your two consumers, you should expect better results with the new consumer unless there is an issue in the client/consumer implementation, which seems to be the case here.

Can you share your benchmark results, the frequency of the reported rebalancing, and any pattern you are observing (e.g. sluggish once at startup, after a fixed number of messages has been consumed, after the queue is drained)? It would also help if you could share some details about your consumer implementation.
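For the benchmark numbers, something like the following sketch could be called from inside the poll loop. It is only an illustration: `ThroughputProbe` and its methods are hypothetical names, not part of Kafka, and the timestamps in `main` are simulated rather than measured.

```java
// Hypothetical helper for timing a consumer loop; not part of the Kafka client.
class ThroughputProbe {
    private final long startNanos;
    private long lastBatchNanos;
    private long totalMessages = 0;
    private long longestGapNanos = 0;

    ThroughputProbe(long startNanos) {
        this.startNanos = startNanos;
        this.lastBatchNanos = startNanos;
    }

    // Call once per poll with the number of records returned and the current time.
    // Empty polls are ignored, so a long wait shows up as a gap before the next batch.
    void recordBatch(int batchSize, long nowNanos) {
        if (batchSize > 0) {
            longestGapNanos = Math.max(longestGapNanos, nowNanos - lastBatchNanos);
            lastBatchNanos = nowNanos;
            totalMessages += batchSize;
        }
    }

    long totalMessages() {
        return totalMessages;
    }

    // Average rate from the start time up to the last non-empty batch.
    double messagesPerSecond() {
        long elapsed = lastBatchNanos - startNanos;
        return elapsed == 0 ? 0.0 : totalMessages * 1e9 / elapsed;
    }

    // Longest wait before a non-empty batch, including the initial wait after startup.
    long longestGapMillis() {
        return longestGapNanos / 1_000_000;
    }

    public static void main(String[] args) {
        // Simulated run: nothing arrives for 3 s, then two batches 50 ms apart.
        ThroughputProbe probe = new ThroughputProbe(0L);
        probe.recordBatch(0, 1_000_000_000L);
        probe.recordBatch(2500, 3_000_000_000L);
        probe.recordBatch(2500, 3_050_000_000L);
        System.out.println("messages: " + probe.totalMessages());
        System.out.println("msgs/sec: " + probe.messagesPerSecond());
        System.out.println("longest gap (ms): " + probe.longestGapMillis());
    }
}
```

In the consumer from the question, this would amount to calling `probe.recordBatch(records.count(), System.nanoTime())` after each `poll`. A large longest gap with an otherwise high rate would point at a one-off stall (for example at startup) rather than uniformly slow consumption.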

Upvotes: 1
