Reputation: 1021
How to compare a List to a Flux in a non-blocking way
Below is the code in a blocking way:
    public static void main(String[] args) {
        List<String> all = List.of("A", "B", "C", "D");
        Flux<String> valid = Flux.just("A", "D");
        Map<Boolean, List<String>> collect = all.stream()
                .collect(Collectors.groupingBy(t -> valid.collectList().block().contains(t)));
        System.out.println(collect.get(Boolean.TRUE));
        System.out.println(collect.get(Boolean.FALSE));
    }
How can I get this working in a non-blocking way? The above is an example of what I am trying to do in a web application. I receive a list of objects, which is the List all. Then I query the database, which returns a Flux. The Flux returned by the database will be a subset of the List all. I need to prepare two lists: the items that are present in the Flux valid, and the items that are not present in the Flux valid.
EDIT: I converted the Flux to a Mono and the List to a Mono:
    public static void main(String[] args) {
        Mono<List<String>> all = Mono.just(List.of("A", "B", "C", "D"));
        Mono<List<String>> valid = Mono.just(List.of("A", "D"));
        var exist = all.flatMap(a -> valid.map(v -> a.stream().collect(Collectors.groupingBy(v::contains))));
        System.out.println(exist.block().get(Boolean.TRUE));
        System.out.println(exist.block().get(Boolean.FALSE));
    }
Upvotes: 0
Views: 254
Reputation: 16364
There is no straightforward way of achieving this in reactive programming without breaking some of its semantics.
If you reflect on what reactive programming tries to achieve and on your problem statement, you should notice that the two do not play well together.
Reactive programming, as the name suggests, is about reacting to events, which in your case would be the valid items emitted from your datastore. In a typical situation, you would program your statement to compute some assertions on the emitted valid items and then emit these (or some other transformations) downstream. Unfortunately, you won't be able to compute the intersection and difference of the all and valid items without stopping at some point (otherwise, how would you know that an item you assumed invalid will not be emitted later by the valid publisher?).
That said, to achieve the desired behavior, you will have to lean on memory to buffer items and then trigger your validations.
Retrieving the valid items should be achievable using the filterWhen operator paired with the hasElement one:
    Flux<String> validItems = Flux.fromIterable(all)
            .filterWhen(valid::hasElement);
To retrieve the invalid items, you can merge all and validItems together, collect the result, and then keep only the elements that appear exactly once:
    Flux<String> inValidItems = Flux.fromIterable(all)
            .mergeWith(validItems)
            .collectList()
            .flatMapIterable(list -> list.stream()
                    .filter(item -> Collections.frequency(list, item) == 1)
                    .collect(Collectors.toList()));
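One thing to keep in mind with the snippets above: filterWhen(valid::hasElement) subscribes to valid once per element of all, which for a database-backed Flux would likely mean one query per element. As a minimal sketch of the buffering idea, assuming the valid items have already been collected into memory (for example via valid.collectList(), which is not run here), the split can be done in a single pass with plain java.util. The class name PartitionSketch and the method partition are hypothetical and not part of Reactor.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical helper, not part of Reactor: splits `all` once the valid
// items have been buffered in memory.
final class PartitionSketch {

    // Key true  -> items of `all` that are present in `valid`
    // Key false -> items of `all` that are absent from `valid`
    static Map<Boolean, List<String>> partition(List<String> all, Collection<String> valid) {
        Set<String> validSet = new HashSet<>(valid); // hash lookups instead of list scans
        return all.stream().collect(Collectors.partitioningBy(validSet::contains));
    }

    public static void main(String[] args) {
        Map<Boolean, List<String>> result =
                partition(List.of("A", "B", "C", "D"), List.of("A", "D"));
        System.out.println(result.get(true));  // [A, D]
        System.out.println(result.get(false)); // [B, C]

        // Assumed Reactor wiring (not run here), subscribing to `valid` only once:
        //   Mono<Map<Boolean, List<String>>> split =
        //           valid.collectList().map(v -> partition(all, v));
    }
}
```

Unlike groupingBy, partitioningBy always yields both the true and the false key, so neither lookup returns null when one side is empty.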
Upvotes: 1