Deepak

Reputation: 1042

Enable Kafka client metrics in a Spring app with Prometheus

I have an application that consumes from and produces to Kafka topics. I am setting up a Grafana dashboard to display Kafka metrics, so I am trying to expose the Kafka client metrics to Prometheus so that Grafana can fetch and display them.

Below are the libraries and properties I am using.

org.apache.kafka:kafka-clients:jar:2.5.1:compile
io.micrometer:micrometer-registry-prometheus:jar:1.5.4:compile
io.micrometer:micrometer-jersey2:jar:1.5.4:compile

application properties:

spring.jmx.enabled=true
management.endpoints.web.base-path=/
management.endpoint.metrics.enabled=true
management.endpoints.web.exposure.include=prometheus
management.endpoint.prometheus.enabled=true
management.metrics.export.prometheus.enabled=true

Using Micrometer, Prometheus is displaying the JVM and Tomcat metrics as well as my custom metrics, but the Kafka metrics are not exposed. I tried to debug this but am left with no clue. Any suggestions would be a great help.

As a heads-up, I am not using Spring Kafka annotations. I am running standalone multi-threaded consumers, fetching records with the consumer.poll(1000L) method.

The Kafka consumer is created as new KafkaConsumer<>(getProps()) and the Kafka producer as new KafkaProducer<>(props):

props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, ...);
props.put(ConsumerConfig.CLIENT_ID_CONFIG, config.getConsumerName());

props.put(ConsumerConfig.GROUP_ID_CONFIG, config.getGroupId());
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, config.getKeyDeserializer());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, config.getValueDeserializer());
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, config.getEnableAutoCommit());
props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, config.getReceiveBufferBytes());
props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, config.getMaxPartitionFetchBytes());
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, config.getMaxPollRecords());
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RoundRobinAssignor.class.getName());

protected KafkaConsumer<byte[], Event> getConsumer(List<String> inputTopics) {
    KafkaConsumer<byte[], Event> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(inputTopics, new ConsumerRebalanceListener() {
        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            logger.info("PARTITIONS revoked: " + partitions);
            consumer.commitAsync();
        }

        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            logger.info("PARTITIONS assigned: " + partitions);
        }
    });
    return consumer;
}

Upvotes: 0

Views: 3372

Answers (1)

Gary Russell

Reputation: 174484

I already answered you on Gitter - the code above is not using Spring for Apache Kafka; you are creating your own consumer and producer.

If you use Spring for Apache Kafka's DefaultKafkaConsumerFactory, you can add a MicrometerConsumerListener to it and Spring will register metrics for each consumer with the meter registry via KafkaClientMetrics. If you create your own consumers, you have to do that yourself.
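A minimal sketch of the factory approach, assuming spring-kafka 2.5+ and micrometer-core are on the classpath; the bean name, group id, and deserializers here are illustrative, not taken from your config:

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.MicrometerConsumerListener;

import io.micrometer.core.instrument.MeterRegistry;

@Configuration
public class KafkaMetricsConfig {

    @Bean
    public ConsumerFactory<byte[], byte[]> consumerFactory(MeterRegistry meterRegistry) {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);

        DefaultKafkaConsumerFactory<byte[], byte[]> factory =
                new DefaultKafkaConsumerFactory<>(props);
        // Every consumer created by this factory has its kafka.consumer.*
        // metrics registered with the meter registry automatically.
        factory.addListener(new MicrometerConsumerListener<>(meterRegistry));
        return factory;
    }
}
```

The auto-configured PrometheusMeterRegistry is injected as the MeterRegistry, so the consumer metrics show up on the /prometheus endpoint alongside the JVM and Tomcat meters.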

Same thing for producers.
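Since you are building the clients yourself, a sketch of the "do it yourself" route: bind each client with Micrometer's KafkaClientMetrics (micrometer-core 1.4+). The helper class and method names below are illustrative; the MeterRegistry would be the auto-configured Prometheus registry injected into whatever class builds your clients:

```java
import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;

import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.binder.kafka.KafkaClientMetrics;

public class MeteredClients {

    public static KafkaConsumer<byte[], byte[]> meteredConsumer(
            Properties props, MeterRegistry meterRegistry) {
        KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
        // Registers this consumer's kafka.consumer.* metrics with the registry.
        // Keep a reference to the binder and close() it when the consumer is
        // closed so its meters are deregistered.
        new KafkaClientMetrics(consumer).bindTo(meterRegistry);
        return consumer;
    }

    public static KafkaProducer<byte[], byte[]> meteredProducer(
            Properties props, MeterRegistry meterRegistry) {
        KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props);
        // Same binder class works for producers (kafka.producer.* metrics).
        new KafkaClientMetrics(producer).bindTo(meterRegistry);
        return producer;
    }
}
```

You would call meteredConsumer(...) wherever you currently call new KafkaConsumer<>(getProps()), passing in the Spring-managed MeterRegistry.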

Upvotes: 2

Related Questions