Matija Folnovic

Reputation: 946

Round-robin combining observables

I'm new to RxJava, and I've been trying to combine multiple observables in a round-robin way.

So, imagine you have three observables:

o1: --0---1------2--
o2: -4--56----------
o3: -------8---9----

Combining those in a round-robin way would give you something like:

r : --04---815-9-26-

What would be the best way to approach this? Since RxJava, RxScala etc. share pretty much the same API, an answer in any language should be fine. :)

Thanks, Matija

Upvotes: 1

Views: 916

Answers (2)

Tomáš Dvořák

Reputation: 1510

There is an approach that is very simple to implement and does almost exactly what you want: just zip the three source observables, and then emit the three values from the zipped observable each time a new triplet arrives.

Translated to RxScala

import rx.lang.scala.Observable

val o1 = Observable.just(1, 2, 3)
val o2 = Observable.just(10, 20, 30)
val o3 = Observable.just(100, 200, 300)

val roundRobinSource = Observable
    .zip(Observable.just(o1, o2, o3))
    .flatMap(Observable.from[Int])

roundRobinSource.subscribe(println, println)

gives you

1
10
100
2
20
200
3
30
300

Which is precisely what you want.

The problem with this approach is that it emits nothing until a value from each of the three sources has arrived, but if you're cool with that, I think this is by far the simplest solution. I'm curious, what is your use case?
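One more thing worth keeping in mind: zip only produces complete triplets, so once the shortest source completes, leftover values from the longer sources are never emitted. Here is a rough model of that ordering in plain Java, using lists instead of observables (no RxJava involved; the class and method names are made up for illustration). It is only a sketch of the zip-then-flatten idea, fed with the values from the question's diagram.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: models "zip the sources, then flatten each tuple"
// on plain lists. Not an RxJava API.
final class ZipRoundRobinModel {

    static <T> List<T> zipThenFlatten(List<List<T>> sources) {
        // zip can only form as many tuples as the shortest source has values
        int rounds = sources.isEmpty() ? 0 : Integer.MAX_VALUE;
        for (List<T> source : sources) {
            rounds = Math.min(rounds, source.size());
        }
        List<T> out = new ArrayList<>();
        for (int r = 0; r < rounds; r++) {
            // flatten one tuple: one value from each source, in source order
            for (List<T> source : sources) {
                out.add(source.get(r));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<List<Integer>> sources = List.of(
                List.of(0, 1, 2),   // o1
                List.of(4, 5, 6),   // o2
                List.of(8, 9));     // o3 (shortest)
        // prints [0, 4, 8, 1, 5, 9]; the trailing 2 and 6 are dropped
        System.out.println(zipThenFlatten(sources));
    }
}
```

With the question's data, the trailing 2 and 6 never come out, so this only matches the desired output when all sources emit the same number of values.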

Update, Take #2

This is actually a fun question. Here is another take that trades one drawback for another.

import rx.lang.scala.{Subject, Observable}

val s1 = Subject[Int]()
val s2 = Subject[Int]()
val s3 = Subject[Int]()

val roundRobinSource3 = s1.publish(po1 ⇒ s2.publish(po2 ⇒ s3.publish(po3 ⇒ {
  def oneRound: Observable[Int] = po1.take(1) ++ po2.take(1) ++ po3.take(1)
  def all: Observable[Int] = oneRound ++ Observable.defer(all)
  all
})))

roundRobinSource3.subscribe(println, println, () ⇒ println("Completed"))

println("s1.onNext(1)")
s1.onNext(1)
println("s2.onNext(10)")
s2.onNext(10)
println("s3.onNext(100)")
s3.onNext(100)
println("s2.onNext(20)")
s2.onNext(20)
println("s1.onNext(2)")
s1.onNext(2)
println("s3.onNext(200)")
s3.onNext(200)
println("s1.onCompleted()")
s1.onCompleted()
println("s2.onCompleted()")
s2.onCompleted()
println("s3.onCompleted()")
s3.onCompleted()
println("Done...")

Gives you

s1.onNext(1)
1
s2.onNext(10)
10
s3.onNext(100)
100
s2.onNext(20)
s1.onNext(2)
2
20
s3.onNext(200)
200
s1.onCompleted()
s2.onCompleted()
s3.onCompleted()
Done...

It doesn't block, it round-robins, but... it also doesn't complete :( If you need it to complete, you could do that in a stateful manner using takeUntil, a Subject and doOnCompleted, though.

As for the mechanism, it relies on a behavior of publish that is still somewhat mysterious to me: it keeps track of things already emitted. I was originally pointed to it by @lopar when he answered my own question, Implementing a turnstile-like operator with RxJava.

The behavior of publish is actually such a mystery to me that I have posted a question about it here: https://github.com/ReactiveX/RxJava/issues/2775. If you are curious, you can follow it.

Upvotes: 1

akarnokd

Reputation: 69997

RxJava doesn't have such an operator by default. The closest thing is using merge with well-paced sources, because it uses round-robin to collect values, but this property can't be relied upon. Why do you need this round-robin behavior?

The best bet is to implement this behavior manually. Here is an example without backpressure support.
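For illustration only, here is a minimal sketch of what a manual round-robin could look like, written in plain Java without any RxJava types. It is not the example referred to above; the class RoundRobinMerger and its methods are hypothetical names. The idea is to buffer values per source and only let the source whose turn it is emit, skipping sources that have completed and have nothing buffered. It assumes all calls come from a single thread and has no backpressure or error handling.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch: single-threaded, no backpressure, no error handling.
final class RoundRobinMerger<T> {
    private final List<Deque<T>> queues = new ArrayList<>();
    private final boolean[] completed;
    private final Consumer<T> downstream;
    private int turn = 0; // index of the source allowed to emit next

    RoundRobinMerger(int sourceCount, Consumer<T> downstream) {
        for (int i = 0; i < sourceCount; i++) {
            queues.add(new ArrayDeque<>());
        }
        this.completed = new boolean[sourceCount];
        this.downstream = downstream;
    }

    // Called when source number `source` produces a value.
    void onNext(int source, T value) {
        queues.get(source).add(value);
        drain();
    }

    // Called when source number `source` will produce no more values.
    void onCompleted(int source) {
        completed[source] = true;
        drain();
    }

    private void drain() {
        int skipped = 0; // consecutive finished-and-empty sources passed over
        while (skipped < queues.size()) {
            Deque<T> queue = queues.get(turn);
            if (!queue.isEmpty()) {
                downstream.accept(queue.poll());
                turn = (turn + 1) % queues.size();
                skipped = 0;
            } else if (completed[turn]) {
                turn = (turn + 1) % queues.size(); // nothing will ever come, skip
                skipped++;
            } else {
                return; // wait for the source whose turn it is
            }
        }
    }

    public static void main(String[] args) {
        List<Integer> out = new ArrayList<>();
        RoundRobinMerger<Integer> merger = new RoundRobinMerger<>(3, out::add);
        // Events in the time order of the question's marble diagram
        merger.onNext(1, 4);
        merger.onNext(0, 0);
        merger.onNext(1, 5);
        merger.onNext(1, 6);
        merger.onNext(0, 1);
        merger.onNext(2, 8);
        merger.onNext(2, 9);
        merger.onNext(0, 2);
        System.out.println(out); // prints [0, 4, 8, 1, 5, 9, 2, 6]
    }
}
```

Fed with the events from the question's diagram, this yields 0, 4, 8, 1, 5, 9, 2, 6, which matches the r line there. Hooking it up to real observables would mean subscribing to each source and forwarding its events with the matching index, plus proper synchronization.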

Upvotes: 3
