Reputation: 2171
While getting familiar with Flux, I want to process a stream of data within a Flux. The .map() function should simulate a workload that takes a random amount of processing time per Magician object and runs on separate threads (as you can see from the "Thread id" lines in the output) to make use of multi-threading.
Finally, the stream should return the results in order. But no matter what I do, e.g. using .sequential(), the output always ends up unordered.
Can someone please tell me how I can make use of multi-threading (using .map() or any other operator, as long as the result is the one asked for) and still get the results in order? Thank you.
P.S. The business logic behind this question is that I am calling an endpoint that delivers a Flux<byte[]>. I need to process these bytes in parallel (decryption) and then forward them in order to another consumer.
import static java.lang.Thread.sleep;

import lombok.Data;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

public class FluxRunner {

    public void getWorkerStream() throws InterruptedException {
        // single-element array so the lambda below can mutate the counter
        final int[] counter = new int[1];

        // create workers
        final Flux<Magician[]> workersFlux = Flux.range(0, 10)
                .map(integer -> {
                    final Magician[] worker = new Magician[1];
                    worker[0] = new Magician(counter[0], counter[0]++);
                    return worker;
                });

        final Disposable disposable = workersFlux
                .parallel()
                .runOn(Schedulers.parallel())
                .map(workers -> {
                    System.out.println("Thread id: " + Thread.currentThread().getId());
                    workers[0].calculate();
                    return workers;
                })
                .sequential() // no effect, doOnNext is unordered
                .doOnNext(workers -> System.out.println(workers[0].getId()))
                .subscribe();

        // keep the calling thread alive until the stream has completed
        while (!disposable.isDisposed()) {
            sleep(500);
        }
    }
}

@Data
class Magician {
    final int id;
    int number;

    Magician(final int id, final int number) {
        this.id = id;
        this.number = number;
    }

    // simulates a workload with a random processing time
    public void calculate() {
        // the value is in milliseconds, although the message below says "seconds"
        int timeToSleep = (int) (Math.random() * 3000);
        System.out.println("Sleep for " + timeToSleep + " seconds");
        try {
            sleep(timeToSleep);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        number++;
    }
}
The result will be:
Thread id: 33
Thread id: 24
Thread id: 25
Thread id: 31
Thread id: 29
Thread id: 30
Thread id: 27
Thread id: 32
Thread id: 28
Thread id: 26
Sleep for 2861 seconds
Sleep for 2811 seconds
Sleep for 711 seconds
Sleep for 2462 seconds
Sleep for 1858 seconds
Sleep for 601 seconds
Sleep for 126 seconds
Sleep for 359 seconds
Sleep for 2014 seconds
Sleep for 2356 seconds
4
5
7
9
8
3
0
1
6
2
Upvotes: 0
Views: 1512
Reputation: 9947
You can use the flatMapSequential operator along with .subscribeOn(Schedulers.parallel()) to get the desired result:
final Disposable disposable = workersFlux
        .flatMapSequential(workers -> Mono.fromCallable(() -> {
            System.out.println("Thread id: " + Thread.currentThread().getId());
            workers[0].calculate();
            return workers;
        }).subscribeOn(Schedulers.parallel()))
        .doOnNext(workers -> System.out.println(workers[0].getId()))
        .subscribe();
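For intuition about why this keeps the order: flatMapSequential subscribes to the inner publishers eagerly, so the work still overlaps, but it hands results downstream in source order and holds back anything that finishes early. The sketch below mimics that idea with plain JDK classes instead of Reactor, so it is only an analogy and not how Reactor is implemented. It also collects everything into a list, whereas Reactor emits each element as soon as its turn comes. The names OrderedParallelSketch, processInOrder and slowIncrement are made up for illustration; slowIncrement stands in for the real work such as decrypting a chunk.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Function;

final class OrderedParallelSketch {

    // Runs `work` on every input concurrently and returns the results in input order.
    static <T, R> List<R> processInOrder(List<T> inputs, Function<T, R> work, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            // Start all tasks eagerly; the list keeps the futures in source order.
            List<CompletableFuture<R>> futures = new ArrayList<>();
            for (T input : inputs) {
                futures.add(CompletableFuture.supplyAsync(() -> work.apply(input), pool));
            }
            // Join in source order: a fast later task waits behind a slow earlier one.
            List<R> results = new ArrayList<>();
            for (CompletableFuture<R> future : futures) {
                results.add(future.join());
            }
            return results;
        } finally {
            pool.shutdown();
        }
    }

    // Hypothetical stand-in for the real workload: random delay, then increment.
    static int slowIncrement(int value) {
        try {
            Thread.sleep(ThreadLocalRandom.current().nextInt(50));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return value + 1;
    }

    public static void main(String[] args) {
        List<Integer> inputs = List.of(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
        // prints [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] regardless of which task finishes first
        System.out.println(processInOrder(inputs, OrderedParallelSketch::slowIncrement, 4));
    }
}
```

By contrast, parallel().runOn(...).sequential() only merges the rails back into a single Flux; it does not restore the original source order, which matches the shuffled output in the question.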
Upvotes: 1