aymen0406
aymen0406

Reputation: 127

knowing that kafka has processed all the messages

How to know that Kafka has processed all the messages? is there any command or log file that species Kafka offset under processing and the last Kafka offset?

Upvotes: 1

Views: 1303

Answers (2)

Gary Russell
Gary Russell

Reputation: 174514

If you want to do it programmatically, e.g. from a Spring application:

@Bean
public ApplicationRunner runner(KafkaAdmin admin, ConsumerFactory<String, String> cf) {
    return args -> {
    try (
            AdminClient client = AdminClient.create(admin.getConfig());
            Consumer<String, String> consumer = cf.createConsumer("dummyGroup", "clientId", "");
        ) {
        Collection<ConsumerGroupListing> groups = client.listConsumerGroups()
                .all()
                .get(10, TimeUnit.SECONDS);
        groups.forEach(group -> {
            Map<TopicPartition, OffsetAndMetadata> map = null;
            try {
                map = client.listConsumerGroupOffsets(group.groupId())
                        .partitionsToOffsetAndMetadata()
                        .get(10, TimeUnit.SECONDS);
            }
            catch (InterruptedException e) {
                e.printStackTrace();
            }
            catch (ExecutionException e) {
                e.printStackTrace();
            }
            catch (TimeoutException e) {
                e.printStackTrace();
            }
            Map<TopicPartition, Long> endOffsets = consumer.endOffsets(map.keySet());
            map.forEach((tp, off) -> {
                System.out.println("group: " + group + " tp: " + tp
                        + " current offset: " + off.offset()
                        + " end offset: " + endOffsets.get(tp));
            });
        });
    }
    };
}

Upvotes: 1

Michael Heil
Michael Heil

Reputation: 18475

You could use the command line tool kafka-consumer-groups.sh to check the consumer lag of your ConsumerGroup. It will show the end offset of the topic the ConsumerGroup is consuming and the last offset the ConsumerGroup committed:

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group mygroup
GROUP          TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG       OWNER
mygroup        test-topic      0          5               15              10        xxx
mygroup        test-topic      1          10              15              5         xxx

Upvotes: 2

Related Questions