x__dos

Reputation: 1823

spring kafka: filtering KafkaNull values from payload

My consumer is configured as follows:

    @Bean
    public Consumer<Message<List<Foo>>> consumer() {
        return message ->  {
            message.getPayload().forEach(it -> {
              // process
            });
        };
    }

I've also configured ErrorHandlingDeserializer, which may produce KafkaNull values in the message payload collection. The problem is that I cannot filter out such values, because iterating over the collection with forEach() throws a ClassCastException: java.lang.ClassCastException: class org.springframework.kafka.support.KafkaNull cannot be cast to class Foo

How can I exclude KafkaNull values from processing (without changing the signature of the consumer to Consumer<Message<List<Object>>>)?

Upvotes: 2

Views: 1182

Answers (1)

Oleg Zhurakousky

Reputation: 6126

We don't have a filtering function at the moment. I did raise an issue for it: https://github.com/spring-cloud/spring-cloud-function/issues/736

But for now the best approach for you would be to do something like this:

    @Bean
    public Consumer<Message<List<Object>>> consumer() {
        return message -> {
            message.getPayload().stream()
                // drop KafkaNull (and anything else that is not a Foo)
                .filter(it -> it instanceof Foo)
                .map(it -> (Foo) it)
                .forEach(it -> {
                    // process
                });
        };
    }
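
To try the filtering step on its own, here is a minimal sketch in plain Java that runs without Spring on the classpath. `Foo`, `NullMarker` (a stand-in for `org.springframework.kafka.support.KafkaNull`), and `onlyFoos` are hypothetical names made up for illustration, not part of any Spring API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

public class KafkaNullFilterSketch {

    // Hypothetical stand-in for the payload type from the question.
    static final class Foo {
        final String name;
        Foo(String name) { this.name = name; }
    }

    // Hypothetical stand-in for org.springframework.kafka.support.KafkaNull,
    // so the sketch compiles without spring-kafka.
    static final class NullMarker {
        static final NullMarker INSTANCE = new NullMarker();
        private NullMarker() { }
    }

    // Keeps only real Foo elements; anything else (such as the marker) is dropped.
    // Elements are read as Object, so no cast to Foo happens before the type check.
    static List<Foo> onlyFoos(List<?> payload) {
        return payload.stream()
                .filter(Foo.class::isInstance)
                .map(Foo.class::cast)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Object> payload = new ArrayList<>();
        payload.add(new Foo("a"));
        payload.add(NullMarker.INSTANCE); // simulates a record that failed deserialization
        payload.add(new Foo("b"));

        for (Foo foo : onlyFoos(payload)) {
            System.out.println(foo.name);
        }
    }
}
```

In a real consumer, the helper would be called with `message.getPayload()`. The check runs before any element is treated as a `Foo`, which is the step the original `forEach` on a `List<Foo>` skipped.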

Upvotes: 3
