Reputation: 945
I have a ParallelFlux
, want to execute a side effect action when all the components in all the rails are consumed. I was trying to use .then()
.
But unable to understand how to use it.
Can anybody share its usage or a way to execute a side affect after all the elements go through OnError,OnComplete across rails.
Indicative code :
RunTransformation provides a Parallel Flux in transformation,
OnCompletion mark record as completed in a separate registry.
RunAction does some action for each transformed record (independent of the other).
RunError handles error.
Here I want to run RunCompletion only on final completion, but have to do sequential though consumers can be done in parallel.
Mono.just(record)
.flatMap(RunTransformation::tranformParallel) //gives back ParallelFlux running on Schedulers.random()
.sequential()
.doOnTerminate(OnCompletion::markRecordProcessed)
.subscribe(
RunAction::execute,
RunError::handleError);
Upvotes: 1
Views: 1423
Reputation: 68
By using .then() as follows.
Mono.just(record)
.flatMap(RunTransformation::tranformParallel) //gives back ParallelFlux running on Schedulers.random()
.doOnNext(RunAction::execute)
.doOnError(RunError::handleError)
.then()
.doOnTerminate(() -> {System.out.println("all rails completed");})
.subscribe();
Upvotes: 1
Reputation: 14772
taken from the documentation
If, once you process your sequence in parallel, you want to revert back to a “normal” Flux and apply the rest of the operator chain in a sequential manner, you can use the sequential() method on ParallelFlux.
i think doOnComplete
is what you are looking for.
Flux.range(1, 10)
.parallel(3)
.runOn(Schedulers.parallel())
.doOnNext(i -> System.out.println(Thread.currentThread().getName() + " -> " + i))
.sequential()
.doOnComplete(() -> System.out.println("All parallel work is done"))
.subscribe()
This produces the output:
parallel-1 -> 1
parallel-2 -> 2
parallel-3 -> 3
parallel-2 -> 5
parallel-3 -> 6
parallel-1 -> 4
parallel-1 -> 7
parallel-2 -> 8
parallel-3 -> 9
parallel-1 -> 10
All parallel work is done
Reactor documentation on parallel flux
Upvotes: 1