j2emanue

Reputation: 62519

RxJava - flatMap vs concatMap - why is the ordering the same on subscription?

According to this thread, concatMap and flatMap differ only in the order in which items are emitted. So I ran a test: I created a simple stream of integers to see in what order they would be emitted. I made a small observable that takes the numbers 1 to 5 and multiplies each by two. Easy.

Here is the code with flatMap:

myObservable.flatMap(new Func1<Integer, Observable<Integer>>() {
        @Override
        public Observable<Integer> call(Integer integer) {
            return Observable.just(integer * 2);
        }
    }).subscribe(new Observer<Integer>() {
        @Override
        public void onCompleted() {

        }

        @Override
        public void onError(Throwable e) {

        }

        @Override
        public void onNext(Integer integer) {
            Log.v("myapp", "from flatMap:" + integer);
        }
    });

And the exact same code using concatMap:

myObservable.concatMap(new Func1<Integer, Observable<Integer>>() {
        @Override
        public Observable<Integer> call(Integer integer) {
            return Observable.just(integer * 2);
        }
    }).subscribe(new Observer<Integer>() {
        @Override
        public void onCompleted() {

        }

        @Override
        public void onError(Throwable e) {

        }

        @Override
        public void onNext(Integer integer) {
            Log.v("myapp", "from concatMap:" + integer);
        }
    });

When I looked at the log output, the ordering was the same for both. Why? I thought only concatMap preserves order.

Upvotes: 3

Views: 3002

Answers (1)

Egor Neliuba

Reputation: 15054

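What you are seeing is a coincidence. Every inner Observable returned from your flatMap emits its value synchronously, on the same thread as the previous one, so each one finishes before the next is subscribed and nothing gets a chance to interleave.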
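To make the coincidence concrete, here is a minimal sketch in plain Java. It is an analogy, not RxJava's implementation: `mergeAsCompleted` is a hypothetical helper that hands one doubling task per input to an `Executor` and records results in the order the tasks finish. With a same-thread executor, each task finishes before the next one starts, so completion order is simply source order.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executor;

class SameThreadMergeSketch {

    // Hypothetical helper (not an RxJava API): runs one doubling task per input
    // on the given executor and records each result when its task finishes.
    static List<Integer> mergeAsCompleted(List<Integer> source, Executor executor) {
        List<Integer> emitted = Collections.synchronizedList(new ArrayList<>());
        for (Integer value : source) {
            executor.execute(() -> emitted.add(value * 2));
        }
        return emitted;
    }

    public static void main(String[] args) {
        // Same-thread executor: execute() runs the task immediately on the caller.
        Executor sameThread = Runnable::run;
        // prints [2, 4, 6, 8, 10]
        System.out.println(mergeAsCompleted(List.of(1, 2, 3, 4, 5), sameThread));
    }
}
```

Nothing in this sketch enforces ordering; the order is preserved only because no task can overlap with another.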

I have modified your example to take advantage of multithreading:

Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
        .flatMap(integer -> Observable.just(integer)
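                // Move each inner stream onto the computation thread pool.
                .observeOn(Schedulers.computation())
                .flatMap(i -> {
                    try {
                        // Random delay so the inner streams finish in an unpredictable order.
                        Thread.sleep(new Random().nextInt(1000));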
                        return Observable.just(2 * i);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                        return Observable.error(e);
                    }
                }))
        .subscribe(System.out::println,
                Throwable::printStackTrace,
                () -> System.out.println("onCompleted"));

I delay each 2 * i value by a random amount to force a different order. I also added observeOn(Schedulers.computation()) before the inner flatMap so that it runs on the computation thread pool; this is what makes the work multithreaded.

This is the output I get for my example (on Android):

I/System.out: 6
I/System.out: 4
I/System.out: 12
I/System.out: 14
I/System.out: 8
I/System.out: 2
I/System.out: 16
I/System.out: 20
I/System.out: 10
I/System.out: 18
I/System.out: onCompleted

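If I replace the outer flatMap (the one directly after Observable.just(1, 2, ..., 10)) with concatMap, I get properly ordered output, because concatMap subscribes to each inner Observable only after the previous one has completed.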
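The same contrast can be sketched with the standard library alone. This is a plain-Java analogy under stated assumptions, not how RxJava is implemented: `flatMapLike`, `concatMapLike`, and `slowDouble` are hypothetical names, and the delays are fixed rather than random (earlier inputs sleep longer) so that the result is reproducible.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class MergeVsConcatSketch {

    // Simulated slow work: sleep for the given time, then double the value.
    static int slowDouble(int value, long delayMillis) {
        try {
            Thread.sleep(delayMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return value * 2;
    }

    // flatMap-like: start every task at once and record results as they finish.
    static List<Integer> flatMapLike(List<Integer> source) {
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, source.size()));
        List<Integer> emitted = Collections.synchronizedList(new ArrayList<>());
        List<CompletableFuture<Void>> pending = new ArrayList<>();
        for (int i = 0; i < source.size(); i++) {
            int value = source.get(i);
            long delay = (source.size() - i) * 150L; // assumption: earlier items are slower
            pending.add(CompletableFuture
                    .supplyAsync(() -> slowDouble(value, delay), pool)
                    .thenAccept(emitted::add));
        }
        pending.forEach(CompletableFuture::join); // wait for all tasks
        pool.shutdown();
        return new ArrayList<>(emitted);
    }

    // concatMap-like: finish each task before starting the next one.
    static List<Integer> concatMapLike(List<Integer> source) {
        List<Integer> emitted = new ArrayList<>();
        for (int i = 0; i < source.size(); i++) {
            long delay = (source.size() - i) * 150L; // same delays as above
            emitted.add(slowDouble(source.get(i), delay));
        }
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println("flatMap-like:   " + flatMapLike(List.of(1, 2, 3)));
        System.out.println("concatMap-like: " + concatMapLike(List.of(1, 2, 3)));
    }
}
```

With these delays the flatMap-like run should report the results in completion order (6, 4, 2), while the concatMap-like run keeps source order (2, 4, 6) at the cost of running the tasks one after another.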

There's a great post by Thomas Nield with a proper explanation.

Upvotes: 10
