kamal

Reputation: 55

Filter messages before deserialization based on headers

Sometimes messages can be filtered out before deserialization based on header values. Are there any existing patterns for this scenario in Spring Kafka? I am thinking of implementing something similar to ErrorHandlingDeserializer that, in addition to the delegate, also takes a filter predicate as a property. Any suggestions? Thanks.

Upvotes: 3

Views: 5886

Answers (1)

Gary Russell

Reputation: 174729

Yes. You can use the same technique as the ErrorHandlingDeserializer: return a "marker" object instead of deserializing the payload when the headers say the record should be skipped. Then add a RecordFilterStrategy that discards records carrying that marker. With @KafkaListener, set the strategy on the container factory; for an explicit listener, wrap it in a filtering adapter.
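To make the marker idea concrete, here is a minimal, dependency-free sketch of it. All names in it (`HeaderFilterSketch`, `Skipped`, `HeaderFilteringDeserializer`, `shouldDiscard`, the `type` header) are hypothetical stand-ins, not Spring Kafka classes. In a real application the deserializer logic would presumably go into an implementation of Kafka's `Deserializer#deserialize(String topic, Headers headers, byte[] data)`, and the discard check into `RecordFilterStrategy#filter`, which returns `true` for records to drop.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Predicate;

public class HeaderFilterSketch {

    /** Marker returned in place of a payload that was never deserialized. */
    static final class Skipped {
        static final Skipped INSTANCE = new Skipped();
        private Skipped() { }
    }

    /** Hypothetical stand-in for a header-aware Kafka Deserializer. */
    static final class HeaderFilteringDeserializer {
        private final Predicate<Map<String, byte[]>> accept;
        private final Function<byte[], Object> delegate;

        HeaderFilteringDeserializer(Predicate<Map<String, byte[]>> accept,
                                    Function<byte[], Object> delegate) {
            this.accept = accept;
            this.delegate = delegate;
        }

        Object deserialize(Map<String, byte[]> headers, byte[] data) {
            if (!accept.test(headers)) {
                return Skipped.INSTANCE; // the payload bytes are never parsed
            }
            return delegate.apply(data);
        }
    }

    /** Stand-in for the record filter: true means "discard this record". */
    static boolean shouldDiscard(Object value) {
        return value instanceof Skipped;
    }

    /** Simplified record: headers plus raw payload. */
    static final class Rec {
        final Map<String, byte[]> headers;
        final byte[] data;

        Rec(Map<String, byte[]> headers, String payload) {
            this.headers = headers;
            this.data = payload.getBytes(StandardCharsets.UTF_8);
        }
    }

    /** Runs three sample records through the deserializer and the filter. */
    static List<Object> run() {
        // Assumption for this example: only records with header type=order are wanted.
        Predicate<Map<String, byte[]>> onlyOrders = headers -> {
            byte[] type = headers.get("type");
            return type != null
                    && "order".equals(new String(type, StandardCharsets.UTF_8));
        };
        HeaderFilteringDeserializer deserializer = new HeaderFilteringDeserializer(
                onlyOrders, bytes -> new String(bytes, StandardCharsets.UTF_8));

        List<Rec> records = List.of(
                new Rec(Map.of("type", "order".getBytes(StandardCharsets.UTF_8)), "order-1"),
                new Rec(Map.of("type", "audit".getBytes(StandardCharsets.UTF_8)), "audit-1"),
                new Rec(Map.of(), "no-header"));

        List<Object> delivered = new ArrayList<>();
        for (Rec rec : records) {
            Object value = deserializer.deserialize(rec.headers, rec.data);
            if (!shouldDiscard(value)) {
                delivered.add(value); // only these would reach the listener
            }
        }
        return delivered;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [order-1]
    }
}
```

The point of the split is that the expensive step (the delegate) is skipped inside the deserializer, while the actual dropping of the record happens later in the filter, so the listener never sees the marker.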

EDIT

With Spring Boot, add the filter to the listener container factory:

    @Bean
    public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
            ConsumerFactory<Object, Object> kafkaConsumerFactory) {
        ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        configurer.configure(factory, kafkaConsumerFactory);
        // Records for which the strategy returns true are discarded before reaching the listener
        factory.setRecordFilterStrategy(myFilter());
        return factory;
    }

Upvotes: 2
