Reputation: 237
I have a Kafka listener that retries a few times when its implementation throws an exception. Each thrown exception is logged at error level, which pollutes our Sentry log. I would like to log these Kafka consumer errors at warn level instead.
Details:
I have a Kafka listener, and some events have to be retried.
@KafkaListener(...)
@RetryableTopic(..)
public void consume(ConsumerRecord<?,?> event) {
    service.sayHello(...); // this call can throw UnhappyException
}
When UnhappyException is thrown, it is logged as:
> Caused by: org.springframework.kafka.listener.ListenerExecutionFailedException:
> Listener method 'public void
> com.company.hello.world.listener.MyListener.consume(org.apache.kafka.clients.consumer.ConsumerRecord<java.lang.Object,
> ....)' threw
> exception; nested exception is
> com.hello.world.somepackage.exceptions.UnhappyException:
> Animal id 4444 is Unhappy
Our application retries the message when an exception occurs; eventually the message is either processed or lands in a dead letter topic. Unfortunately, due to our current architecture, the animal id is often not yet present in our service, so processing can throw several times before it succeeds.
The message is logged as "ERROR", which creates false alerts in Sentry.
Is there a way to log UnhappyException at warn level instead, without having to change the Sentry configuration?
I looked at ConcurrentKafkaListenerContainerFactory to see whether I could pass in a custom error handler, but the debugger never hits a breakpoint in it.
@Bean
public ConcurrentKafkaListenerContainerFactory<Object, Object> kafkaListenerContainerFactory(
        ConsumerFactory<Object, Object> consumerFactory,
        KafkaTemplate<String, Object> kafkaTemplate) {
    ConcurrentKafkaListenerContainerFactory<Object, Object> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    /*
    // Original implementation
    factory.setCommonErrorHandler(new DefaultErrorHandler(
            new DeadLetterPublishingRecoverer(kafkaTemplate), backOff));
    */
    // CustomErrorHandler extends DefaultErrorHandler and overrides its methods,
    // but the debugger never stops in any of the overridden methods.
    factory.setCommonErrorHandler(new CustomErrorHandler(
            new DeadLetterPublishingRecoverer(kafkaTemplate), backOff));
    return factory;
}
Upvotes: 1
Views: 1033
Reputation: 237
It turns out that with a retryable topic, the failure is not logged as an error until the message reaches the dead letter topic, so no action was needed.
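To illustrate the behaviour described above, here is a minimal plain-Java sketch. It is not Spring Kafka's code and uses none of its APIs; the class RetryLogLevelSketch, the Handler interface, the process method and the log line format are all hypothetical names made up for this example. It only shows the idea: a failure that will be retried is recorded at WARN, and ERROR appears only once the attempts are exhausted, which corresponds to the message going to the dead letter topic.

```java
import java.util.ArrayList;
import java.util.List;

public class RetryLogLevelSketch {

    /** Hypothetical stand-in for the exception from the question. */
    static class UnhappyException extends RuntimeException {
        UnhappyException(String message) {
            super(message);
        }
    }

    /** Hypothetical listener body; may throw UnhappyException. */
    interface Handler {
        void handle(String event);
    }

    /**
     * Calls the handler up to maxAttempts times and returns the log lines
     * that would be written: WARN for a failure that will be retried,
     * ERROR only for the last failed attempt (the dead-letter case).
     */
    static List<String> process(String event, Handler handler, int maxAttempts) {
        List<String> log = new ArrayList<>();
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                handler.handle(event);
                return log; // processed successfully, nothing more to log
            } catch (UnhappyException e) {
                boolean exhausted = attempt == maxAttempts;
                String level = exhausted ? "ERROR" : "WARN";
                log.add(level + " attempt " + attempt + ": " + e.getMessage());
            }
        }
        return log;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Fails on the first two attempts, then succeeds on the third.
        Handler flaky = event -> {
            if (++calls[0] < 3) {
                throw new UnhappyException("Animal id " + event + " is Unhappy");
            }
        };
        process("4444", flaky, 4).forEach(System.out::println);
    }
}
```

With the flaky handler in main, only two WARN lines are printed and no ERROR, because the third attempt succeeds before the four attempts run out.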
Upvotes: 2