Reputation: 6195
I have a Flux and I want to convert it to List. How can I do that?
Flux<Object> getInstances(String serviceId); // Current one
List<Object> getInstances(String serviceId); // Demanded one
Do Java 8 or the reactive components provide a ready-made method to map or convert it to a List?
I assume I should use .map():
final List<ServiceInstance> sis = convertedStringList.parallelStream()
.map( this.reactiveDiscoveryClient::getInstances )
// It should be converted to List<Object>
Upvotes: 7
Views: 10576
Reputation: 1166
If you want to convert a Project Reactor Flux into a java.util.List without blocking the thread or terminating the asynchronous pipeline, you can do it this way:
Flux.just("Str1", "Str2", "Str3") // Flux<String>
    .collectList() // Mono<List<String>>
    .flatMap(listStrs -> Mono.just(listStrs.size())) // do whatever you want with the list; flatMap must return a Mono
The key function is collectList(), which gathers all results into a single Mono<List<String>>.
Note: collectList() does not block the thread, but it does introduce a form of "wait": it collects every element from the previous operation and only emits the list once the source completes.
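The non-blocking approach above could be sketched as a complete program like the one below. It assumes reactor-core is on the classpath; the class name CollectListSketch and the helper joined are made up for illustration and are not part of Reactor.

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class CollectListSketch {

    // Hypothetical helper: gathers all elements into one list and derives a value
    // from it, without blocking the calling thread.
    static Mono<String> joined(Flux<String> source) {
        return source
                .collectList()                         // Mono<List<String>>
                .map(list -> String.join(",", list));  // runs once the Flux has completed
    }

    public static void main(String[] args) {
        // Nothing runs until something subscribes; the callback receives the result.
        joined(Flux.just("Str1", "Str2", "Str3"))
                .subscribe(result -> System.out.println(result));
    }
}
```

The list never leaves the reactive pipeline here: the caller gets a Mono and decides when (and whether) to subscribe.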
Upvotes: 0
Reputation: 2639
A fair warning before diving into anything else: converting a Flux to a List/Stream makes the whole thing non-reactive in the strict sense of the concept, because you are leaving the push domain and trading it for a pull domain. Depending on the use case you may or may not want this (usually you don't). Just wanted to leave the note.
According to the Flux documentation, the collectList method returns a Mono<List<T>>. It returns immediately, but what you get is not the resulting list itself; it is a lazy structure, the Mono, that promises the result will eventually be there once the sequence completes.
According to the Mono documentation, the block method returns the contents of the Mono when it completes. Keep in mind that block may return null (when the Mono completes empty).
Combining both, you could use someFlux.collectList().block(). Provided that someFlux is a Flux<Object>, the result would be a List<Object>.
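Applied to the scenario in the question (several service ids, each resolved through a reactive lookup), the combination might look roughly like the sketch below. The getInstances method here is a hypothetical stand-in that returns strings, not the real discovery client, and the class name is invented for the example.

```java
import java.util.Arrays;
import java.util.List;
import reactor.core.publisher.Flux;

public class FluxToListSketch {

    // Hypothetical stand-in for a reactive lookup such as getInstances(serviceId).
    static Flux<String> getInstances(String serviceId) {
        return Flux.just(serviceId + "-1", serviceId + "-2");
    }

    // Blocks the calling thread until every inner Flux has completed.
    static List<String> allInstances(List<String> serviceIds) {
        return Flux.fromIterable(serviceIds)
                .concatMap(FluxToListSketch::getInstances) // one lookup at a time, keeps the order of serviceIds
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(allInstances(Arrays.asList("a", "b")));
    }
}
```

concatMap is used so the result order follows the input order; flatMap would also work if ordering does not matter.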
The block method will never return if the Flux is infinite. As an example, the following returns a list with two words:
Flux.fromArray(new String[]{"foo", "bar"}).collectList().block()
But the following will never return:
Flux.interval(Duration.ofMillis(1000)).collectList().block()
To prevent blocking indefinitely or for too long, you may pass a Duration argument to block, but that will time out with an exception if the sequence does not complete in time.
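A minimal sketch of the timeout variant follows. The helper toListOrEmpty and its fall-back-to-empty-list policy are assumptions made for the example, not something Reactor provides; the catch is deliberately broad because the Mono Javadoc only promises a RuntimeException on timeout, which means errors from the source are swallowed here too.

```java
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import reactor.core.publisher.Flux;

public class BlockTimeoutSketch {

    // Hypothetical helper: waits at most maxWait for the list, and falls back to
    // an empty list if block(Duration) throws (for example because it timed out).
    static <T> List<T> toListOrEmpty(Flux<T> source, Duration maxWait) {
        try {
            return source.collectList().block(maxWait);
        } catch (RuntimeException e) {
            // Assumption: a timeout surfaces as a RuntimeException, as the Javadoc states.
            return Collections.emptyList();
        }
    }

    public static void main(String[] args) {
        // Finite Flux: completes well within the limit.
        System.out.println(toListOrEmpty(Flux.just("foo", "bar"), Duration.ofSeconds(1)));
        // Infinite Flux: never completes, so the timeout path is taken.
        System.out.println(toListOrEmpty(Flux.interval(Duration.ofMillis(100)), Duration.ofMillis(300)));
    }
}
```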
According to the Flux documentation, the toStream method converts a Flux<T> into a Stream<T>. This is friendlier to operators such as flatMap. Consider this simple example, for the sake of demonstration:
Stream.of("f")
    .flatMap(letter ->
        Flux.fromArray(new String[]{"foo", "bar"})
            .filter(word -> word.startsWith(letter))
            .toStream())
    .collect(Collectors.toList())
One could simply use .collectList().block().stream(), but not only is it less readable, it could also result in an NPE if block returned null. The toStream approach does not finish for an infinite Flux either, but because this is a stream of unknown size, you can still apply some operations to it before the Flux completes, without waiting for the whole sequence.
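To illustrate that last point, here is a small sketch that pulls only the first few elements out of an infinite Flux through the Stream API. The class and method names are invented for the example. Note that each pull still blocks the calling thread until the next element arrives, so this should not be run on a non-blocking Reactor thread.

```java
import java.time.Duration;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import reactor.core.publisher.Flux;

public class ToStreamSketch {

    // Takes the first n ticks of an infinite Flux via a short-circuiting Stream operation.
    static List<Long> firstTicks(int n) {
        // try-with-resources closes the stream when done, which is meant to
        // release the underlying subscription to the interval.
        try (Stream<Long> ticks = Flux.interval(Duration.ofMillis(50)).toStream()) {
            return ticks.limit(n).collect(Collectors.toList());
        }
    }

    public static void main(String[] args) {
        System.out.println(firstTicks(3));
    }
}
```

The same call with .collectList().block() instead of toStream() would never return, because the interval never completes.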
Upvotes: 10