Reputation: 55
Sometimes messages can be filtered out before deserialization, based on header values. Are there any existing patterns for this scenario in Spring Kafka? I am thinking of implementing something similar to ErrorHandlingDeserializer that takes a filter predicate as a property in addition to the delegate. Any suggestions? Thanks.
Upvotes: 3
Views: 5886
Reputation: 174729
Yes, you can use the same technique as the ErrorHandlingDeserializer: return a "marker" object instead of doing the deserialization, then add a RecordFilterStrategy that filters out records carrying such objects. Add the strategy to the listener: set it on the container factory when using @KafkaListener, or use a filtering adapter for an explicit listener.
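The two steps can be sketched in plain Java. This is only an illustration of the pattern, not Spring Kafka code: `RawRecord`, `SKIPPED`, and the method names are hypothetical stand-ins chosen so the example runs without Kafka on the classpath. In a real application, the first step would presumably live in a Kafka `Deserializer` (using the `deserialize` overload that receives the headers) and the second in a `RecordFilterStrategy`, whose `filter` method returns true to discard a record.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Predicate;

class HeaderFilterSketch {

    /** Marker returned in place of a payload that was never deserialized. */
    static final Object SKIPPED = new Object();

    /** Hypothetical stand-in for a consumer record: headers plus raw value bytes. */
    static final class RawRecord {
        final Map<String, String> headers;
        final byte[] value;

        RawRecord(Map<String, String> headers, byte[] value) {
            this.headers = headers;
            this.value = value;
        }
    }

    /** Deserializer step: test the headers first, call the delegate only on a match. */
    static Object deserialize(RawRecord rec,
                              Predicate<Map<String, String>> wanted,
                              Function<byte[], Object> delegate) {
        return wanted.test(rec.headers) ? delegate.apply(rec.value) : SKIPPED;
    }

    /** Filter step: returns true to discard, the same sense as RecordFilterStrategy.filter. */
    static boolean filter(Object deserializedValue) {
        return deserializedValue == SKIPPED;
    }

    /** Runs both steps over a batch and returns what would reach the listener. */
    static List<Object> consume(List<RawRecord> records,
                                Predicate<Map<String, String>> wanted,
                                Function<byte[], Object> delegate) {
        List<Object> delivered = new ArrayList<>();
        for (RawRecord rec : records) {
            Object value = deserialize(rec, wanted, delegate);
            if (!filter(value)) {
                delivered.add(value);
            }
        }
        return delivered;
    }

    public static void main(String[] args) {
        List<RawRecord> records = List.of(
                new RawRecord(Map.of("type", "order"), "o-1".getBytes(StandardCharsets.UTF_8)),
                new RawRecord(Map.of("type", "audit"), "a-1".getBytes(StandardCharsets.UTF_8)));
        // Only records whose "type" header is "order" are handed to the delegate.
        List<Object> delivered = consume(
                records,
                headers -> "order".equals(headers.get("type")),
                bytes -> new String(bytes, StandardCharsets.UTF_8));
        System.out.println(delivered);
    }
}
```

Comparing the marker by identity keeps the filter cheap, and the delegate is never invoked for records whose headers do not match, which is the point of filtering before deserialization.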
EDIT
With Spring Boot, you can add the filter to the container factory like this:
    @Bean
    public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
            ConsumerFactory<Object, Object> kafkaConsumerFactory) {
        ConcurrentKafkaListenerContainerFactory<Object, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        // Apply Boot's auto-configured settings first.
        configurer.configure(factory, kafkaConsumerFactory);
        // The filter strategy is set on the container factory, not the consumer factory.
        // myFilter() is your own RecordFilterStrategy bean method.
        factory.setRecordFilterStrategy(myFilter());
        return factory;
    }
Upvotes: 2