Sabareesh Kkanan
Sabareesh Kkanan

Reputation: 1211

How to execute list of Mono sequentialy

I have a list of mono i would like to process but they have to be executed sequentialy and also should execute next one only after previous mono is complete.

private Mono<List<Result>> processGoals(List<> goals,Data data) {
    
    List<Mono<Result>> plans = goals
                              .stream()
                              .map(plan -> processGoal(plan, data))
                              .collect(Collectors.toList());

}

I tried to use

return Flux.concat(plans).subscribeOn(Schedulers.single()).collectList();

But this executes the next mono before previous one is complete.

Upvotes: 1

Views: 4387

Answers (3)

lkatiforis
lkatiforis

Reputation: 6255

Flux#concatMap is the best option for this case.

It will merge each mapped publisher sequentially and fire it one at a time without having to explicitly define concurrency parameter.

Here's a complete example:

Flux.fromIterable(goals))
        .concatMap(goal -> processGoal(goal, data))
        .collectList();

Upvotes: 1

Sabareesh Kkanan
Sabareesh Kkanan

Reputation: 1211

.flatMapSequential(goal -> processGoal(goal, data), 1)

Last parameter concurrency 1 is very important. I have tried this before without concurrency parameter and that didnt work. Kudos to @Michael McFadyen

Upvotes: 2

Mirek Pluta
Mirek Pluta

Reputation: 8033

Instead of mixing two paradigms: Streams and Reactive Streams. You can try to go fully reactive.

Try following:

Mono<List<Result>> res = Flux.just(goals.toArray(Goal[]::new))
                            .flatMapSequential(goal -> processGoal(goal, data))
                            .subscribeOn(Schedulers.single())
                            .collectList();

Upvotes: 0

Related Questions