Reputation: 49
I have a Kafka consumer application that uses a custom CommonErrorHandler to handle exceptions thrown by @KafkaListener methods. I have a FixedBackOff strategy in place so that a failed record is retried up to 3 times and then published to a DLQ topic. However, every retry prints the entire stack trace of the error. Can I lower that logging to, say, DEBUG level so it does not clutter the console output? This is what I currently have in place (Kotlin):
    factory.setCommonErrorHandler(
        DefaultErrorHandler({ record, exception ->
            // Prefer the wrapped cause; fall back to the exception's message
            val thrownException = exception.cause ?: exception.localizedMessage
            log.error(
                "Kafka record offset=${record.offset()} is being sent to the DLQ due to=" +
                    "$thrownException"
            )
            val producerRecord = ProducerRecord<String, String>(dlqTopic, record.value().toString())
            producerRecord.headers().add("dlq-failure-reason", thrownException.toString().toByteArray())
            kafkaTemplate?.send(producerRecord)
        }, FixedBackOff(0L, 2L))
    )
I have looked here: https://docs.spring.io/spring-kafka/docs/current/reference/html/#error-handlers, and it mentions this: "All of the framework error handlers extend KafkaExceptionLogLevelAware which allows you to control the level at which these exceptions are logged." However, I have tried extending that class and setting its log level explicitly, to no avail. Am I missing something?
Upvotes: 0
Reputation: 174729
You don't need to subclass it, just set the property.
All that KafkaExceptionLogLevelAware does is set the log level on the KafkaException thrown by the error handler; it's the container that then logs it via the exception's selfLog method.
You are logging it yourself at error level (during recovery).
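As a rough sketch only, this is how the handler from the question might look in Java with both points applied: the log level is set on the handler itself, and the recoverer logs a single line below error level. The class name, the bean name, and the `DLQ_TOPIC` value are hypothetical (the question only shows a `dlqTopic` variable), and the use of SLF4J is an assumption.

```java
import java.nio.charset.StandardCharsets;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.KafkaException;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

@Configuration
public class QuietErrorHandlerConfig { // hypothetical class name

    private static final Logger log = LoggerFactory.getLogger(QuietErrorHandlerConfig.class);

    // Assumption: placeholder topic name; substitute the real DLQ topic.
    private static final String DLQ_TOPIC = "my-topic.DLQ";

    @Bean
    DefaultErrorHandler quietErrorHandler(KafkaTemplate<String, String> kafkaTemplate) {
        DefaultErrorHandler handler = new DefaultErrorHandler((record, exception) -> {
            // Use the wrapped cause when there is one, as the question's code does.
            Throwable cause = exception.getCause() != null ? exception.getCause() : exception;
            // Passing cause.toString() (a String, not a Throwable) keeps the stack trace out of the log.
            log.warn("Kafka record offset={} is being sent to the DLQ due to={}",
                    record.offset(), cause.toString());
            ProducerRecord<String, String> dlqRecord =
                    new ProducerRecord<>(DLQ_TOPIC, String.valueOf(record.value()));
            dlqRecord.headers().add("dlq-failure-reason",
                    cause.toString().getBytes(StandardCharsets.UTF_8));
            kafkaTemplate.send(dlqRecord);
        }, new FixedBackOff(0L, 2L)); // same back-off as in the question

        // No subclassing needed: set the property on the handler instance.
        handler.setLogLevel(KafkaException.Level.DEBUG);
        return handler;
    }
}
```

If you build the container factory yourself, as in the question, the same handler instance can be passed to `factory.setCommonErrorHandler(...)`.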
EDIT
Works fine for me...
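    import org.apache.kafka.clients.admin.NewTopic;
    import org.springframework.boot.ApplicationRunner;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.annotation.Bean;
    import org.springframework.kafka.KafkaException.Level;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.kafka.config.TopicBuilder;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.kafka.listener.DefaultErrorHandler;
    import org.springframework.util.backoff.FixedBackOff;

    @SpringBootApplication
    public class So71373630Application {

        public static void main(String[] args) {
            // Close the context once the runner has finished so the demo exits
            SpringApplication.run(So71373630Application.class, args).close();
        }

        @Bean
        DefaultErrorHandler eh() {
            // The recoverer runs after retries are exhausted; FixedBackOff(0L, 3L) = 3 retries, no delay
            DefaultErrorHandler eh = new DefaultErrorHandler((r, ex) -> System.out.println("Failed:" + r.value()),
                    new FixedBackOff(0L, 3L));
            // Log the exceptions thrown by the error handler at DEBUG
            eh.setLogLevel(Level.DEBUG);
            return eh;
        }

        @KafkaListener(id = "so71373630", topics = "so71373630")
        void listen(String in) {
            System.out.println(in);
            throw new RuntimeException("test");
        }

        @Bean
        public NewTopic topic() {
            return TopicBuilder.name("so71373630").partitions(1).replicas(1).build();
        }

        @Bean
        ApplicationRunner runner(KafkaTemplate<String, String> template) {
            return args -> {
                template.send("so71373630", "foo");
                // Give the listener time to retry and recover before the context closes
                Thread.sleep(5000);
            };
        }

    }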
2022-03-07 10:10:35.460 INFO 31409 --- [o71373630-0-C-1] o.s.k.l.KafkaMessageListenerContainer : so71373630: partitions assigned: [so71373630-0]
foo
2022-03-07 10:10:35.492 INFO 31409 --- [o71373630-0-C-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-so71373630-1, groupId=so71373630] Seeking to offset 3 for partition so71373630-0
foo
2022-03-07 10:10:35.987 INFO 31409 --- [o71373630-0-C-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-so71373630-1, groupId=so71373630] Seeking to offset 3 for partition so71373630-0
foo
2022-03-07 10:10:36.490 INFO 31409 --- [o71373630-0-C-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-so71373630-1, groupId=so71373630] Seeking to offset 3 for partition so71373630-0
foo
Failed:foo
Upvotes: 1