app
app

Reputation: 783

How to filter Kafka messages before consumer consume in spring Kafka

I am using spring Kafka in my project and I want to filter messages before consumer consumes based on key and value.

Is it possible?

Upvotes: 11

Views: 27437

Answers (2)

Ryuzaki L
Ryuzaki L

Reputation: 40088

Yes in Spring Kafka you can filter messages before consumer consumes, there is an interface public interface RecordFilterStrategy<K,V> and a method in that interface boolean filter(org.apache.kafka.clients.consumer.ConsumerRecord<K,V> consumerRecord)

so you need to override this filter method and if it returns false then the consumer will consume the message, otherwise, if it returns true then the message will not be consumed (it will be filtered out)

You can apply this filtration on message key, value, or headers

consumerRecord.key() // will return key of message
consumerRecord.value() // will return the message
consumerRecord.headers() // will return the headers

Example code:

 @Bean(name = "kafkaListenerContainerFactory")
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConcurrency(Integer.parseInt(threads));
    factory.setBatchListener(true);
    factory.setConsumerFactory(kafkaConsumerFactory());
    factory.getContainerProperties().setPollTimeout(Long.parseLong(pollTimeout));
    factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.BATCH);
    
    if(true) {
        factory.setRecordFilterStrategy(new RecordFilterStrategy<String, String>() {
            
            @Override
            public boolean filter(ConsumerRecord<String, String> consumerRecord) {
                if(consumerRecord.key().equals("ETEST")) {
                return false;
                }
            else {
                return true;
                 }
            }   
        });
    }
    
    return factory;
}

Upvotes: 16

Sreenivasa D
Sreenivasa D

Reputation: 111

adding to @Deadpool comments. It will work fine but it will not commit offset. so we will keep getting same message again but it will not consume. we need to set factory.setAckDiscarded(true); before setting factory.setRecordFilterStrategy() so that it will discard and commit offset.

Upvotes: 11

Related Questions