Reputation: 35
I want to filter rangeFlux by the values in the filterFlux. Is there any way how to achieve this without blocking in the line 2 with toStream()?
Flux<String> filterFlux = Flux.just("2", "3", "4", "5");
List<String> collect = filterFlux.toStream().collect(Collectors.toList());
Flux<Integer> rangeFlux = Flux.range(1, 10);
StepVerifier.create(rangeFlux.map(String::valueOf)
.filter(new Predicate<String>() {
@Override
public boolean test(String s) {
return !collect.contains(s);
}
}))
.expectNext("1")
.expectNext("6")
.expectNext("7")
.expectNext("8")
.expectNext("9")
.expectNext("10")
.expectComplete()
.verify();
Upvotes: 0
Views: 638
Reputation: 455
Ah, this is a tricky one.
You could use the filterWhen
function on Flux
- it tests each value emitted by the given flux against a Boolean
publisher.
filterFlux
can be used with the filterWhen
function as the source publisher for the filtering.
So to put that into code, it would look something like this:
Flux<String> filterFlux = Flux
.just("2", "3", "4", "5");
Flux<String> rangeFlux = Flux
.range(1, 10)
.map(String::valueOf);
Flux<String> filteredFlux = rangeFlux
.filterWhen(s1 -> filterFlux.all(s2 -> !s1.equals(s2)));
StepVerifier
.create(filteredFlux)
.expectNext("1")
.expectNext("6")
.expectNext("7")
.expectNext("8")
.expectNext("9")
.expectNext("10")
.expectComplete()
.verify();
Edit: Updated with suggestion from Patrick Hooijer
Upvotes: 2