Reputation: 4064
Is there a more efficient or simpler way to get the size (latest offsets) of a topic's partitions using the newest Kafka client 2.4 APIs in Java, and then calculate the lag for a consumer group by comparing that group's committed offsets with those latest offsets?
I know this question has been asked for older Kafka versions, and there is also a way to get this info from the JMX metrics exposed by Kafka, but I am stuck with a legacy app that needs to do it in Java with the latest 2.4 Kafka libs.
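The usual way of getting this info, as far as I understand, is to read the group's committed offsets through the AdminClient:
public ListConsumerGroupOffsetsResult listConsumerGroupOffsets(String groupId, ListConsumerGroupOffsetsOptions options)
and then to find the last offset of each partition with a dummy consumer:
consumer.seekToEnd(...)
consumer.position(...)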
Thus, determining the last offset is a pretty heavy operation. So my question is: is there a more efficient way of getting the last offsets for a topic without using the dummy consumer, maybe in the latest 2.4 APIs? The topic/partition size info is really independent of any consumers, so it seems logical to be able to get it without using a consumer at all.
Thank you!
Marina
Upvotes: 4
Views: 10083
Reputation: 4793
Using pure AdminClient
Version of the Apache Kafka client library that I'm using:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.5.0</version>
</dependency>
Example that prints the offsets and lag:
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

// adminClient is assumed to be an already configured
// org.apache.kafka.clients.admin.AdminClient field of the enclosing class.
public void printOffsets(String groupId) {
    try {
        // Get the offsets committed by the group
        Map<TopicPartition, OffsetAndMetadata> offsets =
                adminClient.listConsumerGroupOffsets(groupId).partitionsToOffsetAndMetadata().get();
        // For every topic partition that the group has offsets for, request its latest offset
        Map<TopicPartition, OffsetSpec> requestLatestOffsets = new HashMap<>();
        for (TopicPartition tp : offsets.keySet()) {
            requestLatestOffsets.put(tp, OffsetSpec.latest());
        }
        Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> latestOffsets =
                adminClient.listOffsets(requestLatestOffsets).all().get();
        for (Map.Entry<TopicPartition, OffsetAndMetadata> e : offsets.entrySet()) {
            if (e.getValue() == null) {
                continue; // the group has no committed offset for this partition
            }
            String topic = e.getKey().topic();
            int partition = e.getKey().partition();
            long committedOffset = e.getValue().offset();
            long latestOffset = latestOffsets.get(e.getKey()).offset();
            System.out.println("Topic: " + topic + " Partition: " + partition
                    + " consumed: " + committedOffset + " produced: " + latestOffset
                    + " Lag: " + (latestOffset - committedOffset));
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
}
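The lag arithmetic itself (latest offset minus committed offset, per partition) can be tried without a broker. The following is only a sketch using plain maps: the class name LagSketch, the "topic-partition" string keys standing in for TopicPartition, and the sample numbers are all made up for illustration. Skipping partitions without a committed offset and clamping negative differences to zero are assumptions about how one might want to handle values read at slightly different moments, not something the Kafka API does for you.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class LagSketch {

    // Per-partition lag = end offset - committed offset.
    // Keys are hypothetical "topic-partition" strings standing in for TopicPartition.
    static Map<String, Long> lagPerPartition(Map<String, Long> committed,
                                             Map<String, Long> endOffsets) {
        Map<String, Long> lag = new LinkedHashMap<>();
        for (Map.Entry<String, Long> e : committed.entrySet()) {
            Long committedOffset = e.getValue();
            Long endOffset = endOffsets.get(e.getKey());
            if (committedOffset == null || endOffset == null) {
                continue; // nothing committed, or end offset unknown: skip (assumption)
            }
            // Clamp at zero in case the two snapshots were taken at different times.
            lag.put(e.getKey(), Math.max(0L, endOffset - committedOffset));
        }
        return lag;
    }

    // Sum of all per-partition lags.
    static long totalLag(Map<String, Long> lagPerPartition) {
        long total = 0;
        for (long v : lagPerPartition.values()) {
            total += v;
        }
        return total;
    }

    public static void main(String[] args) {
        Map<String, Long> committed = new LinkedHashMap<>();
        committed.put("orders-0", 120L);
        committed.put("orders-1", 300L);
        committed.put("orders-2", null); // no committed offset yet

        Map<String, Long> endOffsets = new LinkedHashMap<>();
        endOffsets.put("orders-0", 150L);
        endOffsets.put("orders-1", 295L); // stale snapshot, would give -5
        endOffsets.put("orders-2", 40L);

        Map<String, Long> lag = lagPerPartition(committed, endOffsets);
        System.out.println(lag + " total=" + totalLag(lag));
        // prints {orders-0=30, orders-1=0} total=30
    }
}
```

In a real application the two maps would be filled from listConsumerGroupOffsets(...) and listOffsets(...) as in printOffsets, with TopicPartition as the key type.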
I got the printOffsets example from this:
Upvotes: 1
Reputation: 24202
If you are external to the Kafka consuming application, you are correct: your options are to compare partition end offsets with the latest checkpointed positions of the consumer group (assuming the consumers in question even use Kafka to store offsets).
There are tools that will monitor this for you, such as Burrow.
However, if you have access to the consuming application itself, there is a more accurate way. Here's a list of all consumer sensors (exposed either via API or JMX by default): https://kafka.apache.org/documentation/#consumer_fetch_monitoring
There is a per-partition records-lag metric. It's updated every time poll() is called, so it is more accurate and has lower latency than committed offsets. The only complication is that you'd need to sum the values of these sensors across all partitions the consumer is assigned.
Here's how to get at it via KafkaConsumer.metrics():
private long calcTotalLag(Map<MetricName, ? extends Metric> metrics) {
    long totalLag = 0;
    for (Map.Entry<MetricName, ? extends Metric> entry : metrics.entrySet()) {
        MetricName metricName = entry.getKey();
        Metric metric = entry.getValue();
        Map<String, String> tags = metricName.tags();
        // Only the per-partition records-lag sensors carry a "partition" tag
        if (metricName.name().equals("records-lag") && tags.containsKey("partition")) {
            totalLag += ((Number) metric.metricValue()).longValue();
        }
    }
    return totalLag;
}
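To see the filter-and-sum logic in isolation, here is a broker-free sketch of the same idea. The RecordsLagSum class and its Sample type are hypothetical stand-ins for the (MetricName, Metric) pairs returned by KafkaConsumer.metrics(), and the sample values are invented. Skipping non-numeric and NaN values is a defensive assumption on my part, not documented behaviour of the records-lag sensor.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class RecordsLagSum {

    // Hypothetical stand-in for one (MetricName, Metric) pair: name, tags, current value.
    static final class Sample {
        final String name;
        final Map<String, String> tags;
        final Object value;

        Sample(String name, Map<String, String> tags, Object value) {
            this.name = name;
            this.tags = tags;
            this.value = value;
        }
    }

    // Sums "records-lag" samples that carry a "partition" tag, ignoring everything else.
    static long totalLag(List<Sample> samples) {
        long total = 0;
        for (Sample s : samples) {
            boolean perPartitionLag =
                    "records-lag".equals(s.name) && s.tags.containsKey("partition");
            if (!perPartitionLag || !(s.value instanceof Number)) {
                continue;
            }
            double v = ((Number) s.value).doubleValue();
            if (Double.isNaN(v)) {
                continue; // defensive: treat "no reading" as contributing nothing
            }
            total += (long) v;
        }
        return total;
    }

    // Helper that builds the tag map for a per-partition sample.
    static Map<String, String> tags(String topic, String partition) {
        Map<String, String> t = new HashMap<>();
        t.put("topic", topic);
        t.put("partition", partition);
        return t;
    }

    public static void main(String[] args) {
        List<Sample> samples = new ArrayList<>();
        samples.add(new Sample("records-lag", tags("orders", "0"), 12.0));
        samples.add(new Sample("records-lag", tags("orders", "1"), 30.0));
        // Different metric name and no partition tag, so it is ignored.
        samples.add(new Sample("records-lag-max", new HashMap<>(), 30.0));
        samples.add(new Sample("records-lag", tags("orders", "2"), Double.NaN));
        System.out.println(totalLag(samples)); // prints 42
    }
}
```

With a real consumer you would pass consumer.metrics() to calcTotalLag after a poll() call instead of building samples by hand.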
Upvotes: 3