Ardiel

Reputation: 31

how to get kafka lag using java

I have developed code that displays the topic, partition, and log offset, but I am stuck on how to get the lag of a partition. I know there is a Kafka command-line tool that does this, but what I need is Java code.

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

// BOOTSTRAP_SERVERS, GROUPID and TOPIC are constants defined elsewhere in the class.
public static void main(String[] args) throws Exception {
    System.out.println("START CONSUMER");
    final Properties props = new Properties();
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUPID);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

    // Create the consumer using props (keys and values are both Strings).
    final Consumer<String, String> consumer = new KafkaConsumer<>(props);

    // Manually assign every partition of the topic (assign(), not subscribe()).
    int partitionCount = consumer.partitionsFor(TOPIC).size();
    List<TopicPartition> partitions = new ArrayList<>();
    for (int i = 0; i < partitionCount; i++) {
        partitions.add(new TopicPartition(TOPIC, i));
    }
    consumer.assign(partitions);
    consumer.seekToEnd(partitions);

    // After seekToEnd(), position() returns the log end offset of each partition.
    for (int i = 0; i < partitionCount; i++) {
        System.out.printf("Topic: %s partitionID: %d log offset: %d%n", TOPIC, i, consumer.position(partitions.get(i)));
    }

    System.out.println("CREATE CONSUMER DONE");
    consumer.close();
}

This is the output of my code

I need to output the topic, partition, current offset, log offset, and lag. How can I get the lag, or the current offset, in my code? (See the image for the needed output.)

Needed output

NOTE: I cannot use the for-each-record approach, because I must not read every record of the input.

Upvotes: 2

Views: 5069

Answers (3)

user22399844

Reputation: 1

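import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

// properties must hold bootstrap.servers, any security settings, and key/value
// deserializers, because they are reused for both the AdminClient and the consumer.
private static long calculateTopicLag(Properties properties, String topic, String consumerGroup) {
    try (AdminClient adminClient = AdminClient.create(properties);
         KafkaConsumer<?, ?> consumer = new KafkaConsumer<>(properties)) {
        // Committed offsets of the group, for every topic it consumes
        Map<TopicPartition, OffsetAndMetadata> offsets =
                adminClient.listConsumerGroupOffsets(consumerGroup).partitionsToOffsetAndMetadata().get();

        // Keep only partitions of the requested topic that have a committed offset
        Map<TopicPartition, Long> committed = new HashMap<>();
        for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
            if (entry.getKey().topic().equals(topic) && entry.getValue() != null) {
                committed.put(entry.getKey(), entry.getValue().offset());
            }
        }

        // One request for all end offsets; no per-partition consumer, assign() or seek needed
        Map<TopicPartition, Long> endOffsets = consumer.endOffsets(committed.keySet());

        long topicLag = 0;
        for (Map.Entry<TopicPartition, Long> entry : committed.entrySet()) {
            topicLag += endOffsets.get(entry.getKey()) - entry.getValue();
        }
        return topicLag;
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
        return -1;
    }
}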
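The lag arithmetic itself does not need a broker. A minimal stdlib-only sketch, assuming the committed offsets and end offsets for one topic have already been fetched and copied into plain maps keyed by partition number (the class name `TopicLag` and method `topicLag` are hypothetical). Partitions with no committed offset are skipped, since their lag is undefined.

```java
import java.util.HashMap;
import java.util.Map;

public class TopicLag {

    /**
     * Sums per-partition lag (end offset minus committed offset) for one topic.
     * Both maps are keyed by partition number. Partitions without a committed
     * offset (absent or null) are skipped because their lag is undefined.
     */
    static long topicLag(Map<Integer, Long> committed, Map<Integer, Long> endOffsets) {
        long total = 0;
        for (Map.Entry<Integer, Long> entry : endOffsets.entrySet()) {
            Long committedOffset = committed.get(entry.getKey());
            if (committedOffset == null) {
                continue; // no committed offset for this partition
            }
            total += entry.getValue() - committedOffset;
        }
        return total;
    }

    public static void main(String[] args) {
        // Hypothetical offsets for a three-partition topic
        Map<Integer, Long> endOffsets = new HashMap<>();
        endOffsets.put(0, 120L);
        endOffsets.put(1, 75L);
        endOffsets.put(2, 40L);

        Map<Integer, Long> committed = new HashMap<>();
        committed.put(0, 100L); // lag 20
        committed.put(1, 75L);  // lag 0
        // partition 2 has no committed offset yet

        System.out.println(topicLag(committed, endOffsets)); // prints 20
    }
}
```

Skipping uncommitted partitions keeps a freshly created group from reporting a lag equal to the whole topic; counting them from the beginning offset would be the other reasonable choice.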

Upvotes: 0

GSK

Reputation: 563

You can get the lag by fetching the end offsets from the consumer:

Set<TopicPartition> partitionSet = consumer.assignment();
Map<TopicPartition, Long> endOffsets = consumer.endOffsets(partitionSet);

Then iterate over the set:

for (TopicPartition tp : partitionSet) {
    long endOffset = endOffsets.get(tp), currentOffset = consumer.position(tp);
    LOG.info("Topic :: {}, EndOffset :: {}, currentOffset :: {}, Lag :: {}", tp.topic(), endOffset, currentOffset, endOffset - currentOffset);
}

consumer.position(tp) returns the consumer's current position in the partition; subtract it from the end offset to get the lag.

Upvotes: 1

Mickael Maison

Reputation: 26885

To reproduce the kafka-consumer-groups functionality, you need both a Consumer and an AdminClient instance.

First, use the AdminClient's listConsumerGroupOffsets() to retrieve the topic-partitions and the committed offsets for a specific group.

Then use a Consumer to get the end offsets for these partitions. The method you used is inefficient: there's no need to assign the partitions and seek to the end. You can simply call endOffsets().

That is enough for reproducing the data contained in your screenshot.
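With the committed offsets and end offsets in hand, building the rows of the needed output is plain bookkeeping. A stdlib-only sketch, assuming both results have already been copied into maps keyed by partition number (the real calls return `Map<TopicPartition, OffsetAndMetadata>` and `Map<TopicPartition, Long>`; the class `LagReport` and method `rows` are hypothetical). A `-` is printed where a partition has no committed offset, mirroring the dash kafka-consumer-groups shows in that case.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class LagReport {

    /** Builds a header plus one line per partition: topic, partition, current offset, log-end offset, lag. */
    static List<String> rows(String topic, Map<Integer, Long> committed, Map<Integer, Long> endOffsets) {
        List<String> lines = new ArrayList<>();
        lines.add(String.format("%-12s %-9s %-14s %-14s %s",
                "TOPIC", "PARTITION", "CURRENT-OFFSET", "LOG-END-OFFSET", "LAG"));
        // TreeMap orders the partitions by number
        for (Map.Entry<Integer, Long> e : new TreeMap<>(endOffsets).entrySet()) {
            Long current = committed.get(e.getKey());
            String currentText = current == null ? "-" : current.toString();
            String lagText = current == null ? "-" : Long.toString(e.getValue() - current);
            lines.add(String.format("%-12s %-9d %-14s %-14d %s",
                    topic, e.getKey(), currentText, e.getValue(), lagText));
        }
        return lines;
    }

    public static void main(String[] args) {
        // Hypothetical offsets: partition 1 has nothing committed yet
        Map<Integer, Long> endOffsets = Map.of(0, 120L, 1, 75L);
        Map<Integer, Long> committed = Map.of(0, 100L);
        rows("my-topic", committed, endOffsets).forEach(System.out::println);
    }
}
```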

kafka-consumer-groups also uses AdminClient.describeConsumerGroups() to print the group member (if any) assigned to each partition.
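describeConsumerGroups() reports assignments per member (each `MemberDescription` has a `consumerId()` and an `assignment().topicPartitions()` set), so printing an owner per partition means inverting that mapping. A stdlib-only sketch of the inversion, assuming the member ids and their assigned partition numbers for one topic have already been copied into a plain map (the class `PartitionOwners` and method `ownerByPartition` are hypothetical).

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class PartitionOwners {

    /**
     * Inverts a member -> assigned partitions map into partition -> member,
     * so each row of a lag report can show which consumer owns the partition.
     * Partitions that no member owns simply have no entry.
     */
    static Map<Integer, String> ownerByPartition(Map<String, Set<Integer>> assignments) {
        Map<Integer, String> owners = new HashMap<>();
        for (Map.Entry<String, Set<Integer>> member : assignments.entrySet()) {
            for (Integer partition : member.getValue()) {
                owners.put(partition, member.getKey());
            }
        }
        return owners;
    }

    public static void main(String[] args) {
        // Hypothetical consumer ids and assignments for a four-partition topic
        Map<String, Set<Integer>> assignments = Map.of(
                "consumer-1", Set.of(0, 1),
                "consumer-2", Set.of(2));
        Map<Integer, String> owners = ownerByPartition(assignments);
        for (int p = 0; p < 4; p++) {
            System.out.println(p + " -> " + owners.getOrDefault(p, "-"));
        }
    }
}
```

Partition 3 prints `-` here, which is what an unowned partition looks like when the group has fewer members than partitions or is empty.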

Upvotes: 6
