pradosh nair
pradosh nair

Reputation: 945

Execute a task on completion of ParallelFlux without executing sequential (introduce a side affect)

I have a ParallelFlux, want to execute a side effect action when all the components in all the rails are consumed. I was trying to use .then().

But unable to understand how to use it.

Can anybody share its usage or a way to execute a side affect after all the elements go through OnError,OnComplete across rails.

Indicative code :

RunTransformation provides a Parallel Flux in transformation,

OnCompletion mark record as completed in a separate registry.

RunAction does some action for each transformed record (independent of the other).

RunError handles error.

Here I want to run RunCompletion only on final completion, but have to do sequential though consumers can be done in parallel.

   Mono.just(record)
       .flatMap(RunTransformation::tranformParallel)   //gives back ParallelFlux running on Schedulers.random()
       .sequential()
       .doOnTerminate(OnCompletion::markRecordProcessed)
       .subscribe(
            RunAction::execute, 
            RunError::handleError);

Upvotes: 1

Views: 1423

Answers (2)

Yateen Gupta
Yateen Gupta

Reputation: 68

By using .then() as follows.

Mono.just(record)
        .flatMap(RunTransformation::tranformParallel)   //gives back ParallelFlux running on Schedulers.random()
        .doOnNext(RunAction::execute)
        .doOnError(RunError::handleError)
        .then()
        .doOnTerminate(() -> {System.out.println("all rails completed");})
        .subscribe(); 

Upvotes: 1

Toerktumlare
Toerktumlare

Reputation: 14772

taken from the documentation

If, once you process your sequence in parallel, you want to revert back to a “normal” Flux and apply the rest of the operator chain in a sequential manner, you can use the sequential() method on ParallelFlux.

i think doOnComplete is what you are looking for.

Flux.range(1, 10)
        .parallel(3)
        .runOn(Schedulers.parallel())
        .doOnNext(i -> System.out.println(Thread.currentThread().getName() + " -> " + i))
        .sequential()
        .doOnComplete(() -> System.out.println("All parallel work is done"))
        .subscribe()

This produces the output:

parallel-1 -> 1
parallel-2 -> 2
parallel-3 -> 3
parallel-2 -> 5
parallel-3 -> 6
parallel-1 -> 4
parallel-1 -> 7
parallel-2 -> 8
parallel-3 -> 9
parallel-1 -> 10
All parallel work is done

Reactor documentation on parallel flux

Upvotes: 1

Related Questions