Reputation: 67
I am using spring-kafka 2.8.1 and spring boot 2.6.7
I am able to get spring_kafka_listener metrics but not the kafka_consumer metrics[messages received, lag, offsets etc].
I am creating kafka consumer using DefaultKafkaConsumerFactory as below:
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, ...);
props.put(ConsumerConfig.CLIENT_ID_CONFIG, config.getConsumerName());
........
props.put(ConsumerConfig.GROUP_ID_CONFIG, config.getGroupId());
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, config.getKeyDeserializer());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, config.getValueDeserializer());
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, config.getEnableAutoCommit());
return new DefaultKafkaConsumerFactory<>(props);
I did went through the spring-kafka docs, I believe an addition of MicrometerConsumerListener is requried. But I am not able to make it work.
Can someone help in this?
Thanks!
EDIT:
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
@KafkaListener(topics = "tempTopic", groupId = "tempGroup")
public void listenGroup(String message) {
System.out.println("Received Message: " + message);
}
EDIT: I have tried using this config, but still not coming.
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, ...);
props.put(ConsumerConfig.CLIENT_ID_CONFIG, config.getConsumerName());
........
props.put(ConsumerConfig.GROUP_ID_CONFIG, config.getGroupId());
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, config.getKeyDeserializer());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, config.getValueDeserializer());
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, config.getEnableAutoCommit());
ConsumerFactory<Object,Object> cf = new DefaultKafkaConsumerFactory<>(config);
cf.addListener(new MicrometerConsumerListener<>(new SimpleMeterRegistry()));
return cf;
EDIT:
private MeterRegistry meterRegistry;
@Inject
public KafkaConfiguration(MeterRegistry meterRegsitry){
this.meterRegistry = meterRegistry;
}
{ props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, ...);
props.put(ConsumerConfig.CLIENT_ID_CONFIG, config.getConsumerName());
........
props.put(ConsumerConfig.GROUP_ID_CONFIG, config.getGroupId());
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, config.getKeyDeserializer());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, config.getValueDeserializer());
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, config.getEnableAutoCommit());
ConsumerFactory<Object,Object> cf = new DefaultKafkaConsumerFactory<>(config);
cf.addListener(new MicrometerConsumerListener<>(meterRegistry));
return cf;
}
Upvotes: 0
Views: 1163
Reputation: 174779
cf.addListener(new MicrometerConsumerListener<>(new SimpleMeterRegistry()));
You are adding the meters to a different registry.
Use Boot's auto-configured meter registry instead.
Upvotes: 1