Reputation: 160
I have set up kafka consumer with below properties
Properties consumerProperties = new Properties();
consumerProperties.put("bootstrap.servers",server);
consumerProperties.put("group.id",groupId);
consumerProperties.put("security.protocol", "SASL_PLAINTEXT");
consumerProperties.put("sasl.mechanism", "PLAIN");
consumerProperties.put("enable.auto.commit", "false");
consumerProperties.put("acks", "all");
consumerProperties.put("request.timeout.ms", 12000);
consumerProperties.put("max.block.ms",500);
consumerProperties.put("session.timeout.ms", 11000);
consumerProperties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
consumerProperties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
//Object creation from above properties
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProperties);
// i have try catch blocks but exceptions aren't being thrown.
It goes in an infinte wait while polling if the server details aren't correct - like the server name is wrong or port is incorrect
try{
LOGGER.info("Subscribing to topic.");
consumer.subscribe(Arrays.asList(topic));
LOGGER.info("Subscribed to topic successfully.");
LOGGER.info("Start of polling records for consumer. ");
***records = consumer.poll(100);***
//CODE GETS STUCK IN ABOVE LINE FOR INFINITE TIME AND DOESNT COMES OUT
LOGGER.info("Returning records to microservice.");
}
catch(InterruptException interruptException) {
LOGGER.error("interrupt exception "+interruptException);
}
catch(TimeoutException timeoutException) {
LOGGER.error("Time out exception "+timeoutException);
}
catch (KafkaException kafkaException) {
LOGGER.error("Kafka Exception occurred while consuming records by consumer. Message: "+kafkaException.getMessage());
}
catch(Exception exception){
LOGGER.error("Exception occured while creating consumer object "+exception);
}
please suggest what changes i need to make to interrupt the infinite wait on poll for incorrect servers?
Upvotes: 2
Views: 1499
Reputation: 12853
As others have pointed out, Kafka's internal ConsumerCoordinator currently has a built-in timeout of 9223372036854775807ms while attempting to ensure coordinator readiness.
If you're just looking to make sure your host/port information is correct before attempting to poll the consumer, a simple call to consumer.listTopics()
should do the trick. It will throw a org.apache.kafka.common.errors.TimeoutException
if it can't connect.
Upvotes: 2
Reputation: 26950
This is a known issue. See the JIRA issue: https://issues.apache.org/jira/browse/KAFKA-3834
The only way to get out of the infinite loop is to call wakeup()
from another thread.
Upvotes: 1
Reputation: 1973
I don't think there is much you can do about that currently apart from externally checking how long your first poll is running and interrupting it after a certain time. The best way for doing this is probably using an ExecutorService like discussed in this SO answer.
There are a few tickets in the Kafka jira around this that you can watch for any development around this(KAFKA-1894, KAFKA-2391, KAFKA-3834) but there hasn't been a lot of active discussion on the topic lately.
Upvotes: 1