Lorenzo Panetta

Reputation: 85

Spring Kafka - Centralized @KafkaListener logging

I'd like to centralize all the @KafkaListener message logging: when a message arrives, when processing succeeds, and when an exception is thrown by a @KafkaListener.

I don't want to use AOP because of the performance impact. I would prefer to use a RecordInterceptor, like this: Log the exceptions thrown in spring kafka listener

I'm using the KafkaAutoConfiguration (everything is configured via properties), so the auto-configuration creates a ConcurrentKafkaListenerContainerFactory bean without a RecordInterceptor configured.

I thought that having a RecordInterceptor bean in the Spring context would be enough, but that's not the case: it doesn't intercept anything (the @KafkaListener methods keep receiving messages correctly).

@Slf4j
@Component
public class LoggingRecordInterceptor implements RecordInterceptor<String, Object> {

    @Override
    public ConsumerRecord<String, Object> intercept(ConsumerRecord record) {
        log.info("sample logging");
        return null;
    }

    @Override
    public ConsumerRecord<String, Object> intercept(ConsumerRecord record, Consumer consumer) {
        log.info("sample logging");
        return RecordInterceptor.super.intercept(record, consumer);
    }

    @Override
    public void success(ConsumerRecord record, Consumer consumer) {
        log.info("sample logging");
        RecordInterceptor.super.success(record, consumer);
    }

    @Override
    public void failure(ConsumerRecord record, Exception exception, Consumer consumer) {
        log.info("sample logging");
        RecordInterceptor.super.failure(record, exception, consumer);
    }
}

What am I missing? How can I configure it using KafkaAutoConfiguration?

Thanks in advance

Upvotes: 1

Views: 1781

Answers (1)

Lorenzo Panetta

Reputation: 85

Solved. I think it's a Spring bug.

The KafkaAnnotationDrivenConfiguration constructor has the following signature:

KafkaAnnotationDrivenConfiguration(...,
            ObjectProvider<RecordInterceptor<Object, Object>> recordInterceptor) {}

My LoggingRecordInterceptor was implementing RecordInterceptor<String, Object>.

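I thought it was supposed to work (since String obviously extends Object), but Spring was not able to inject it. Java generics are invariant, so a RecordInterceptor<String, Object> is not a RecordInterceptor<Object, Object>, and the ObjectProvider found no matching bean.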
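The mismatch can be reproduced in plain Java, without Spring. This is only a minimal sketch: Interceptor is a hypothetical stand-in for RecordInterceptor, and the reflection helper merely shows the kind of declared type information that a generics-aware container can compare. It is not Spring's actual resolution code.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.StringJoiner;

public class InvarianceDemo {

    // Hypothetical stand-in for RecordInterceptor<K, V>; not the Spring Kafka type.
    interface Interceptor<K, V> {
        V intercept(K key, V value);
    }

    static class StringKeyInterceptor implements Interceptor<String, Object> {
        @Override
        public Object intercept(String key, Object value) {
            return value;
        }
    }

    static class ObjectKeyInterceptor implements Interceptor<Object, Object> {
        @Override
        public Object intercept(Object key, Object value) {
            return value;
        }
    }

    // Reads the type arguments a class declares for its first generic interface.
    static String typeArguments(Class<?> type) {
        ParameterizedType declared = (ParameterizedType) type.getGenericInterfaces()[0];
        StringJoiner names = new StringJoiner(", ");
        for (Type argument : declared.getActualTypeArguments()) {
            names.add(((Class<?>) argument).getSimpleName());
        }
        return names.toString();
    }

    public static void main(String[] args) {
        // Exact match on the type arguments: compiles.
        Interceptor<Object, Object> accepted = new ObjectKeyInterceptor();

        // Generics are invariant, so the next line would not compile:
        // Interceptor<Object, Object> rejected = new StringKeyInterceptor();

        // Only a wildcard-typed reference accepts both implementations.
        Interceptor<?, ?> wildcard = new StringKeyInterceptor();

        System.out.println(typeArguments(accepted.getClass()));
        System.out.println(typeArguments(wildcard.getClass()));
    }
}
```

The declared type arguments survive erasure on the implementing class, which is how a container can tell the two implementations apart when the injection point asks for <Object, Object>.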

So I changed my LoggingRecordInterceptor as follows, and now it works fine.

But I still think it's a Spring bug: it should handle the generics, since the RecordInterceptor interface suggests it.

@Slf4j
@Component
public class LoggingRecordInterceptor implements RecordInterceptor<Object, Object> {

    @Override
    public ConsumerRecord<Object, Object> intercept(@NonNull ConsumerRecord<Object, Object> consumerRecord) {
        log.info("Topic: {} - Message received - Payload: {}",
                consumerRecord.topic(), consumerRecord.value());
        return consumerRecord;
    }

    @Override
    public void success(@NonNull ConsumerRecord<Object, Object> consumerRecord,
                        @NonNull Consumer<Object, Object> consumer) {
        log.info("Topic: {} - Message processed - Payload: {}",
                consumerRecord.topic(), consumerRecord.value());
        RecordInterceptor.super.success(consumerRecord, consumer);
    }

    @Override
    public void failure(@NonNull ConsumerRecord<Object, Object> consumerRecord,
                        @NonNull Exception exception,
                        @NonNull Consumer<Object, Object> consumer) {
        log.error("Topic: {} - Error processing a message - Payload: {}",
                consumerRecord.topic(), consumerRecord.value(), exception);
        RecordInterceptor.super.failure(consumerRecord, exception, consumer);
    }
}

Upvotes: 0
