Reputation: 125
I have a consumer with the following configuration:
public static Consumer<String, TransactionDataAvro> createConsumer() {
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "KafkaExampleAvroConsumer");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
    props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true");
    props.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8000");
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    return new KafkaConsumer<>(props);
}
It is subscribed to the topic:
this.consumer.subscribe(Collections.singletonList("fss-fsstransdata"));
Every time I try to get messages with this.consumer.poll(Duration.ofMillis(500)), records are consistently returned only on the second call.
Code:
log.info("COUNT:" + this.consumer.poll(Duration.ofMillis(1000)).count());
log.info("COUNT:" + this.consumer.poll(Duration.ofMillis(1000)).count());
log.info("COUNT:" + this.consumer.poll(Duration.ofMillis(1000)).count());
log.info("COUNT:" + this.consumer.poll(Duration.ofMillis(1000)).count());
log.info("COUNT:" + this.consumer.poll(Duration.ofMillis(1000)).count());
Logs:
| 17:47:40.688 | main | INFO | IaStepsDefinitions | COUNT:0
| 17:47:40.689 | main | INFO | IaStepsDefinitions | COUNT:1
| 17:47:41.690 | main | INFO | IaStepsDefinitions | COUNT:0
| 17:47:42.691 | main | INFO | IaStepsDefinitions | COUNT:0
| 17:47:43.692 | main | INFO | IaStepsDefinitions | COUNT:0
Can someone please explain why this happens?
Upvotes: 1
Views: 2030
Reputation: 1006
Consumer.poll() does a lot under the hood besides actually fetching data.
All of those steps are bounded by the Duration you pass to poll(), so things work even worse with a Duration of 1 ms.
In my opinion it is misleading and incorrect to put this logic in poll(). Let poll() do the polling, and do the rest in background threads and/or in the subscribe method.
When you call poll(), you don't expect the system to do the following:
if (!updateAssignmentMetadataIfNeeded(timer)) {
    return ConsumerRecords.empty();
}
poll() is client-facing logic: if it returns 0 records, that should mean the broker has nothing to deliver. Here, an empty result can also mean that the assignment metadata was not updated within the timeout.
If you call a REST service and get an empty response, you know the server is empty. If you call PreparedStatement.execute(), you get a correct result or an exception. If you call RabbitMQ.basicGet() and get an empty response, it means the queue is empty.
So, long story short: in your case, just increase the timeout for the first poll and you should get the records right away.
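If you would rather not guess a single large timeout, another option is to keep polling until something arrives or an overall deadline passes. Below is a minimal sketch of that idea. PollRetry and pollUntil are hypothetical names made up for this example, not part of the Kafka client API; the helper is generic so that it only relies on the JDK.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical helper, not part of kafka-clients.
final class PollRetry {
    private PollRetry() {}

    /**
     * Calls poll repeatedly until hasData accepts the result or the overall
     * deadline has passed. Returns the last result, which may still be
     * "empty" if the deadline was reached first.
     */
    static <R> R pollUntil(Function<Duration, R> poll,
                           Predicate<R> hasData,
                           Duration perPoll,
                           Duration overall) {
        Instant deadline = Instant.now().plus(overall);
        R result = poll.apply(perPoll); // always poll at least once
        while (!hasData.test(result) && Instant.now().isBefore(deadline)) {
            result = poll.apply(perPoll);
        }
        return result;
    }
}
```

With the consumer from the question, usage could look like this (assuming the same consumer field and value type):

ConsumerRecords<String, TransactionDataAvro> records = PollRetry.pollUntil(consumer::poll, r -> !r.isEmpty(), Duration.ofMillis(500), Duration.ofSeconds(10));

This way the early empty result caused by the assignment step is simply retried, and an empty result after the deadline is a much stronger hint that there really is nothing to read.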
Upvotes: 2