SledgeHammer

Reputation: 7681

Iterate over the same flux twice in parallel

Let's say I have:

Flux<Integer> flux1 = Flux.fromIterable(List.of(1, 2, 3, 4, 5));

I need to do two different things with the flux, then take both results and use them together, e.g. something like:

return Mono.zip(flux1.collectList(), flux1.collectList())
    .flatMap(tuple -> myService(tuple.getT1(), tuple.getT2()));

Is referencing flux1 twice in Mono.zip the correct approach? When I add logging to flux1, the two subscriptions don't seem to run in parallel; they appear to run sequentially.

I'm also wondering whether there is a cleaner way than Mono.zip that avoids having to create a flux1 variable in this case.
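The sequential behaviour described above can be reproduced with a small sketch. It assumes Reactor Core is on the classpath; the class name and the "A"/"B" tags are illustrative only. Because Flux.fromIterable emits synchronously on the subscribing thread, and Mono.zip subscribes to its sources one after the other, the first subscription appears to drain completely before the second one even starts. Since Mono.zip takes Monos rather than Fluxes, each use of flux1 is reduced with collectList() here.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;

public class ZipOrderDemo {

    // Records the order in which each of the two subscriptions sees its elements.
    static List<String> zipEvents() {
        List<String> events = new ArrayList<>();
        Flux<Integer> flux1 = Flux.fromIterable(List.of(1, 2, 3, 4, 5));

        Tuple2<List<Integer>, List<Integer>> result = Mono.zip(
                flux1.doOnNext(i -> events.add("A" + i)).collectList(),
                flux1.doOnNext(i -> events.add("B" + i)).collectList())
            .block();

        // Both subscriptions receive the full list.
        System.out.println(result.getT1() + " / " + result.getT2());
        return events;
    }

    public static void main(String[] args) {
        // No scheduler is involved, so everything runs on the calling thread.
        System.out.println(zipEvents());
    }
}
```

Running main should list all "A" events before any "B" event, which matches the sequential logging observed in the question.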

EDIT:

To clarify the question: I'm implementing the service myself, so here is pseudocode for what I'm trying to achieve:

// myService::someCall
Mono<?> someCall(List<Pojo> lst) {
  Flux<Pojo> flux = Flux.fromIterable(lst);

  return Mono.zip(
      flux/* ...do some stuff... */.collect(Collectors.toList()),
      flux/* ...do different stuff... */.collect(Collectors.toMap(/* ... */)).flatMap(this::callService2))
    // tuple.getT1() is theList, tuple.getT2() is theResultOfService2
    .flatMap(tuple -> callService3(tuple.getT1(), tuple.getT2().getKey()));
}

As the pseudocode shows, I need to go over the list twice: (1) to do some mapping and eventually collect the result into a list, and (2) to do some different mapping, build a map, and pass that map to another service call; then (3) I take the results of steps 1 and 2 and use them in a third service call.

So the question, at the end of the day: since I'm going over lst twice, is the way I'm planning to structure it correct, i.e. reusing the variable flux twice in one Mono.zip?
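A minimal runnable version of the three steps above might look like the following. Pojo, callService2 and callService3 are hypothetical stand-ins (the real services are not shown in the question), and the mappings are invented purely for illustration. What carries over is the structure: two independent reductions of the same Flux, combined with Mono.zip, followed by a flatMap over the resulting Tuple2.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class SomeCallSketch {

    // Hypothetical stand-in for the question's Pojo.
    record Pojo(String key, int value) {}

    // Hypothetical service 2: takes the map and returns an entry (hence getKey() later).
    // For illustration it picks the entry with the largest value.
    static Mono<Map.Entry<String, Integer>> callService2(Map<String, Integer> map) {
        return Mono.justOrEmpty(map.entrySet().stream().max(Map.Entry.comparingByValue()));
    }

    // Hypothetical service 3: combines the list from step 1 with the key from step 2.
    static Mono<String> callService3(List<String> theList, String key) {
        return Mono.just(key + ":" + theList);
    }

    static Mono<String> someCall(List<Pojo> lst) {
        Flux<Pojo> flux = Flux.fromIterable(lst);

        return Mono.zip(
                // Step 1: one mapping, collected into a list.
                flux.map(p -> p.key().toUpperCase()).collect(Collectors.toList()),
                // Step 2: a different mapping into a map, passed on to service 2.
                flux.collect(Collectors.toMap(Pojo::key, Pojo::value))
                    .flatMap(SomeCallSketch::callService2))
            // Step 3: use both results in service 3.
            .flatMap(tuple -> callService3(tuple.getT1(), tuple.getT2().getKey()));
    }

    public static void main(String[] args) {
        List<Pojo> input = List.of(new Pojo("a", 1), new Pojo("b", 3), new Pojo("c", 2));
        System.out.println(someCall(input).block());
    }
}
```

With the sample input, main should print b:[A, B, C]. Because lst is an in-memory list, iterating it twice here only costs a second pass over the list.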

Upvotes: 1

Views: 2050

Answers (1)

Martin Tarjányi

Reputation: 9937

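Usually a Flux is backed by some asynchronous source (e.g. a database or a web service), and consuming it twice is expensive: a cold Flux re-executes its source for every subscriber, so two subscriptions mean two queries or two remote calls. In this case you should consider multicasting or caching.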
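To see why consuming a cold Flux twice can be expensive, the sketch below simulates the external source with Flux.defer and counts how often it is invoked when flux1 is zipped with itself. fluxFromExternalService here is a hypothetical stand-in (taking a counter parameter) for a real database or web-service call.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;

public class ColdFluxDemo {

    // Hypothetical stand-in for an external source; defer runs the supplier once per subscriber.
    static Flux<Integer> fluxFromExternalService(AtomicInteger calls) {
        return Flux.defer(() -> {
            calls.incrementAndGet(); // simulates one real query / remote call
            return Flux.just(1, 2, 3, 4, 5);
        });
    }

    // Zips the plain (cold) flux with itself; each subscription hits the source again.
    static Tuple2<List<Integer>, List<Integer>> zipCold(AtomicInteger calls) {
        Flux<Integer> flux1 = fluxFromExternalService(calls);
        return Mono.zip(flux1.collectList(), flux1.collectList()).block();
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        zipCold(calls);
        System.out.println("source invoked " + calls.get() + " times");
    }
}
```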

Caching could be a good solution if the stream is finite and its elements can easily fit into memory:

flux1 = fluxFromExternalService().cache()
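A sketch of the caching variant, using the same counting idea; fluxFromExternalService is again a hypothetical Flux.defer-based stand-in. With cache(), the first subscriber triggers the upstream call and the second subscriber receives a replay of the stored elements.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;

public class CachedFluxDemo {

    // Hypothetical stand-in for an external source that counts real invocations.
    static Flux<Integer> fluxFromExternalService(AtomicInteger calls) {
        return Flux.defer(() -> {
            calls.incrementAndGet();
            return Flux.just(1, 2, 3, 4, 5);
        });
    }

    static Tuple2<List<Integer>, List<Integer>> zipCached(AtomicInteger calls) {
        // cache() subscribes upstream once and replays all elements to later subscribers.
        Flux<Integer> flux1 = fluxFromExternalService(calls).cache();
        return Mono.zip(flux1.collectList(), flux1.collectList()).block();
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        Tuple2<List<Integer>, List<Integer>> result = zipCached(calls);
        System.out.println(result.getT1() + " / " + result.getT2()
            + ", source invoked " + calls.get() + " time(s)");
    }
}
```

The trade-off is memory: every element is retained for as long as the cached Flux is reachable, which is why this suits finite streams that fit comfortably in memory.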

Otherwise, multicasting is a better solution:

flux1 = fluxFromExternalService().publish().autoConnect(2)
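A sketch of the multicasting variant under the same assumptions (a hypothetical, counting fluxFromExternalService). publish() turns the source into a ConnectableFlux, and autoConnect(2) connects to the upstream only when the second subscriber arrives. Mono.zip provides that second subscriber when it subscribes to its second argument, so both branches share a single upstream subscription without storing the elements.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;

public class MulticastFluxDemo {

    // Hypothetical stand-in for an external source that counts real invocations.
    static Flux<Integer> fluxFromExternalService(AtomicInteger calls) {
        return Flux.defer(() -> {
            calls.incrementAndGet();
            return Flux.just(1, 2, 3, 4, 5);
        });
    }

    static Tuple2<List<Integer>, List<Integer>> zipShared(AtomicInteger calls) {
        // Connects upstream once, as soon as exactly two subscribers are present.
        Flux<Integer> flux1 = fluxFromExternalService(calls).publish().autoConnect(2);
        return Mono.zip(flux1.collectList(), flux1.collectList()).block();
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        Tuple2<List<Integer>, List<Integer>> result = zipShared(calls);
        System.out.println(result.getT1() + " / " + result.getT2()
            + ", source invoked " + calls.get() + " time(s)");
    }
}
```

One caveat of this design: if fewer than two subscribers ever subscribe, the source is never connected, and a subscriber arriving after the connection misses elements that were already emitted.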

If you have a rare(!) scenario where the Flux is backed by an in-memory list, it doesn't make much difference how it is consumed, since each new subscription simply iterates over the list again.

If you would like to enforce parallelism, you can use the subscribeOn/publishOn operators:

Mono.zip(flux1.subscribeOn(Schedulers.parallel()).collectList(),
         flux1.subscribeOn(Schedulers.parallel()).collectList())
    .flatMap(tuple -> myService(tuple.getT1(), tuple.getT2()));
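A self-contained sketch of the subscribeOn approach, assuming the same in-memory flux1 from the question. Recording Thread.currentThread().getName() is only there to make the scheduling visible. Each subscription is moved onto a worker of Schedulers.parallel(), so neither branch runs on the calling thread; whether the two branches land on different workers depends on the scheduler's pool size (by default, the number of available CPU cores).

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.util.function.Tuple2;

public class ParallelZipDemo {

    // Returns, for each branch, the names of the threads that processed its elements.
    static Tuple2<List<String>, List<String>> zipInParallel() {
        Flux<Integer> flux1 = Flux.fromIterable(List.of(1, 2, 3, 4, 5));
        return Mono.zip(
                flux1.subscribeOn(Schedulers.parallel())
                     .map(i -> Thread.currentThread().getName())
                     .collectList(),
                flux1.subscribeOn(Schedulers.parallel())
                     .map(i -> Thread.currentThread().getName())
                     .collectList())
            .block();
    }

    public static void main(String[] args) {
        Tuple2<List<String>, List<String>> threads = zipInParallel();
        System.out.println("first branch:  " + threads.getT1());
        System.out.println("second branch: " + threads.getT2());
    }
}
```

For a small in-memory list the gain is negligible; offloading like this mainly pays off when each branch does blocking or CPU-heavy work.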

Upvotes: 1
