Reputation: 783
I am using spring Kafka in my project and I want to filter messages before consumer consumes based on key and value.
Is it possible?
Upvotes: 11
Views: 27437
Reputation: 40088
Yes in Spring Kafka you can filter messages before consumer consumes, there is an interface public interface RecordFilterStrategy<K,V>
and a method in that interface boolean filter(org.apache.kafka.clients.consumer.ConsumerRecord<K,V> consumerRecord)
so you need to override this filter
method and if it returns false then the consumer will consume the message, otherwise, if it returns true then the message will not be consumed (it will be filtered out)
You can apply this filtration on message key, value, or headers
consumerRecord.key() // will return key of message
consumerRecord.value() // will return the message
consumerRecord.headers() // will return the headers
Example code:
@Bean(name = "kafkaListenerContainerFactory")
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConcurrency(Integer.parseInt(threads));
factory.setBatchListener(true);
factory.setConsumerFactory(kafkaConsumerFactory());
factory.getContainerProperties().setPollTimeout(Long.parseLong(pollTimeout));
factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.BATCH);
if(true) {
factory.setRecordFilterStrategy(new RecordFilterStrategy<String, String>() {
@Override
public boolean filter(ConsumerRecord<String, String> consumerRecord) {
if(consumerRecord.key().equals("ETEST")) {
return false;
}
else {
return true;
}
}
});
}
return factory;
}
Upvotes: 16
Reputation: 111
adding to @Deadpool comments. It will work fine but it will not commit offset. so we will keep getting same message again but it will not consume. we need to set factory.setAckDiscarded(true);
before setting factory.setRecordFilterStrategy()
so that it will discard and commit offset.
Upvotes: 11