Reputation: 822
I am writing a Kafka consumer application with one consumer per partition. The code is shown below:
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n",
            record.topic(), record.partition(), record.offset(), record.key(), record.value());
    }
    consumer.commitAsync(new OffsetCommitCallback() {
        public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception e) {
            if (e != null)
                log.error("Commit failed for offsets {}", offsets, e);
        }
    });
}
Is there a way to programmatically access and print the consumer lag, that is, the difference between the offset of the last record read by the consumer and the offset of the last record written to that partition by a producer?
What statements should I add to the code above to get the lag value, knowing that my final goal is to send this value to Prometheus for monitoring?
Upvotes: 1
Views: 2136
Reputation: 18475
There are several ways to monitor consumer lag.
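According to the JavaDocs for KafkaConsumer, you can use the endOffsets method to "Get the end offsets for the given partitions." Subtracting the consumer's current position (the position method) from the end offset of a partition gives the lag for that partition.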
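As a rough sketch of that calculation (not taken from the Kafka docs): the helper below is plain Java so it can be tried without a broker. LagCalculator and computeLag are hypothetical names, and the comment shows how the two input maps could be filled from consumer.endOffsets(...) and consumer.position(...) inside the poll loop.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of the Kafka client API.
final class LagCalculator {

    // In the poll loop the inputs could be built from the real consumer, e.g.:
    //   Map<TopicPartition, Long> end = consumer.endOffsets(consumer.assignment());
    //   Map<TopicPartition, Long> pos = new HashMap<>();
    //   for (TopicPartition tp : consumer.assignment()) pos.put(tp, consumer.position(tp));
    // The key type P is generic, so TopicPartition works as-is.
    static <P> Map<P, Long> computeLag(Map<P, Long> endOffsets, Map<P, Long> positions) {
        Map<P, Long> lag = new HashMap<>();
        for (Map.Entry<P, Long> entry : endOffsets.entrySet()) {
            Long position = positions.get(entry.getKey());
            if (position == null) {
                continue; // no known position for this partition, so skip it
            }
            // Clamp at zero in case the two values were read at slightly different times.
            lag.put(entry.getKey(), Math.max(0L, entry.getValue() - position));
        }
        return lag;
    }
}
```

Calling computeLag once per loop iteration and logging or exporting the returned map would give a per-partition lag value. Note that endOffsets makes a request to the broker, so you may not want to call it on every poll.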
Alternatively, you could use Kafka's monitoring capabilities via JMX metrics. The Monitoring section of the Kafka documentation explains how to get the "Number of messages the consumer lags behind the producer by. Published by the consumer, not broker."
Upvotes: 1
Reputation: 26885
If your goal is to have the data in Prometheus, you should use the records-lag metrics emitted by consumers.
In the last table of the Consumer Fetch Metrics section of the docs, you can see that consumers emit lag (and lead) per topic-partition.
By default, metrics are exposed via JMX, so you could, for example, use Prometheus' JMX exporter to collect them.
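If you want to check what the consumer publishes before wiring up the exporter, you can read the same MBeans from inside the consumer's JVM. This is only a sketch: JmxLagReader is a hypothetical class, and the MBean name pattern (kafka.consumer:type=consumer-fetch-manager-metrics with topic and partition keys) is my assumption of how recent clients name these metrics, so verify it with jconsole for your client version.

```java
import java.util.HashMap;
import java.util.Map;
import javax.management.JMException;
import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

// Hypothetical helper: collects the records-lag attribute from matching MBeans.
final class JmxLagReader {

    static Map<String, Double> readRecordsLag(MBeanServer server) {
        ObjectName pattern;
        try {
            // Assumed naming scheme; check the actual names in jconsole.
            pattern = new ObjectName("kafka.consumer:type=consumer-fetch-manager-metrics,*");
        } catch (MalformedObjectNameException e) {
            throw new IllegalArgumentException(e);
        }
        Map<String, Double> lagByMBean = new HashMap<>();
        for (ObjectName name : server.queryNames(pattern, null)) {
            if (name.getKeyProperty("partition") == null) {
                continue; // only partition-level MBeans are expected to carry records-lag
            }
            try {
                Object value = server.getAttribute(name, "records-lag");
                if (value instanceof Number) {
                    lagByMBean.put(name.getCanonicalName(), ((Number) value).doubleValue());
                }
            } catch (JMException e) {
                // Attribute missing or MBean unregistered in the meantime: skip it.
            }
        }
        return lagByMBean;
    }
}
```

Inside the consumer process you would call JmxLagReader.readRecordsLag(ManagementFactory.getPlatformMBeanServer()). In a JVM without a running consumer the returned map is simply empty.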
Upvotes: 2