Reputation: 521
I read difference between merge and concat. Concat preserve order, merge not.
But in my case, merge always preserve also order. What can be wrong ? Why my merge waits for previous tasks ? I want achieve random result but in this case I always get A1 A2 A3... B1 B2 B3...
fun doSomething() {
compositeDisposable.add(
Observable.merge(getNumber1(), getNumber2())
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(
{ item -> println(item) },
{ error -> println(error.message) }
))
}
private fun getNumber1(): Observable<String> {
return Observable.create { emitter ->
for (i in 1..10) {
emitter.onNext("ObservableA: " + i)
}
emitter.onComplete()
}
}
private fun getNumber2(): Observable<String> {
return Observable.create { emitter ->
for (i in 1..10) {
emitter.onNext("ObservableB: " + i)
}
emitter.onComplete()
}
}
Upvotes: 1
Views: 177
Reputation: 93
For understanding difference between merge & concat, you can refer to below example where emissions are happened at every 10 milliseconds.
val observableFirst = Observable.interval(10, TimeUnit.MILLISECONDS).takeWhile { it < 10 }.map { "A$it" }
val observableSecond = Observable.interval(10, TimeUnit.MILLISECONDS).takeWhile { it < 10 }.map { "B$it" }
val result = arrayListOf<String>()
Observable.merge(observableFirst, observableSecond)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.doOnComplete { println(result) }
.subscribe(
{ item -> result.add(item) },
{ error -> println(error.message) }
)
Output (may differ everytime): [A0, B0, A1, B1, A2, B2, A3, B3, A4, B4, A5, B5, A6, B6, B7, A7, B8, A8, B9, A9]
In your example, for loop causes very fast emission intervals. Thus, you didn't find any differences with respect to its order.
In my example, as you can see, order of emission is not maintained. Also, you can check this by using concat instead of merge.
Output (does not differ): [A0, A1, A2, A3, A4, A5, A6, A7, A8, A9, B0, B1, B2, B3, B4, B5, B6, B7, B8, B9]
Upvotes: 1
Reputation: 2672
Why my merge waits for previous tasks ?
This is because they’re both executed on the same thread. To both tasks work in parallel subscribeOn
should be called on each of the Observables before they’re merged:
Observable.merge(
getNumber1().subscribeOn(Schedulers.io()),
getNumber2().subscribeOn(Schedulers.io())
).observeOn(AndroidSchedulers.mainThread())
...
Also you can add Thread.sleep(100)
after emitter.onNext
for better testing
Upvotes: 1