Reputation: 1042
I have an application that consumes from Kafka topics and produces to Kafka topics. I am trying to set up a Grafana dashboard to display Kafka metrics. To do that, I want to expose the Kafka client metrics to Prometheus so that Grafana can fetch and display them.
Below are the libraries and properties I am using.
org.apache.kafka:kafka-clients:jar:2.5.1:compile
io.micrometer:micrometer-registry-prometheus:jar:1.5.4:compile
io.micrometer:micrometer-jersey2:jar:1.5.4:compile
application properties:
spring.jmx.enabled=true
management.endpoints.web.base-path=/
management.endpoint.metrics.enabled=true
management.endpoints.web.exposure.include=prometheus
management.endpoint.prometheus.enabled=true
management.metrics.export.prometheus.enabled=true
With Micrometer, the Prometheus endpoint shows the JVM and Tomcat metrics as well as my custom metrics, but the Kafka metrics are not exposed. I tried debugging and could not find the cause. Any suggestions would be a great help.
As a heads-up, I am not using Spring Kafka annotations. The application runs standalone and multi-threaded, and I fetch records using the consumer.poll(1000L) method.
The Kafka consumer is created as new KafkaConsumer<>(getProps())
and the Kafka producer is created as new KafkaProducer<>(props), with the properties set as follows:
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, ...);
props.put(ConsumerConfig.CLIENT_ID_CONFIG, config.getConsumerName());
props.put(ConsumerConfig.GROUP_ID_CONFIG, config.getGroupId());
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, config.getKeyDeserializer());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, config.getValueDeserializer());
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, config.getEnableAutoCommit());
props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, config.getReceiveBufferBytes());
props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, config.getMaxPartitionFetchBytes());
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, config.getMaxPollRecords());
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RoundRobinAssignor.class.getName());
protected KafkaConsumer<byte[], Event> getConsumer(List<String> inputTopics) {
    // Plain Kafka client, created directly rather than through a Spring factory
    KafkaConsumer<byte[], Event> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(inputTopics, new ConsumerRebalanceListener() {
        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            logger.info("PARTITIONS revoked: " + partitions);
            consumer.commitAsync();
        }
        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            logger.info("PARTITIONS assigned: " + partitions);
        }
    });
    return consumer;
}
Upvotes: 0
Views: 3372
Reputation: 174484
I already answered you on Gitter: the code above is not using Spring for Apache Kafka; you are creating your own consumer and producer.
If you use Spring for Apache Kafka's DefaultKafkaConsumerFactory, you can add a MicrometerConsumerListener to it and Spring will register metrics for each consumer with the meter registry via the KafkaClientMetrics. If you create your own consumers, you have to do that yourself.
Same thing for producers.
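As a rough sketch of what "doing that yourself" could look like for hand-created clients: Micrometer's KafkaClientMetrics binder (in micrometer-core since 1.4) accepts a Consumer or a Producer and can be bound to a MeterRegistry. The helper class name KafkaMetricsBinding and its bind methods are hypothetical names used here for illustration. The registry passed in is assumed to be the same MeterRegistry that Spring Boot exposes through the prometheus endpoint.

```java
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.binder.kafka.KafkaClientMetrics;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.producer.Producer;

// Hypothetical helper (not part of Micrometer or Kafka): binds the metrics of
// manually created Kafka clients to a Micrometer registry.
public final class KafkaMetricsBinding {

    private KafkaMetricsBinding() {
    }

    // Registers the consumer's client metrics with the given registry.
    // The returned binder should be closed when the consumer is closed.
    public static KafkaClientMetrics bind(Consumer<?, ?> consumer, MeterRegistry registry) {
        KafkaClientMetrics metrics = new KafkaClientMetrics(consumer);
        metrics.bindTo(registry);
        return metrics;
    }

    // Same idea for a producer.
    public static KafkaClientMetrics bind(Producer<?, ?> producer, MeterRegistry registry) {
        KafkaClientMetrics metrics = new KafkaClientMetrics(producer);
        metrics.bindTo(registry);
        return metrics;
    }
}
```

In the question's getConsumer method this would mean calling something like KafkaMetricsBinding.bind(consumer, meterRegistry) right after new KafkaConsumer<>(props), where meterRegistry is an injected MeterRegistry (an assumption about how the application is wired), and calling close() on the returned binder when the consumer is shut down.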
Upvotes: 2