Raghav
Raghav

Reputation: 67

spring-kafka metrics not available

I am using spring-kafka 2.8.1 and spring boot 2.6.7
I am able to get spring_kafka_listener metrics but not the kafka_consumer metrics[messages received, lag, offsets etc].
I am creating kafka consumer using DefaultKafkaConsumerFactory as below:

props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, ...);
props.put(ConsumerConfig.CLIENT_ID_CONFIG, config.getConsumerName());
........
props.put(ConsumerConfig.GROUP_ID_CONFIG, config.getGroupId());
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, config.getKeyDeserializer());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, config.getValueDeserializer());
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, config.getEnableAutoCommit());
return new DefaultKafkaConsumerFactory<>(props);

I did went through the spring-kafka docs, I believe an addition of MicrometerConsumerListener is requried. But I am not able to make it work.
Can someone help in this? Thanks!

EDIT:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> 
  kafkaListenerContainerFactory() {

    ConcurrentKafkaListenerContainerFactory<String, String> factory =
      new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    return factory;
}         

    @KafkaListener(topics = "tempTopic", groupId = "tempGroup")
public void listenGroup(String message) {
    System.out.println("Received Message: " + message);
}

EDIT: I have tried using this config, but still not coming.

props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, ...);
props.put(ConsumerConfig.CLIENT_ID_CONFIG, config.getConsumerName());
........
props.put(ConsumerConfig.GROUP_ID_CONFIG, config.getGroupId());
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, config.getKeyDeserializer());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, config.getValueDeserializer());
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, config.getEnableAutoCommit());

ConsumerFactory<Object,Object> cf = new DefaultKafkaConsumerFactory<>(config);
cf.addListener(new MicrometerConsumerListener<>(new SimpleMeterRegistry()));
return cf;

EDIT:

private MeterRegistry meterRegistry;

@Inject
public KafkaConfiguration(MeterRegistry meterRegsitry){
this.meterRegistry = meterRegistry;
}
 {   props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, ...);
    props.put(ConsumerConfig.CLIENT_ID_CONFIG, config.getConsumerName());
    ........
    props.put(ConsumerConfig.GROUP_ID_CONFIG, config.getGroupId());
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, config.getKeyDeserializer());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, config.getValueDeserializer());
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, config.getEnableAutoCommit());
    
ConsumerFactory<Object,Object> cf = new DefaultKafkaConsumerFactory<>(config);
cf.addListener(new MicrometerConsumerListener<>(meterRegistry));
return cf;
}

Upvotes: 0

Views: 1163

Answers (1)

Gary Russell
Gary Russell

Reputation: 174779

cf.addListener(new MicrometerConsumerListener<>(new SimpleMeterRegistry()));

You are adding the meters to a different registry.

Use Boot's auto-configured meter registry instead.

Upvotes: 1

Related Questions