Arpan Parikh
Arpan Parikh

Reputation: 83

How to get number of consumers for a particular topic through java in kafka

I have a requirement to start kafka consumer automatically on application startup. But before doing this, I would like to know if there are any other consumers already polling to that topic.
I can see these details from kafka confluent site, but I would like to get these details through java code.
I know there are some .sh files in bin folder of kafka, but I would like to do this in production environment and through java code.

Upvotes: 1

Views: 1115

Answers (1)

Gary Russell
Gary Russell

Reputation: 174514

You can use the AdminClient to describe the consumer groups:

@SpringBootApplication
public class So67668813Application {

    public static void main(String[] args) {
        SpringApplication.run(So67668813Application.class, args);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("so67668813").partitions(4).replicas(1).build();
    }

    @KafkaListener(id = "so67668813", topics = "so67668813")
    public void listen(String in) {
        System.out.println(in);
    }


    @Bean
    public ApplicationRunner runner(KafkaAdmin admin) {
        return args -> {
            Thread.sleep(2000);
            try (AdminClient client = AdminClient.create(admin.getConfigurationProperties())) {
                Map<String, ConsumerGroupDescription> groups =
                        client.describeConsumerGroups(Collections.singletonList("so67668813"))
                                .all()
                                .get(10, TimeUnit.SECONDS);
                System.out.println(groups);
            }
        };
    }

}

{so67668813=(groupId=so67668813, isSimpleConsumerGroup=false, members=(memberId=consumer-so67668813-1-620a9dd7-c995-461d-bb7d-c141457a5799, groupInstanceId=null, clientId=consumer-so67668813-1, host=/127.0.0.1, assignment=(topicPartitions=so67668813-3,so67668813-1,so67668813-2,so67668813-0)), partitionAssignor=range, state=Stable, coordinator=localhost:9092 (id: 0 rack: null), authorizedOperations=null)}

Upvotes: 1

Related Questions