Mazen Ezzeddine
Mazen Ezzeddine

Reputation: 822

Kafka consumer : offset lag between consumer and poducer

I am writing a Kafka consumer application where I have one consumer for each partition. The code is shown below

while (true) {  
  ConsumerRecords<String, String> records = consumer.poll(100);   
        for (ConsumerRecord<String, String> record : records) {     
            System.out.printf("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n",        
            record.topic(), record.partition(), record.offset(), record.key(), record.value());   
 }    
  consumer.commitAsync(new OffsetCommitCallback() {       
        public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception e) {  
           if (e != null)              
              log.error("Commit failed for offsets {}", offsets, e); }}); 
}

Is there a way to programmatically access and print the consumer lag offset, or otherwise said the difference in positions between the offset of the last record read by the consumer and the offset of the last record written into that consumer partition by a certain producer.

What statements should I add to the above to get the lag offset value, knowing that my final goal is to send this value to prometheus for monitoring?

Upvotes: 1

Views: 2136

Answers (2)

Michael Heil
Michael Heil

Reputation: 18475

There are various options that you can do to monitor your consumer lags.

Using KafkaConsumer's endOffsets API

According to the JavaDocs on the KafkaConsumer you can make use of endOffsets method to "Get the end offsets for the given partitions."

Using JMX Metrics

You could make use of Kafka's monitoring capabilities using JMX metrics. In the Documentation on Monitoring Kafka it is explained how to get the "Number of messages the consumer lags behind the producer by. Published by the consumer, not broker."

Upvotes: 1

Mickael Maison
Mickael Maison

Reputation: 26885

If your goal is to have the data in prometheus, you should use the records-lag metrics emitted by Consumers.

In the last table in the Consumer Fetch Metrics section in the docs, you can see that Consumers emit lag (and lead) per topic-partition.

By default, metrics are emitted via JMX, so you could use Prometheus' JMX exporter to do the job for example.

Upvotes: 2

Related Questions