Lazaro R.
Lazaro R.

Reputation: 49

Spring Kafka set logging level on container's CommonErrorHandler

I have a Kafka Consumer application that currently has a custom CommonErrorHandler to handle @KafkaListener exceptions when they are thrown. I have a custom FixedBackOff strategy in place where it will retry up to 3 times and then publish the record to a DLQ topic but every time it retries it prints the entire stack trace of the error and I wanted to know if I can suppress that to maybe a DEBUG level so it does not clutter the console output? This is currently what I have in place (Kotlin):

factory.setCommonErrorHandler(
            DefaultErrorHandler({ record, exception ->
                val thrownException = exception.cause ?: exception.localizedMessage
                log.error(
                    "Kafka record offset=${record.offset()} is being sent to the DLQ due to=" +
                        "$thrownException"
                )
                val producerRecord = ProducerRecord<String, String>(dlqTopic, record.value().toString())
                producerRecord.headers().add("dlq-failure-reason", thrownException.toString().toByteArray())
                kafkaTemplate?.send(producerRecord)
            }, FixedBackOff(0L, 2L))

I have looked here: https://docs.spring.io/spring-kafka/docs/current/reference/html/#error-handlers, and it mentions this, "All of the framework error handlers extend KafkaExceptionLogLevelAware which allows you to control the level at which these exceptions are logged." However, I have tried setting the logging level by extending that class and setting it's log level explicitly but to no avail. Am I missing something?

Upvotes: 0

Views: 2873

Answers (1)

Gary Russell
Gary Russell

Reputation: 174729

You don't need to subclass it, just set the property.

All that KafkaExceptionLogLevelAware does is set the log level on the KafkaException thrown by the error handler; it's the container that then logs it via the exception's selfLog method.

You are logging it yourself at error level (during recovery).

EDIT

Works fine for me...

@SpringBootApplication
public class So71373630Application {

    public static void main(String[] args) {
        SpringApplication.run(So71373630Application.class, args).close();
    }

    @Bean
    DefaultErrorHandler eh() {
        DefaultErrorHandler eh = new DefaultErrorHandler((r, ex) -> System.out.println("Failed:" + r.value()),
                new FixedBackOff(0L, 3L));
        eh.setLogLevel(Level.DEBUG);
        return eh;
    }

    @KafkaListener(id = "so71373630", topics = "so71373630")
    void listen(String in) {
        System.out.println(in);
        throw new RuntimeException("test");
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("so71373630").partitions(1).replicas(1).build();
    }

    @Bean
    ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> {
            template.send("so71373630", "foo");
            Thread.sleep(5000);
        };
    }

}
2022-03-07 10:10:35.460  INFO 31409 --- [o71373630-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : so71373630: partitions assigned: [so71373630-0]
foo
2022-03-07 10:10:35.492  INFO 31409 --- [o71373630-0-C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-so71373630-1, groupId=so71373630] Seeking to offset 3 for partition so71373630-0
foo
2022-03-07 10:10:35.987  INFO 31409 --- [o71373630-0-C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-so71373630-1, groupId=so71373630] Seeking to offset 3 for partition so71373630-0
foo
2022-03-07 10:10:36.490  INFO 31409 --- [o71373630-0-C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-so71373630-1, groupId=so71373630] Seeking to offset 3 for partition so71373630-0
foo
Failed:foo

Upvotes: 1

Related Questions