Reputation: 8360
I am using Spring Cloud (Hoxton.SR8) Stream with Kafka-Streams binder in my project.
Can we check message headers before deserializing the payload? I want to filter incoming messages based on a header. Deserialization costs CPU, so before it happens I would like to inspect the headers and discard the message if a particular header value doesn't match a condition.
I tried using a ListenerContainerCustomizer like this:
@Bean
ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> customizer() {
    return (container, dest, group) -> {
        container.setRecordInterceptor(record -> {
            System.out.println(">>>> Received record, checking headers");
            Headers headers = record.headers();
            System.out.println(">>>> Header length: " + headers.toArray().length);
            for (Header header : headers) {
                if (header.key().equalsIgnoreCase("eventtype")) {
                    String value = String.valueOf(header.value());
                    if (!value.equalsIgnoreCase("PUBLISHED")) {
                        System.out.println("Event type from header not PUBLISHED, skipping record");
                        return null;
                    }
                }
            }
            System.out.println("Processing record");
            return record;
        });
    };
}
But it has no effect. I printed the loaded beans, and this customizer bean does get loaded, but it does nothing.
Please help.
Upvotes: 0
Views: 725
Reputation: 181
Instead of using setRecordInterceptor, you can use setRecordFilterStrategy. The strategy returns true for records that should be discarded.
You can filter on a field of the message payload:
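factory.setRecordFilterStrategy(message -> {
    // getParamName() stands in for whichever payload getter you filter on
    String messageValue = (String) message.value().getParamName();
    if (/* your business logic on messageValue */) {
        return false; // keep the record
    }
    LOGGER.info("Message discarded : {}", messageValue);
    return true; // discard the record
});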
Or by headers:
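factory.setRecordFilterStrategy(message -> {
    Header header = message.headers().lastHeader("XXX");
    // Header values are byte[]; decode them explicitly (needs java.nio.charset.StandardCharsets).
    // lastHeader returns null when the header is absent, so guard against that too.
    String headerValue = (header == null || header.value() == null)
            ? null
            : new String(header.value(), StandardCharsets.UTF_8);
    if (/* your business logic on headerValue */) {
        return false; // keep the record
    }
    LOGGER.info("Message discarded : {}", headerValue);
    return true; // discard the record
});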
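A side note on the header comparison itself: Kafka header values are raw bytes, so a call like String.valueOf(header.value()), as used in the question's interceptor, returns the array's identity string (something like [B@1b6d3586) rather than the header text. Below is a minimal, self-contained sketch of decoding and comparing a header value. It uses a plain Map<String, byte[]> as a stand-in for Kafka's Headers so it runs without any dependencies. The class and method names are made up for illustration, UTF-8 encoding of the header is an assumption, and treating a missing header as "discard" is also an assumption you may want to flip.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

public class HeaderFilterSketch {

    /** Decodes a raw header value as UTF-8 text; returns null for a missing value. */
    public static String decode(byte[] raw) {
        return raw == null ? null : new String(raw, StandardCharsets.UTF_8);
    }

    /**
     * Returns true when the record should be discarded, following the
     * "true means filter out" convention of a record filter strategy.
     * Assumption: a missing header also leads to a discard.
     */
    public static boolean shouldDiscard(Map<String, byte[]> headers, String key, String expected) {
        String value = decode(headers.get(key));
        return value == null || !value.equalsIgnoreCase(expected);
    }

    public static void main(String[] args) {
        // Stand-in for the headers of one incoming record
        Map<String, byte[]> headers = new LinkedHashMap<>();
        headers.put("eventtype", "PUBLISHED".getBytes(StandardCharsets.UTF_8));

        // String.valueOf on a byte[] resolves to String.valueOf(Object)
        System.out.println(String.valueOf(headers.get("eventtype")).startsWith("[B@")); // true
        System.out.println(decode(headers.get("eventtype")));                          // PUBLISHED
        System.out.println(shouldDiscard(headers, "eventtype", "PUBLISHED"));          // false
        System.out.println(shouldDiscard(headers, "eventtype", "DRAFT"));              // true
        System.out.println(shouldDiscard(headers, "missing", "PUBLISHED"));            // true
    }
}
```

In a real filter you would pass header.value() from the Kafka record into decode(...) instead of reading from the map.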
Upvotes: 0