mr_Stat1c

Reputation: 125

Kafka consumer poll() gets messages only on the second call

I have a consumer with the following configuration:

 public static Consumer<String, TransactionDataAvro> createConsumer() {
     Properties props = new Properties();
     props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
     props.put(ConsumerConfig.GROUP_ID_CONFIG, "KafkaExampleAvroConsumer");
     props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
     props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
     props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true");
     props.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8000");
     props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
     return new KafkaConsumer<>(props);
 }

It is subscribed to the topic:

 this.consumer.subscribe(Collections.singletonList("fss-fsstransdata"));

Every time I try to get messages (this.consumer.poll(Duration.ofMillis(500))), the records are consistently returned only on the second call.

Code:

  log.info("COUNT:" + this.consumer.poll(Duration.ofMillis(1000)).count());
  log.info("COUNT:" + this.consumer.poll(Duration.ofMillis(1000)).count());
  log.info("COUNT:" + this.consumer.poll(Duration.ofMillis(1000)).count());
  log.info("COUNT:" + this.consumer.poll(Duration.ofMillis(1000)).count());
  log.info("COUNT:" + this.consumer.poll(Duration.ofMillis(1000)).count());

Logs:

| 17:47:40.688 | main                | INFO  | IaStepsDefinitions               | COUNT:0
| 17:47:40.689 | main                | INFO  | IaStepsDefinitions               | COUNT:1
| 17:47:41.690 | main                | INFO  | IaStepsDefinitions               | COUNT:0
| 17:47:42.691 | main                | INFO  | IaStepsDefinitions               | COUNT:0
| 17:47:43.692 | main                | INFO  | IaStepsDefinitions               | COUNT:0

Can someone explain why this happens?

Upvotes: 1

Views: 2030

Answers (1)

Eddie Jamsession

Reputation: 1006

Consumer.poll() does a lot of things under the hood besides actually polling for data:

  1. Finding the group coordinator
  2. Connecting to the group coordinator node
  3. Starting the heartbeat thread
  4. Joining the group and getting partitions assigned
  5. Resetting offsets to the committed ones, or to earliest/latest if none are found
  6. Fetching the records

All of these steps are bounded by the Duration you pass to poll(), so you can see that things will work even worse with a Duration of 1 ms.
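To make the effect of a shared timeout concrete, here is a toy model of the claim above. It is a hypothetical simulation, not KafkaConsumer's real code: ToyConsumer, the setup cost in simulated milliseconds, and the rule that fetching needs some budget left after setup are all assumptions made for illustration.

```java
// Toy model only (hypothetical, NOT KafkaConsumer's implementation):
// one poll() timeout is shared between group setup and fetching.
class ToyConsumer {
    private long setupRemainingMs; // simulated time still needed to join the group
    private int pendingRecords;    // records waiting to be fetched

    ToyConsumer(long setupCostMs, int pendingRecords) {
        this.setupRemainingMs = setupCostMs;
        this.pendingRecords = pendingRecords;
    }

    // Simulates one poll() call with a budget of timeoutMs simulated milliseconds.
    int poll(long timeoutMs) {
        long budget = timeoutMs;
        if (setupRemainingMs > 0) {
            // Setup work is done first and eats into the same budget.
            long spent = Math.min(budget, setupRemainingMs);
            setupRemainingMs -= spent;
            budget -= spent;
        }
        // Assumption: fetching needs setup to be finished and some budget left.
        if (setupRemainingMs > 0 || budget <= 0) {
            return 0;
        }
        int delivered = pendingRecords;
        pendingRecords = 0; // each record is handed out only once
        return delivered;
    }
}
```

With a setup cost of 1500 simulated ms and one pending record, three calls to poll(1000) return 0, 1, 0, the same pattern as the logs in the question. A single poll(5000) returns 1 straight away, while poll(1) returns 0 for the first 1500 calls.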

In my opinion it is misleading and incorrect to put this logic into poll(). Let poll() do the polling, and do the rest in background threads and/or in subscribe().

When you call poll(), you don't expect it to do something like this:

 if (!updateAssignmentMetadataIfNeeded(timer)) {
     return ConsumerRecords.empty();
 }

poll() is client-facing logic: if it returns 0 records, that should mean the broker is empty.

If you call a REST service and get an empty response, you know the server is empty. If you call PreparedStatement.execute(), you get either a correct result or an exception. If you call RabbitMQ's basicGet() and get an empty response, it means the queue is empty.

Long story short: in your case, just increase the timeout for the first poll and you should get the records right away.
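Besides one long first poll, another option is to keep polling until something arrives or a deadline passes. The sketch below is a generic helper assumed for illustration (PollUntil is hypothetical, not part of the Kafka client API); it only uses java.time and java.util.function, so it can be tried without a broker.

```java
import java.time.Duration;
import java.util.function.Predicate;
import java.util.function.Supplier;

// Hypothetical helper (not a Kafka API): calls `poll` repeatedly until `done`
// accepts a result or `maxWait` of wall-clock time has elapsed.
final class PollUntil {
    private PollUntil() {}

    static <T> T pollUntil(Supplier<T> poll, Predicate<T> done, Duration maxWait) {
        long deadline = System.nanoTime() + maxWait.toNanos();
        T result = poll.get();
        // Subtraction-based comparison stays correct even if nanoTime wraps around.
        while (!done.test(result) && System.nanoTime() - deadline < 0) {
            result = poll.get();
        }
        return result; // may still fail `done` if the deadline passed first
    }
}
```

With the consumer from the question it could be called as PollUntil.pollUntil(() -> consumer.poll(Duration.ofMillis(500)), recs -> !recs.isEmpty(), Duration.ofSeconds(10)), where the 10-second limit is an arbitrary choice. Because that predicate only rejects empty batches, no fetched records are thrown away; a predicate that rejected non-empty batches would silently drop them, since poll() has already moved the consumer's position past them.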

Upvotes: 2
