Reputation: 127
How can I tell whether Kafka has processed all the messages? Is there a command or log file that shows the offset currently being processed and the last offset in the topic?
Upvotes: 1
Views: 1303
Reputation: 174514
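If you want to do it programmatically, e.g. from a Spring application, you can compare each consumer group's committed offsets with the end offsets of its partitions:

    import java.util.Collection;
    import java.util.Map;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;

    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.ConsumerGroupListing;
    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;
    import org.springframework.boot.ApplicationRunner;
    import org.springframework.context.annotation.Bean;
    import org.springframework.kafka.core.ConsumerFactory;
    import org.springframework.kafka.core.KafkaAdmin;

    @Bean
    public ApplicationRunner runner(KafkaAdmin admin, ConsumerFactory<String, String> cf) {
        return args -> {
            // Newer Spring Kafka versions may expose the config as getConfigurationProperties().
            try (AdminClient client = AdminClient.create(admin.getConfig());
                    Consumer<String, String> consumer = cf.createConsumer("dummyGroup", "clientId", "")) {
                Collection<ConsumerGroupListing> groups = client.listConsumerGroups()
                        .all()
                        .get(10, TimeUnit.SECONDS);
                for (ConsumerGroupListing group : groups) {
                    // Last committed offset for every partition this group has consumed.
                    Map<TopicPartition, OffsetAndMetadata> committed;
                    try {
                        committed = client.listConsumerGroupOffsets(group.groupId())
                                .partitionsToOffsetAndMetadata()
                                .get(10, TimeUnit.SECONDS);
                    }
                    catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                    catch (ExecutionException | TimeoutException e) {
                        e.printStackTrace();
                        continue; // skip this group rather than dereference a null map below
                    }
                    // Current end offset (next offset to be written) of the same partitions.
                    Map<TopicPartition, Long> endOffsets = consumer.endOffsets(committed.keySet());
                    committed.forEach((tp, off) -> System.out.println("group: " + group.groupId()
                            + " tp: " + tp
                            + " current offset: " + off.offset()
                            + " end offset: " + endOffsets.get(tp)));
                }
            }
        };
    }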
Upvotes: 1
Reputation: 18475
You could use the command line tool kafka-consumer-groups.sh to check the consumer lag of your ConsumerGroup. It shows the log end offset of each partition the ConsumerGroup is consuming and the last offset the ConsumerGroup committed. When LAG is 0 for every partition, the group has consumed all messages:
    bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group mygroup

    GROUP    TOPIC       PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  OWNER
    mygroup  test-topic  0          5               15              10   xxx
    mygroup  test-topic  1          10              15              5    xxx
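The LAG column is simply LOG-END-OFFSET minus CURRENT-OFFSET, so the same check can be sketched in plain Java. This is only an illustration: the class `LagCheck`, its method names, and the string keys such as "test-topic-0" are hypothetical, and treating a partition without a committed offset as starting from 0 is a simplifying assumption.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of Kafka: computes lag from two offset maps.
class LagCheck {

    /** Lag per partition = log end offset minus committed offset, floored at 0. */
    static Map<String, Long> lagPerPartition(Map<String, Long> committed,
                                             Map<String, Long> endOffsets) {
        Map<String, Long> lag = new LinkedHashMap<>();
        for (Map.Entry<String, Long> e : endOffsets.entrySet()) {
            // Assumption: a partition with no committed offset counts as unread from 0.
            long current = committed.getOrDefault(e.getKey(), 0L);
            lag.put(e.getKey(), Math.max(0L, e.getValue() - current));
        }
        return lag;
    }

    /** True when every partition has zero lag, i.e. everything was consumed. */
    static boolean allProcessed(Map<String, Long> lag) {
        return lag.values().stream().allMatch(v -> v == 0L);
    }

    public static void main(String[] args) {
        // Values taken from the sample output above.
        Map<String, Long> committed = new LinkedHashMap<>();
        committed.put("test-topic-0", 5L);
        committed.put("test-topic-1", 10L);

        Map<String, Long> endOffsets = new LinkedHashMap<>();
        endOffsets.put("test-topic-0", 15L);
        endOffsets.put("test-topic-1", 15L);

        Map<String, Long> lag = lagPerPartition(committed, endOffsets);
        // prints: {test-topic-0=10, test-topic-1=5} allProcessed=false
        System.out.println(lag + " allProcessed=" + allProcessed(lag));
    }
}
```

With the sample numbers the group is still 10 and 5 messages behind. Note that this reflects committed offsets, so a consumer that has processed records but not yet committed will still show lag.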
Upvotes: 2