Reputation: 83
I need to start a Kafka consumer automatically on application startup. Before doing so, I would like to know whether any other consumers are already polling that topic.
I can see these details on the Confluent site for Kafka, but I would like to get them through Java code.
I know there are some .sh scripts in Kafka's bin folder, but this needs to work in a production environment, from Java code.
Upvotes: 1
Views: 1115
Reputation: 174514
You can use the AdminClient to describe the consumer groups:
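import java.util.Collections;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.KafkaAdmin;

@SpringBootApplication
public class So67668813Application {

    public static void main(String[] args) {
        SpringApplication.run(So67668813Application.class, args);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("so67668813").partitions(4).replicas(1).build();
    }

    // The listener id doubles as the consumer group id unless a groupId is set.
    @KafkaListener(id = "so67668813", topics = "so67668813")
    public void listen(String in) {
        System.out.println(in);
    }

    @Bean
    public ApplicationRunner runner(KafkaAdmin admin) {
        return args -> {
            // Give the listener time to start and join the group before querying.
            Thread.sleep(2000);
            try (AdminClient client = AdminClient.create(admin.getConfigurationProperties())) {
                Map<String, ConsumerGroupDescription> groups =
                        client.describeConsumerGroups(Collections.singletonList("so67668813"))
                                .all()
                                .get(10, TimeUnit.SECONDS);
                System.out.println(groups);
            }
        };
    }
}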
This prints:
{so67668813=(groupId=so67668813, isSimpleConsumerGroup=false, members=(memberId=consumer-so67668813-1-620a9dd7-c995-461d-bb7d-c141457a5799, groupInstanceId=null, clientId=consumer-so67668813-1, host=/127.0.0.1, assignment=(topicPartitions=so67668813-3,so67668813-1,so67668813-2,so67668813-0)), partitionAssignor=range, state=Stable, coordinator=localhost:9092 (id: 0 rack: null), authorizedOperations=null)}
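To turn that description into a yes/no answer to "is anyone already consuming this topic?", one option is to check whether any group member currently holds a partition of the topic. Below is a minimal sketch of that check only, not a complete solution. The names `ActiveConsumerCheck` and `hasActiveConsumer` are hypothetical, and the plain `Map` is a stand-in I am assuming you would build yourself from `ConsumerGroupDescription.members()`, using each member's `assignment().topicPartitions()` to collect topic names.

```java
import java.util.Map;
import java.util.Set;

// Hypothetical helper, not part of Kafka or Spring for Apache Kafka.
class ActiveConsumerCheck {

    /**
     * Returns true if at least one group member is assigned a partition of the topic.
     *
     * @param topic          the topic to look for
     * @param topicsByMember member id -> names of the topics that member has partitions of
     *                       (assumed to be derived from the group description)
     */
    static boolean hasActiveConsumer(String topic, Map<String, Set<String>> topicsByMember) {
        return topicsByMember.values().stream()
                .anyMatch(topics -> topics.contains(topic));
    }

    public static void main(String[] args) {
        // Mirrors the description printed by the runner: one member assigned to the topic.
        Map<String, Set<String>> members =
                Map.of("consumer-so67668813-1", Set.of("so67668813"));

        System.out.println(hasActiveConsumer("so67668813", members));  // prints true
        System.out.println(hasActiveConsumer("so67668813", Map.of())); // prints false
    }
}
```

Keep in mind that this is a point-in-time check: a group with no members, or one that is mid-rebalance, can report no assignments even though a consumer is about to join.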
Upvotes: 1