Reputation: 275
I have an application using Spring Boot and Spring Kafka that reads the delivery attempt header in a record interceptor so that I can include it in log messages. It worked well until I upgraded to Spring Boot 2.6.3 and Spring Kafka 2.8.2 (from 2.5.5/2.7.7).
Now, when I try to read the delivery attempt header in the interceptor, it is not available. If I do exactly the same thing within a message listener it works fine, so the header is clearly there.
This is what a simplified record interceptor and the listener container factory look like:
    @Bean
    public RecordInterceptor<Object, Object> recordInterceptor() {
        return record -> {
            int delivery = ByteBuffer.wrap(record.headers().lastHeader(KafkaHeaders.DELIVERY_ATTEMPT).value()).getInt();
            log.info("delivery " + delivery);
            return record;
        };
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
            ConsumerFactory<Object, Object> kafkaConsumerFactory) {
        ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        configurer.configure(factory, kafkaConsumerFactory);
        factory.getContainerProperties().setDeliveryAttemptHeader(true);
        factory.setRecordInterceptor(recordInterceptor());
        return factory;
    }
I can't see anything in the Spring docs suggesting the behaviour should have changed. Any ideas?
Upvotes: 0
Views: 1881
Reputation: 174729
This is a bug; I opened an issue. https://github.com/spring-projects/spring-kafka/issues/2082
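In the meantime, the interceptor can at least avoid failing when the header is missing, since `Headers.lastHeader(...)` returns null in that case. The following is only a minimal sketch of a null-safe decode, not a fix for the underlying issue; the class name `DeliveryAttemptDecoder` and the `-1` fallback are hypothetical choices made for illustration.

```java
import java.nio.ByteBuffer;

// Hypothetical helper (not part of Spring Kafka): decodes the raw value of the
// delivery attempt header, tolerating a missing or too-short value.
final class DeliveryAttemptDecoder {

    // Assumed sentinel meaning "header not available".
    static final int UNKNOWN = -1;

    private DeliveryAttemptDecoder() {
    }

    // headerValue is what Header.value() would return, or null when
    // Headers.lastHeader(...) itself returned null.
    static int decode(byte[] headerValue) {
        if (headerValue == null || headerValue.length < Integer.BYTES) {
            return UNKNOWN;
        }
        // Same decoding as the interceptor in the question: a 4-byte big-endian int.
        return ByteBuffer.wrap(headerValue).getInt();
    }

    public static void main(String[] args) {
        byte[] third = ByteBuffer.allocate(Integer.BYTES).putInt(3).array();
        System.out.println(decode(third)); // prints 3
        System.out.println(decode(null));  // prints -1
    }
}
```

Inside the interceptor, this could be called as `DeliveryAttemptDecoder.decode(header == null ? null : header.value())`, where `header` is the result of `record.headers().lastHeader(KafkaHeaders.DELIVERY_ATTEMPT)`. The log line would then show `-1` instead of throwing while the header is unavailable.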
Upvotes: 1