Punter Vicky
Punter Vicky

Reputation: 16992

Spring Kafka Consumer - Print Kafka Lag Info

I have created a spring kafka consumer that reads from a topic. Is there a way to print lag information similar to how we print partition info?

Upvotes: 4

Views: 6676

Answers (2)

Felipe Dias
Felipe Dias

Reputation: 101

Whereas no source code has been provided, I am under the assumption that you implemented your consumer through the @KafkaListener annotation. I have overcome the same problem you described making use of the org.apache.kafka.clients.consumer.Consumer interface, as stated here. It can be declared as a parameter in the consumer method under the @KafkaListener annotation. This interface provides the metrics() method which contains the consumer lag information stored in the records-max-lag property.

private static final Logger LOGGER = LoggerFactory.getLogger(YourClass.class);

@KafkaListener(topics = "your-topic", groupId = "your-group-id", id = "your-client-id", containerFactory = "kafkaListenerContainerFactory")
public void listenerExample(List<String> msgs, @Header(KafkaHeaders.OFFSET) List<Long> offsets, Acknowledgment ack,
        Consumer<?, ?> consumer) {



    String lag = consumer.metrics().values().stream().filter(m -> "records-lag-max".equals(m.metricName().name()))
            .map(Metric::metricValue).map(Object::toString).distinct()
            .collect(Collectors.joining("", "[Kafka current consumer lag]", " records"));


    LOGGER.info(lag);


}

In this case, I explicitly chose the records-lag-max property. You could have chosen any other consumer metric, the list is at Confluent Docs.

The above code snippet will have the following output: [Kafka current consumer lag] X records Where X is the maximum lag in terms of number of records for any partition in this window.

Important:

I am using version 2.3.3.RELEASE of the Spring Kafka library

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.3.3.RELEASE</version>
</dependency>

Upvotes: 7

Gary Russell
Gary Russell

Reputation: 174574

There's a command line tool...

$ kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group myGroup

TOPIC                          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG        CONSUMER-ID                                       HOST                           CLIENT-ID
myTopic                        0          66              66              0          -                                           

EDIT

You could run the command line tool and capture the output...

Process process = new ProcessBuilder()
        .command("/usr/local/bin/kafka-consumer-groups", "--bootstrap-server", "localhost:9092",
                "--describe", "--group", "siTestGroup")
        .start();
InputStream inputStream = process.getInputStream();
process.waitFor(10, TimeUnit.SECONDS);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
FileCopyUtils.copy(inputStream, baos);
System.out.println(new String(baos.toByteArray()));

Upvotes: 1

Related Questions