Reputation: 1823
My consumer is configured as follows:
@Bean
public Consumer<Message<List<Foo>>> consumer() {
    return message -> {
        message.getPayload().forEach(it -> {
            // process
        });
    };
}
I've also configured ErrorHandlingDeserializer, which may produce KafkaNull values in the message payload collection. The problem is that I cannot filter out such values, because accessing the collection with the forEach() method throws a ClassCastException:
java.lang.ClassCastException: class org.springframework.kafka.support.KafkaNull cannot be cast to class Foo
How can I exclude KafkaNull values from processing (without changing the signature of the consumer to Consumer<Message<List<Object>>>)?
Upvotes: 2
Views: 1182
Reputation: 6126
We don't have a filtering function at the moment. I did raise an issue for it: https://github.com/spring-cloud/spring-cloud-function/issues/736
But for now the best approach for you would be to do something like this:
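public Consumer<Message<List<Object>>> consumer() {
    return message -> {
        message.getPayload().stream()
                // skip KafkaNull (and anything else that is not a Foo)
                .filter(it -> it instanceof Foo)
                // safe after the filter; gives typed access to Foo
                .map(Foo.class::cast)
                .forEach(it -> {
                    // process
                });
    };
}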
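If changing the signature is not an option, a variation that may work is to keep List<Foo> and only view the payload as List<?> while filtering. Because generics are erased at runtime, the cast to Foo happens where the lambda reads an element, not when the list is created, so reading the elements as Object first avoids it. Below is a minimal sketch of that idea in plain Java. Foo, NullMarker and onlyOfType are hypothetical stand-ins (NullMarker plays the role of KafkaNull so the example runs without Spring), and I'm assuming the binder hands you the list unchanged, which the exception in forEach() suggests.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

final class PayloadFilterSketch {

    // Hypothetical stand-in for the Foo payload type from the question.
    static final class Foo {
        private final String name;

        Foo(String name) {
            this.name = name;
        }

        String name() {
            return name;
        }
    }

    // Hypothetical stand-in for org.springframework.kafka.support.KafkaNull,
    // used only so this sketch runs without Spring on the classpath.
    enum NullMarker { INSTANCE }

    // Reads the elements as Object (via List<?>) and keeps only instances of
    // the requested type, so no implicit cast to the declared element type occurs.
    static <T> List<T> onlyOfType(List<?> payload, Class<T> type) {
        return payload.stream()
                .filter(type::isInstance)
                .map(type::cast)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Object> raw = new ArrayList<>();
        raw.add(new Foo("a"));
        raw.add(NullMarker.INSTANCE);
        raw.add(new Foo("b"));

        // Simulates the situation from the question: a reference declared as
        // List<Foo> that, because of erasure, also holds a non-Foo marker.
        @SuppressWarnings("unchecked")
        List<Foo> payload = (List<Foo>) (List<?>) raw;

        // Iterating 'payload' directly with a Foo-typed lambda would throw
        // ClassCastException on the marker; filtering first does not.
        List<Foo> foos = onlyOfType(payload, Foo.class);
        foos.forEach(foo -> System.out.println(foo.name()));
    }
}
```

In the consumer from the question this would amount to calling something like onlyOfType(message.getPayload(), Foo.class) before the forEach, leaving Consumer<Message<List<Foo>>> as it is. I have not verified this against the binder itself, so treat it as something to try rather than a confirmed fix.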
Upvotes: 3