discCard
discCard

Reputation: 521

RxJava merge preserve order without sense

I read difference between merge and concat. Concat preserve order, merge not.

But in my case, merge always preserve also order. What can be wrong ? Why my merge waits for previous tasks ? I want achieve random result but in this case I always get A1 A2 A3... B1 B2 B3...

fun doSomething() {
        compositeDisposable.add(
            Observable.merge(getNumber1(), getNumber2())
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(
                    { item -> println(item) },
                    { error -> println(error.message) }
                ))
    }

    private fun getNumber1(): Observable<String> {
        return Observable.create { emitter ->
            for (i in 1..10) {
                emitter.onNext("ObservableA: " + i)
            }
            emitter.onComplete()
        }
    }

    private fun getNumber2(): Observable<String> {
        return Observable.create { emitter ->
            for (i in 1..10) {
                emitter.onNext("ObservableB: " + i)
            }
            emitter.onComplete()
        }
    }

Upvotes: 1

Views: 177

Answers (2)

Buddhabhushan Naik
Buddhabhushan Naik

Reputation: 93

For understanding difference between merge & concat, you can refer to below example where emissions are happened at every 10 milliseconds.

val observableFirst = Observable.interval(10, TimeUnit.MILLISECONDS).takeWhile { it < 10 }.map { "A$it" }
val observableSecond = Observable.interval(10, TimeUnit.MILLISECONDS).takeWhile { it < 10 }.map { "B$it" }

val result = arrayListOf<String>()
Observable.merge(observableFirst, observableSecond)
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .doOnComplete { println(result) }
    .subscribe(
        { item -> result.add(item) },
        { error -> println(error.message) }
    )

Output (may differ everytime): [A0, B0, A1, B1, A2, B2, A3, B3, A4, B4, A5, B5, A6, B6, B7, A7, B8, A8, B9, A9]

In your example, for loop causes very fast emission intervals. Thus, you didn't find any differences with respect to its order.

In my example, as you can see, order of emission is not maintained. Also, you can check this by using concat instead of merge.

Output (does not differ): [A0, A1, A2, A3, A4, A5, A6, A7, A8, A9, B0, B1, B2, B3, B4, B5, B6, B7, B8, B9]

Upvotes: 1

Akaki Kapanadze
Akaki Kapanadze

Reputation: 2672

Why my merge waits for previous tasks ?

This is because they’re both executed on the same thread. To both tasks work in parallel subscribeOn should be called on each of the Observables before they’re merged:

Observable.merge(
    getNumber1().subscribeOn(Schedulers.io()),
    getNumber2().subscribeOn(Schedulers.io())
).observeOn(AndroidSchedulers.mainThread())
    ...

Also you can add Thread.sleep(100) after emitter.onNext for better testing

Upvotes: 1

Related Questions