surbhi bakshi

Reputation: 160

Kafka consumer waits indefinitely when the server or port details are wrong

I have set up a Kafka consumer with the properties below:

    Properties consumerProperties = new Properties();
    consumerProperties.put("bootstrap.servers",server);
    consumerProperties.put("group.id",groupId);
    consumerProperties.put("security.protocol", "SASL_PLAINTEXT");
    consumerProperties.put("sasl.mechanism", "PLAIN");
    consumerProperties.put("enable.auto.commit", "false");
    consumerProperties.put("acks", "all");
    consumerProperties.put("request.timeout.ms", 12000);
    consumerProperties.put("max.block.ms",500);
    consumerProperties.put("session.timeout.ms", 11000);
    consumerProperties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    consumerProperties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    // Create the consumer from the properties above
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProperties);
    // I have try/catch blocks, but no exceptions are thrown.

It waits indefinitely while polling if the server details are incorrect, for example when the server name or the port is wrong:

            try{
                LOGGER.info("Subscribing to topic.");
                consumer.subscribe(Arrays.asList(topic));
                LOGGER.info("Subscribed to topic successfully.");
                LOGGER.info("Start of polling records for consumer. ");
                records = consumer.poll(100);
                // The code blocks on the line above indefinitely and never returns.
                LOGGER.info("Returning records to microservice.");
            }
            catch(InterruptException interruptException) {
                LOGGER.error("interrupt exception "+interruptException);
            }
            catch(TimeoutException timeoutException) {
                LOGGER.error("Time out exception "+timeoutException);
            }
            catch (KafkaException kafkaException) {
                LOGGER.error("Kafka Exception occurred while consuming records by consumer. Message: "+kafkaException.getMessage());
            }
            catch(Exception exception){
                LOGGER.error("Exception occurred while creating consumer object "+exception);
            }

Please suggest what changes I need to make to interrupt the infinite wait in poll() when the server details are incorrect.

Upvotes: 2

Views: 1499

Answers (4)

Clover

Reputation: 569

Restarting both zookeeper and kafka worked for me.

Upvotes: 0

kellanburket

Reputation: 12853

As others have pointed out, Kafka's internal ConsumerCoordinator currently has a built-in timeout of 9223372036854775807 ms (Long.MAX_VALUE) while attempting to ensure coordinator readiness, so in practice it never times out.

If you're just looking to make sure your host/port information is correct before attempting to poll the consumer, a simple call to consumer.listTopics() should do the trick. It will throw an org.apache.kafka.common.errors.TimeoutException if it can't connect.

Upvotes: 2

Mickael Maison

Reputation: 26950

This is a known issue. See the JIRA issue: https://issues.apache.org/jira/browse/KAFKA-3834

The only way to get out of the infinite loop is to call wakeup() from another thread.

Upvotes: 1

Sönke Liebau

Reputation: 1973

I don't think there is much you can do about that currently, apart from externally checking how long your first poll has been running and interrupting it after a certain time. The best way to do this is probably to use an ExecutorService, as discussed in this SO answer.
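A minimal sketch of the ExecutorService approach, using only the JDK. The class name BoundedCall and the method callWithTimeout are made up for illustration, and the sleeping lambda is a stand-in for the first consumer.poll(...) call. It assumes the blocked call reacts to a thread interrupt; whether a stuck poll does so may depend on your client version.

```java
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class BoundedCall {

    /**
     * Runs a blocking call on a worker thread and waits at most timeoutMs for it.
     * Returns Optional.empty() if the call did not finish in time (hypothetical helper).
     */
    public static <T> Optional<T> callWithTimeout(Callable<T> blockingCall, long timeoutMs)
            throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Future<T> future = executor.submit(blockingCall);
        try {
            // Wait for the result, but only for the given time.
            return Optional.ofNullable(future.get(timeoutMs, TimeUnit.MILLISECONDS));
        } catch (TimeoutException e) {
            // Give up and interrupt the worker thread that is still blocked.
            future.cancel(true);
            return Optional.empty();
        } finally {
            // Do not leave the worker thread behind.
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        // Stand-in for a first poll that never returns because the broker is unreachable.
        Optional<String> stuck = callWithTimeout(() -> {
            Thread.sleep(60_000);
            return "records";
        }, 200);
        System.out.println("stuck call finished: " + stuck.isPresent());

        // Stand-in for a poll that returns promptly.
        Optional<String> ok = callWithTimeout(() -> "records", 200);
        System.out.println("fast call finished: " + ok.isPresent());
    }
}
```

In real use the Callable would wrap the poll, e.g. () -> consumer.poll(100). Keep in mind that KafkaConsumer is not thread-safe, so the consumer should be created, polled and closed on that same worker thread; wakeup() is the one method meant to be called from another thread.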

There are a few tickets in the Kafka JIRA around this that you can watch for any development (KAFKA-1894, KAFKA-2391, KAFKA-3834), but there hasn't been a lot of active discussion on the topic lately.

Upvotes: 1
