Reputation: 151
I have this very simple RxJava example
List<Integer> arrayIntegers = new ArrayList<>(Arrays.asList(1, 2, 3, 4, 5));
Observable.fromIterable(arrayIntegers).map(i -> {
Log.d("RxJava", "map i = " + i);
return i;
}).subscribeOn(Schedulers.newThread()).observeOn(AndroidSchedulers.mainThread()).
subscribe(new DisposableObserver<Integer>() {
@Override
public void onNext(Integer i) {
Log.d("RxJava", "next i = " + i);
}
@Override
public void onError(Throwable e) {}
@Override
public void onComplete() {
Log.d("RxJava", "Completed");
}
});
Which gives this result..
D/RxJava: map i = 1
D/RxJava: map i = 2
D/RxJava: map i = 3
D/RxJava: map i = 4
D/RxJava: map i = 5
D/RxJava: next i = 1
D/RxJava: next i = 2
D/RxJava: next i = 3
D/RxJava: next i = 4
D/RxJava: next i = 5
What I was expecting though is something more like this
D/RxJava: map i = 1
D/RxJava: next i = 1
D/RxJava: map i = 2
D/RxJava: next i = 2
D/RxJava: map i = 3
D/RxJava: next i = 3
D/RxJava: map i = 4
D/RxJava: next i = 4
D/RxJava: map i = 5
D/RxJava: next i = 5
Could someone explain what I am doing wrong which is causing my order to be incorrect?
Upvotes: 1
Views: 811
Reputation: 30335
Your Rx pipeline operates in two different threads.
The first thread (Schedulers.newThread()
), does the fromIterable
and map
. The second calls DisposableObserver
.
Between the two threads there's (an invisible) buffer inside the observeOn()
operator. So what happens is that the first thread gets some cpu time, quickly does all of its work and puts the results in the observeOn
buffer. This probably happens during a single spin of that thread on the CPU.
Then, the second thread is launched, it takes the items from the buffer and continues the processing by calling your DisposableObserver
.
I think that what you're looking for here is using a single thread. Remove the observeOn
operator, see if it does what you want it to do.
Edit: Just tried removing the observeOn
operator and running it on my machine. Here's the result:
RxJava map i = 1
RxJava next i = 1
RxJava map i = 2
RxJava next i = 2
RxJava map i = 3
RxJava next i = 3
RxJava map i = 4
RxJava next i = 4
RxJava map i = 5
RxJava next i = 5
For future reference, it might be useful for debugging purposes to print the name of the thread that reaches the various Rx operators. You can get the name using Thread.currentThread().getName()
.
Upvotes: 2