Reputation: 16992
I have created a Spring Kafka consumer that reads from a topic. Is there a way to print lag information, similar to how partition info is printed?
Upvotes: 4
Views: 6676
Reputation: 101
Since no source code was provided, I am assuming that you implemented your consumer with the @KafkaListener annotation. I solved the same problem by using the org.apache.kafka.clients.consumer.Consumer interface, which can be declared as a parameter of the method annotated with @KafkaListener. This interface provides the metrics() method, whose result includes the consumer lag under the records-lag-max metric.
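// Needs org.apache.kafka.clients.consumer.Consumer, org.apache.kafka.common.Metric,
// java.util.stream.Collectors and the SLF4J Logger/LoggerFactory, among other imports.
private static final Logger LOGGER = LoggerFactory.getLogger(YourClass.class);

@KafkaListener(topics = "your-topic", groupId = "your-group-id", id = "your-client-id", containerFactory = "kafkaListenerContainerFactory")
public void listenerExample(List<String> msgs, @Header(KafkaHeaders.OFFSET) List<Long> offsets, Acknowledgment ack,
        Consumer<?, ?> consumer) {
    // Pick the records-lag-max metric out of everything the consumer reports.
    String lag = consumer.metrics().values().stream()
            .filter(m -> "records-lag-max".equals(m.metricName().name()))
            .map(Metric::metricValue).map(Object::toString).distinct()
            // Trailing space in the prefix; ", " keeps multiple distinct values readable.
            .collect(Collectors.joining(", ", "[Kafka current consumer lag] ", " records"));
    LOGGER.info(lag);
}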
In this case, I explicitly chose the records-lag-max metric. You could choose any other consumer metric; the full list is in the Confluent docs.
The above code snippet will have the following output:
[Kafka current consumer lag] X records
Where X is the maximum lag in terms of number of records for any partition in this window.
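To make the meaning of X more concrete, here is a minimal sketch in plain Java (no Kafka dependency) of the arithmetic behind a lag figure: for each partition, lag is the log end offset minus the consumer's current offset, and the reported number is the largest of those. The class name LagMath, its method names and the sample offsets are all made up for illustration; the real records-lag-max value is computed inside the Kafka client over a sampling window, so this only shows the idea, not the client's implementation.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: illustrates the lag arithmetic only, it does not talk to Kafka.
class LagMath {

    // Lag per partition = log end offset - current consumer offset, floored at 0.
    // Assumption: a partition with no known consumer offset is treated as offset 0.
    static Map<Integer, Long> lagPerPartition(Map<Integer, Long> endOffsets,
                                              Map<Integer, Long> currentOffsets) {
        Map<Integer, Long> lag = new LinkedHashMap<>();
        for (Map.Entry<Integer, Long> e : endOffsets.entrySet()) {
            long current = currentOffsets.getOrDefault(e.getKey(), 0L);
            lag.put(e.getKey(), Math.max(0L, e.getValue() - current));
        }
        return lag;
    }

    // Largest lag across all partitions, or 0 when there are no partitions.
    static long maxLag(Map<Integer, Long> lagByPartition) {
        return lagByPartition.values().stream().mapToLong(Long::longValue).max().orElse(0L);
    }

    public static void main(String[] args) {
        // Made-up sample offsets, keyed by partition number.
        Map<Integer, Long> end = Map.of(0, 66L, 1, 120L);
        Map<Integer, Long> current = Map.of(0, 66L, 1, 100L);
        long max = maxLag(lagPerPartition(end, current));
        System.out.println("[Kafka current consumer lag] " + max + " records");
    }
}
```

With the sample offsets above, partition 0 has a lag of 0 and partition 1 a lag of 20, so the printed maximum is 20.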
I am using version 2.3.3.RELEASE of the Spring Kafka library:
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.3.3.RELEASE</version>
</dependency>
Upvotes: 7
Reputation: 174574
There's a command line tool...
$ kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group myGroup
TOPIC      PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID  HOST  CLIENT-ID
myTopic    0          66              66              0    -
EDIT
You could run the command line tool and capture the output...
// Start the tool; merge stderr into stdout so error messages are captured too.
Process process = new ProcessBuilder()
        .command("/usr/local/bin/kafka-consumer-groups", "--bootstrap-server", "localhost:9092",
                "--describe", "--group", "siTestGroup")
        .redirectErrorStream(true)
        .start();
// Drain the output before waiting, so a full pipe buffer cannot block the child process.
ByteArrayOutputStream baos = new ByteArrayOutputStream();
FileCopyUtils.copy(process.getInputStream(), baos); // org.springframework.util.FileCopyUtils
process.waitFor(10, TimeUnit.SECONDS);
System.out.println(new String(baos.toByteArray()));
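Once the output is captured, the LAG column can be picked out by name. The sketch below is plain Java and assumes the whitespace-separated layout shown in the sample output above (a header row containing TOPIC, PARTITION and LAG, followed by one row per partition). The class name ConsumerGroupLagParser and its method are hypothetical, and the exact columns can differ between Kafka versions, so treat it as a starting point rather than a robust parser.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical parser for the text printed by "kafka-consumer-groups --describe".
class ConsumerGroupLagParser {

    // Returns strings such as "myTopic-0 lag=0", one per data row with a numeric LAG value.
    static List<String> parseLag(String output) {
        List<String> result = new ArrayList<>();
        int topicIdx = -1, partitionIdx = -1, lagIdx = -1;
        for (String line : output.split("\\R")) {
            String[] cols = line.trim().split("\\s+");
            List<String> names = Arrays.asList(cols);
            if (names.containsAll(Arrays.asList("TOPIC", "PARTITION", "LAG"))) {
                // Header row: remember where the columns of interest are.
                topicIdx = names.indexOf("TOPIC");
                partitionIdx = names.indexOf("PARTITION");
                lagIdx = names.indexOf("LAG");
                continue;
            }
            int lastNeeded = Math.max(lagIdx, Math.max(topicIdx, partitionIdx));
            // Skip lines before the header, short rows, and rows whose lag is not a number ("-").
            if (lagIdx < 0 || cols.length <= lastNeeded || !cols[lagIdx].matches("\\d+")) {
                continue;
            }
            result.add(cols[topicIdx] + "-" + cols[partitionIdx] + " lag=" + cols[lagIdx]);
        }
        return result;
    }

    public static void main(String[] args) {
        // Sample text copied from the command output shown earlier.
        String sample = "TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID\n"
                + "myTopic 0 66 66 0 -\n";
        parseLag(sample).forEach(System.out::println);
    }
}
```

In the snippet above, the captured text would be passed in as parseLag(new String(baos.toByteArray())). Looking columns up by header name rather than by fixed position is a deliberate choice here, since some versions of the tool print additional leading columns.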
Upvotes: 1