Reputation: 946
I'm new to RxJava, and I've been trying to combine multiple observables in a round-robin way.
So, imagine you have three observables:
o1: --0---1------2--
o2: -4--56----------
o3: -------8---9----
Combining those in a round-robin way would give you something like:
r : --04---815-9-26-
What would be the best way to approach this? Since RxJava, RxScala, etc. pretty much share an API, an answer in any language should be fine. :)
Thanks, Matija
Upvotes: 1
Views: 916
Reputation: 1510
There is an approach that is very simple to implement and does almost exactly what you want: just zip the three source observables, and then emit the three values from the zipped observable each time a new triplet arrives.
Translated to RxScala:
import rx.lang.scala.Observable

val o1 = Observable.just(1, 2, 3)
val o2 = Observable.just(10, 20, 30)
val o3 = Observable.just(100, 200, 300)
// zip emits one Seq per round; flatMap then replays each Seq in source order
val roundRobinSource = Observable
  .zip(Observable.just(o1, o2, o3))
  .flatMap(Observable.from[Int])
roundRobinSource.subscribe(println, println)
gives you
1
10
100
2
20
200
3
30
300
Which is precisely what you want.
The problem with this approach is that it emits nothing until a value from each of the three sources has arrived, but if you're cool with that, I think this is by far the simplest solution. I'm curious, what is your use case?
This is actually a fun question. Here is another take that trades one drawback for another.
import rx.lang.scala.{Subject, Observable}
val s1 = Subject[Int]()
val s2 = Subject[Int]()
val s3 = Subject[Int]()
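val roundRobinSource3 = s1.publish(po1 ⇒ s2.publish(po2 ⇒ s3.publish(po3 ⇒ {
  // one round takes a single value from each source, in order
  def oneRound: Observable[Int] = po1.take(1) ++ po2.take(1) ++ po3.take(1)
  // defer makes the recursion lazy, so the next round is built only when needed
  def all: Observable[Int] = oneRound ++ Observable.defer(all)
  all
})))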
roundRobinSource3.subscribe(println, println, () ⇒ println("Completed"))
println("s1.onNext(1)")
s1.onNext(1)
println("s2.onNext(10)")
s2.onNext(10)
println("s3.onNext(100)")
s3.onNext(100)
println("s2.onNext(20)")
s2.onNext(20)
println("s1.onNext(2)")
s1.onNext(2)
println("s3.onNext(200)")
s3.onNext(200)
println("s1.onCompleted()")
s1.onCompleted()
println("s2.onCompleted()")
s2.onCompleted()
println("s3.onCompleted()")
s3.onCompleted()
println("Done...")
Gives you
s1.onNext(1)
1
s2.onNext(10)
10
s3.onNext(100)
100
s2.onNext(20)
s1.onNext(2)
2
20
s3.onNext(200)
200
s1.onCompleted()
s2.onCompleted()
s3.onCompleted()
Done...
It doesn't block, it round-robins, but... it also doesn't complete :( You could make it complete in a stateful manner using a takeUntil, a Subject and doOnComplete if you need it, though.
As for the mechanism, it relies on the behavior of publish, which is somewhat mysterious to me: it keeps track of things already emitted. I was originally pointed to it by @lopar when he answered my own question, Implementing a turnstile-like operator with RxJava.
The behavior of publish is actually such a mystery to me that I have posted a question about it here: https://github.com/ReactiveX/RxJava/issues/2775. If you are curious, you can follow it.
Upvotes: 1
Reputation: 69997
RxJava doesn't have such an operator by default. The closest thing is using merge with well-paced sources, because it uses round-robin to collect values, but this property can't be relied upon. Why do you need this round-robin behavior?
The best bet is to implement this behavior manually. Here is an example without backpressure support.
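As a rough, library-independent illustration of what a manual implementation has to do, one can keep a queue per source and drain the queues in turn order. This is only a sketch in plain Java, not an RxJava operator: the class name RoundRobinBuffer and its onNext(source, value) method are hypothetical, and it ignores completion, errors and backpressure.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

// Hypothetical helper (not part of RxJava): buffers values per source and
// forwards them downstream strictly in round-robin order.
final class RoundRobinBuffer<T> {
    private final List<Queue<T>> queues = new ArrayList<>();
    private final Consumer<T> downstream;
    private int turn = 0; // index of the source whose value is expected next

    RoundRobinBuffer(int sources, Consumer<T> downstream) {
        for (int i = 0; i < sources; i++) {
            queues.add(new ArrayDeque<>());
        }
        this.downstream = downstream;
    }

    // Called whenever source number `source` produces a value.
    synchronized void onNext(int source, T value) {
        queues.get(source).add(value);
        // Emit for as long as the source whose turn it is has something buffered.
        while (!queues.get(turn).isEmpty()) {
            downstream.accept(queues.get(turn).poll());
            turn = (turn + 1) % queues.size();
        }
    }

    public static void main(String[] args) {
        RoundRobinBuffer<Integer> rr =
                new RoundRobinBuffer<>(3, v -> System.out.print(v + " "));
        // Arrival order taken from the marble diagram in the question
        // (o1 is source 0, o2 is source 1, o3 is source 2).
        rr.onNext(1, 4);
        rr.onNext(0, 0);
        rr.onNext(1, 5);
        rr.onNext(1, 6);
        rr.onNext(0, 1);
        rr.onNext(2, 8);
        rr.onNext(2, 9);
        rr.onNext(0, 2);
        // prints the values in the order 0 4 8 1 5 9 2 6
    }
}
```

Fed with the arrival order from the question's diagram, this reproduces the sequence in r. A real operator would additionally have to decide what to do when one source completes while the others still have buffered values.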
Upvotes: 3