Reputation: 1211
I have a list of mono i would like to process but they have to be executed sequentialy and also should execute next one only after previous mono is complete.
private Mono<List<Result>> processGoals(List<> goals,Data data) {
List<Mono<Result>> plans = goals
.stream()
.map(plan -> processGoal(plan, data))
.collect(Collectors.toList());
}
I tried to use
return Flux.concat(plans).subscribeOn(Schedulers.single()).collectList();
But this executes the next mono before previous one is complete.
Upvotes: 1
Views: 4387
Reputation: 6255
Flux#concatMap
is the best option for this case.
It will merge each mapped publisher sequentially and fire it one at a time without having to explicitly define concurrency
parameter.
Here's a complete example:
Flux.fromIterable(goals))
.concatMap(goal -> processGoal(goal, data))
.collectList();
Upvotes: 1
Reputation: 1211
.flatMapSequential(goal -> processGoal(goal, data), 1)
Last parameter concurrency 1 is very important. I have tried this before without concurrency parameter and that didnt work. Kudos to @Michael McFadyen
Upvotes: 2
Reputation: 8033
Instead of mixing two paradigms: Streams and Reactive Streams. You can try to go fully reactive.
Try following:
Mono<List<Result>> res = Flux.just(goals.toArray(Goal[]::new))
.flatMapSequential(goal -> processGoal(goal, data))
.subscribeOn(Schedulers.single())
.collectList();
Upvotes: 0