Artyom
Artyom

Reputation: 35

Filter Flux<String> publisher by values from another Flux<String> publisher

I want to filter rangeFlux by the values in the filterFlux. Is there any way how to achieve this without blocking in the line 2 with toStream()?

        Flux<String> filterFlux = Flux.just("2", "3", "4", "5");
        List<String> collect = filterFlux.toStream().collect(Collectors.toList());

        Flux<Integer> rangeFlux = Flux.range(1, 10);
        StepVerifier.create(rangeFlux.map(String::valueOf)
                .filter(new Predicate<String>() {
                    @Override
                    public boolean test(String s) {
                        return !collect.contains(s);
                    }
                }))
            .expectNext("1")
            .expectNext("6")
            .expectNext("7")
            .expectNext("8")
            .expectNext("9")
            .expectNext("10")
            .expectComplete()
            .verify();

Upvotes: 0

Views: 638

Answers (1)

cpigeon
cpigeon

Reputation: 455

Ah, this is a tricky one.

You could use the filterWhen function on Flux - it tests each value emitted by the given flux against a Boolean publisher.

filterFlux can be used with the filterWhen function as the source publisher for the filtering.

So to put that into code, it would look something like this:

Flux<String> filterFlux = Flux
    .just("2", "3", "4", "5");

Flux<String> rangeFlux = Flux
    .range(1, 10)
    .map(String::valueOf);

Flux<String> filteredFlux = rangeFlux
    .filterWhen(s1 -> filterFlux.all(s2 -> !s1.equals(s2)));

StepVerifier
    .create(filteredFlux)
    .expectNext("1")
    .expectNext("6")
    .expectNext("7")
    .expectNext("8")
    .expectNext("9")
    .expectNext("10")
    .expectComplete()
    .verify();

Edit: Updated with suggestion from Patrick Hooijer

Upvotes: 2

Related Questions