Reputation: 522
For my own purposes, I need the Spring Boot application to stop if the Kafka consumer can't connect to the broker. When the Kafka consumer tries to poll messages, we see the following logs:
[Consumer clientId=consumer-ddddd-1, groupId=ddddd] Bootstrap broker localhost:9094 (id: -1 rack: null) disconnected
[Consumer clientId=consumer-ddddd-1, groupId=ddddd] Connection to node -1 (localhost/127.0.0.1:9094) could not be established. Broker may not be available.
This is standard behaviour when the topic or broker is not available, and as a result the application does not stop. But I need it to stop.
I tried adding the following properties, but they don't work:
spring.kafka.consumer.fetch-max-wait=1000
spring.kafka.admin.fail-fast=true
spring.kafka.session.timeout.ms=1000
In general, I want behaviour like this: if the consumer can't connect, shut down the application.
Example of Kafka Polling:
consumer.poll(Duration.ofMinutes(5));
Upvotes: 0
Views: 1719
Reputation: 15388
My solution to the problem:
@Bean
@Profile("local")
public Boolean localKafkaAvailabilityValidator() {
    // kafkaProperties is the injected Spring Boot KafkaProperties bean
    KafkaAdmin kafkaAdmin = new KafkaAdmin(kafkaProperties.buildAdminProperties());
    kafkaAdmin.setFatalIfBrokerNotAvailable(true);
    // Query the cluster ID asynchronously so the wait can be bounded
    var future = CompletableFuture.runAsync(kafkaAdmin::clusterId);
    try {
        // Give the broker 2 seconds to answer before failing startup
        future.get(2, TimeUnit.SECONDS);
    } catch (Exception e) {
        throw new RuntimeException("Kafka is not available. Please start it before starting the application.", e);
    }
    return true;
}
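The core of the bean above is a bounded wait: run a blocking check on another thread and fail startup if it throws or takes too long. A minimal sketch of that pattern using only the JDK follows. StartupGuard and requireWithin are hypothetical names for illustration, not part of Spring or Kafka, and the check passed in stands in for a call such as kafkaAdmin::clusterId.

```java
import java.time.Duration;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper for illustration; not a Spring or Kafka class.
final class StartupGuard {

    private StartupGuard() {}

    /** Runs the check on a daemon thread; throws if it fails or exceeds the timeout. */
    static void requireWithin(Runnable check, Duration timeout) {
        ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "startup-guard");
            t.setDaemon(true); // a hung check must not keep the JVM alive
            return t;
        });
        Future<?> future = executor.submit(check);
        try {
            future.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            future.cancel(true); // interrupt the still-running check
            throw new IllegalStateException("Check did not finish within " + timeout, e);
        } catch (ExecutionException e) {
            // The check itself threw; surface its exception as the cause
            throw new IllegalStateException("Check failed", e.getCause());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for check", e);
        } finally {
            executor.shutdownNow();
        }
    }
}
```

Under these assumptions, a call such as StartupGuard.requireWithin(kafkaAdmin::clusterId, Duration.ofSeconds(2)) inside a bean method would abort context startup with an IllegalStateException when the broker does not answer in time.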
Upvotes: 0
Reputation: 522
One approach is to use the solution from the answer above. Alternatively, I created a validation based on the following code, and it works:
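public void validate() {
    try {
        // listTopics needs a broker round trip, so it fails when none is reachable
        consumer.listTopics(Duration.ofSeconds(10));
    } catch (TimeoutException e) { // org.apache.kafka.common.errors.TimeoutException
        logger.error("Topics don't exist or the broker is unavailable");
        // Stop the application with a non-zero exit code
        System.exit(1);
    }
}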
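If a Kafka client is not yet available at the point where the check should run, a cruder stand-in is a plain TCP probe of the bootstrap address. This is a different technique from listTopics: it only shows that something accepts connections on the port, not that a healthy broker is behind it. BrokerProbe and anyReachable are hypothetical names, and the sketch assumes the bootstrap servers are given as a comma-separated host:port list.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.time.Duration;

// Hypothetical helper for illustration; a plain TCP probe, not a Kafka API.
final class BrokerProbe {

    private BrokerProbe() {}

    /** Returns true if any "host:port" entry in the comma-separated list accepts a TCP connection. */
    static boolean anyReachable(String bootstrapServers, Duration timeout) {
        for (String entry : bootstrapServers.split(",")) {
            String hostPort = entry.trim();
            int colon = hostPort.lastIndexOf(':');
            if (colon <= 0) {
                continue; // malformed entry: no host or no port separator
            }
            String host = hostPort.substring(0, colon);
            try (Socket socket = new Socket()) {
                int port = Integer.parseInt(hostPort.substring(colon + 1));
                socket.connect(new InetSocketAddress(host, port), (int) timeout.toMillis());
                return true; // connection established
            } catch (IOException | IllegalArgumentException e) {
                // Unreachable, refused, timed out, or invalid port: try the next entry
            }
        }
        return false;
    }
}
```

With the address from the logs above, the validation could then become: if BrokerProbe.anyReachable("localhost:9094", Duration.ofSeconds(2)) returns false, log the error and call System.exit(1).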
Upvotes: 0