Reputation: 325
If i execute such a code:
StopWatch stopWatch = new StopWatch();
stopWatch.start();
Flowable.fromIterable(Lists.newArrayList(1, 2, 3, 4, 5, 6))
.subscribeOn(Schedulers.io())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Throwable {
System.out.println(stopWatch + " value:" + integer);
Thread.sleep(1000);
}
});
Thread.sleep(100000);
In output, i get each element just after sleep time, like this:
00:00:00.027 value:1
00:00:01.030 value:2
00:00:02.030 value:3
00:00:03.031 value:4
00:00:04.031 value:5
00:00:05.031 value:6
but as i understand, if i use Schedulers.io(), than i must get all the values parallel, and i expected, that i will get all values immediately, than i will wait 1000 mills just once, and thats all
Like this:
00:00:00.027 value:1
00:00:00.030 value:2
00:00:00.030 value:3
00:00:00.031 value:4
00:00:00.031 value:5
00:00:00.031 value:6
How can i get them all in other threads, NOT one by one?
I dont wanna them to wait each other
I try Schedulers.computation()
and other, but they still arrive one by one
How to get them all immediately?
P.S. There is some text for better search in google. I get it from browser history. rxjava how to many subscriber, rxjava how to post all elements synchronously, only one thread active rxjava, flowable from don't work in many threads
Upvotes: 0
Views: 75
Reputation: 69997
if i use Schedulers.io(), than i must get all the values parallel
No. RxJava flows are sequential by default, which means items are delivered one after the other. If your consumer blocks or sleeps, subsequent items are delayed every time.
How can i get them all in other threads, NOT one by one?
Use parallel
:
StopWatch stopWatch = new StopWatch();
stopWatch.start();
Flowable.fromIterable(Lists.newArrayList(1, 2, 3, 4, 5, 6))
.parallel()
.runOn(Schedulers.io())
.doOnNext(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Throwable {
System.out.println(stopWatch + " value:" + integer);
Thread.sleep(1000);
}
})
.sequential()
.subscribe();
Thread.sleep(100000);
Recommended reading: https://github.com/ReactiveX/RxJava#parallel-processing
Upvotes: 1