DJ180

Reputation: 19854

Kafka: Intermittent slowness when consuming first message from topic

I am using Kafka 0.9.0.1.

The first time I start up my application, it takes 20-30 seconds to retrieve the "latest" message from the topic.

I've used different Kafka brokers (with different configs) yet I still see this behaviour. There is usually no slowness for subsequent messages.

Is this expected behaviour? You can see it clearly by running the sample application below, changing the broker/topic names to your own settings.

import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.collect.Lists;

public class KafkaProducerConsumerTest {

    public static final String KAFKA_BROKERS = "...";
    public static final String TOPIC = "...";

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        new KafkaProducerConsumerTest().run();
    }

    public void run() throws ExecutionException, InterruptedException {
        Properties consumerProperties = new Properties();
        consumerProperties.setProperty("bootstrap.servers", KAFKA_BROKERS);
        consumerProperties.setProperty("group.id", "Test");
        consumerProperties.setProperty("auto.offset.reset", "latest");
        consumerProperties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumerProperties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        MyKafkaConsumer kafkaConsumer = new MyKafkaConsumer(consumerProperties, TOPIC);
        Executors.newFixedThreadPool(1).submit(() -> kafkaConsumer.consume());

        Properties producerProperties = new Properties();
        producerProperties.setProperty("bootstrap.servers", KAFKA_BROKERS);
        producerProperties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producerProperties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        MyKafkaProducer kafkaProducer = new MyKafkaProducer(producerProperties, TOPIC);
        kafkaProducer.publish("Test Message");
    }
}


class MyKafkaConsumer {
    private final Logger logger = LoggerFactory.getLogger(MyKafkaConsumer.class);
    private KafkaConsumer<String, Object> kafkaConsumer;

    public MyKafkaConsumer(Properties properties, String topic) {
        kafkaConsumer = new KafkaConsumer<String, Object>(properties);
        kafkaConsumer.subscribe(Lists.newArrayList(topic));
    }

    public void consume() {
        while (true) {
            logger.info("Started listening...");
            ConsumerRecords<String, Object> consumerRecords = kafkaConsumer.poll(Long.MAX_VALUE);
            logger.info("Received records {}", consumerRecords.iterator().next().value());
        }
    }
}

class MyKafkaProducer {
    private KafkaProducer<String, Object> kafkaProducer;
    private String topic;

    public MyKafkaProducer(Properties properties, String topic) {
        this.kafkaProducer = new KafkaProducer<String, Object>(properties);
        this.topic = topic;
    }

    public void publish(Object object) throws ExecutionException, InterruptedException {
        ProducerRecord<String, Object> producerRecord = new ProducerRecord<>(topic, "key", object);
        Future<RecordMetadata> response = kafkaProducer.send(producerRecord);
        response.get();
    }

}

Upvotes: 2

Views: 2430

Answers (3)

evanwoods

Reputation: 79

According to this link:

Try setting group_id=None in your consumer, or call consumer.close() before ending the script, or use assign() rather than subscribe(). Otherwise you are rejoining an existing group that has known but unresponsive members. The group coordinator will wait until those members check in, leave, or time out. Since the consumers no longer exist (they are your prior script runs), they have to time out. And consumer.poll() blocks during a group rebalance.

So it is correct behaviour if you join a group that has unresponsive members (perhaps because you terminated the application ungracefully).

Please confirm that you call consumer.close() before exiting your application.
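
The close-on-shutdown pattern the answer recommends can be sketched as follows. This is a minimal, broker-free sketch: StubConsumer and consumeUntilShutdown are hypothetical stand-ins so the code runs without Kafka on the classpath; with the real client you would call consumer.wakeup() from a JVM shutdown hook (which makes a blocked poll() throw WakeupException) and consumer.close() in the finally block, so the group coordinator removes the member immediately instead of waiting for its session to time out.

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class ShutdownSketch {
    // Stand-in for KafkaConsumer so the sketch runs without a broker.
    static class StubConsumer {
        private final AtomicBoolean wakeupFlag = new AtomicBoolean(false);
        boolean closed = false;

        void wakeup() { wakeupFlag.set(true); }       // real API: consumer.wakeup()

        void poll() {                                  // real API: consumer.poll(timeout)
            if (wakeupFlag.get()) {
                // real API: poll() throws WakeupException after wakeup()
                throw new IllegalStateException("wakeup");
            }
        }

        void close() { closed = true; }               // real API: consumer.close()
    }

    // The loop a consumer thread should run: poll until woken up, then
    // close in finally so the broker drops this member from the group
    // right away instead of waiting for a session timeout.
    static void consumeUntilShutdown(StubConsumer consumer) {
        try {
            while (true) {
                consumer.poll();
                // ...process records...
                break; // one iteration is enough for the sketch
            }
        } catch (IllegalStateException expectedOnWakeup) {
            // ignore: this is the shutdown signal
        } finally {
            consumer.close();
        }
    }

    public static void main(String[] args) {
        StubConsumer consumer = new StubConsumer();
        // Real application: Runtime.getRuntime().addShutdownHook(
        //     new Thread(consumer::wakeup));
        consumer.wakeup();               // simulate the shutdown signal
        consumeUntilShutdown(consumer);
        System.out.println(consumer.closed); // prints "true"
    }
}
```

Skipping the close() is exactly what leaves the "known but unresponsive members" behind that the quote describes.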

Upvotes: 1

Harald

Reputation: 5103

I just tried your code, with minimal logging additions, many times. Here is a typical log output:

2016-07-24 15:12:51,417 Start polling...|INFO|KafkaProducerConsumerTest
2016-07-24 15:12:51,604 producer has send message|INFO|KafkaProducerConsumerTest
2016-07-24 15:12:51,619 producer got response, exiting|INFO|KafkaProducerConsumerTest
2016-07-24 15:12:51,679 Received records [Test Message]|INFO|KafkaProducerConsumerTest
2016-07-24 15:12:51,679 Start polling...|INFO|KafkaProducerConsumerTest
2016-07-24 15:12:54,680 returning on empty poll result|INFO|KafkaProducerConsumerTest

The sequence of events is as expected and timely: the consumer starts polling, the producer sends the message and receives a result, and the consumer receives the message, all within 300 ms. Then the consumer starts polling again and returns 3 seconds later, since I changed the poll timeout accordingly.

I am using Kafka 0.9.0.1 for broker and client libraries. The connection is on localhost and it is a test environment with no load at all.

For completeness, here is the log from the server that was triggered by the exchange above.

[2016-07-24 15:12:51,599] INFO [GroupCoordinator 0]: Preparing to restabilize group Test with old generation 0 (kafka.coordinator.GroupCoordinator)
[2016-07-24 15:12:51,599] INFO [GroupCoordinator 0]: Stabilized group Test generation 1 (kafka.coordinator.GroupCoordinator)
[2016-07-24 15:12:51,617] INFO [GroupCoordinator 0]: Assignment received from leader for group Test for generation 1 (kafka.coordinator.GroupCoordinator)
[2016-07-24 15:13:24,635] INFO [GroupCoordinator 0]: Preparing to restabilize group Test with old generation 1 (kafka.coordinator.GroupCoordinator)
[2016-07-24 15:13:24,637] INFO [GroupCoordinator 0]: Group Test generation 1 is dead and removed (kafka.coordinator.GroupCoordinator)

You may want to compare with your server logs for the same exchange.

Upvotes: 0

Manav Garg

Reputation: 522

The first message should take longer than the rest: when you start a new consumer in the consumer group specified by the statement consumerProperties.setProperty("group.id", "Test");, Kafka rebalances the partitions so that each partition is consumed by at most one consumer, distributing the topic's partitions across the consumer processes.
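
The balancing step can be modeled in plain Java. This is a simplified illustration of range-style assignment, not the client API (assignRange is a made-up name): each consumer gets a contiguous block of partitions, with earlier consumers absorbing the remainder, which is why even a lone consumer in a fresh group must wait for an assignment before it owns anything.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RangeAssignmentSketch {
    // Simplified model of range partition assignment: consumer i of n
    // gets a contiguous block of partitions, earlier consumers taking
    // one extra when partitions don't divide evenly.
    static Map<String, List<Integer>> assignRange(List<String> consumers, int partitionCount) {
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        int n = consumers.size();
        int perConsumer = partitionCount / n;
        int remainder = partitionCount % n;
        int next = 0;
        for (int i = 0; i < n; i++) {
            int count = perConsumer + (i < remainder ? 1 : 0);
            List<Integer> partitions = new ArrayList<>();
            for (int p = 0; p < count; p++) {
                partitions.add(next++);
            }
            assignment.put(consumers.get(i), partitions);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // 5 partitions over 2 consumers: c1 -> [0, 1, 2], c2 -> [3, 4]
        System.out.println(assignRange(Arrays.asList("c1", "c2"), 5));
    }
}
```

In the question's scenario there is a single consumer, so it ends up owning every partition, but only after the coordinator has finished the rebalance round trip.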

Also, as of Kafka 0.9 there is a separate __consumer_offsets topic which Kafka uses to manage the offsets for each consumer in a consumer group. When you start the consumer for the first time, it looks at this topic to fetch the latest committed offset (an earlier consumer in the group may have been killed, so the group must resume from the correct offset).
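
That startup lookup can be modeled in a few lines. This models the behaviour only, not the client API (startingOffset and the committed map are illustrative): if the group has a committed offset for the partition, resume there; otherwise fall back to the auto.offset.reset policy the question configures ("latest" jumps to the log end, "earliest" to offset 0).

```java
import java.util.HashMap;
import java.util.Map;

public class OffsetLookupSketch {
    // Models what the consumer does on startup: use the committed offset
    // for (group, partition) if one exists, otherwise fall back to the
    // auto.offset.reset policy ("latest" -> end of log, "earliest" -> 0).
    static long startingOffset(Map<String, Long> committed, String groupAndPartition,
                               String autoOffsetReset, long logEndOffset) {
        Long offset = committed.get(groupAndPartition);
        if (offset != null) {
            return offset;
        }
        return "latest".equals(autoOffsetReset) ? logEndOffset : 0L;
    }

    public static void main(String[] args) {
        Map<String, Long> committed = new HashMap<>();
        committed.put("Test/topic-0", 42L);
        System.out.println(startingOffset(committed, "Test/topic-0", "latest", 100L)); // prints 42
        System.out.println(startingOffset(committed, "Test/topic-1", "latest", 100L)); // prints 100
    }
}
```

On the very first run there is nothing committed yet, so this lookup is a round trip to the coordinator that resolves to the reset policy, which is part of the first-poll delay.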

These two factors add latency to the consumption of the first set of messages. I can't comment on the exact figure of 20-30 seconds, but some first-poll delay is the default behaviour.

PS: The exact number may also depend on secondary factors, such as whether you run the broker and the consumers on the same machine (no network latency) or on different machines communicating over TCP.

Upvotes: 4
