Reputation: 43
I have run into a behavior of RxScala Observables that surprised me. Consider my example below:
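import rx.lang.scala.Observable
import scala.concurrent.duration._

object ObservablesDemo extends App {
  // Emits 0, 1, 2, ... every 3 seconds; each value is labelled n * 3.
  val oFast = Observable.interval(3.seconds).map(n => s"[FAST] ${n*3}")
  // Emits 0, 1, 2, ... every 7 seconds; each value is labelled n * 7.
  val oSlow = Observable.interval(7.seconds).map(n => s"[SLOW] ${n*7}")
  val oBoth = (oFast merge oSlow).take(8)
  oBoth.subscribe(println(_))
  // Keep the main thread alive until the merged stream completes.
  oBoth.toBlocking.toIterable.last
}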
The code emits elements from two observables. One of them emits its elements in a "slow" way (every 7 seconds), the other in a "fast" way (every 3 seconds). For the sake of the question, assume we want to define those observables using the map function, mapping the numbers from the interval as seen above (as opposed to another possible approach, which would be emitting items at the same rate from both observables and then filtering as needed).
The output of the code seems counterintuitive to me:
[FAST] 0
[FAST] 3
[SLOW] 0
[FAST] 6
[FAST] 9 <-- HERE
[SLOW] 7 <-- HERE
[FAST] 12
[FAST] 15
The problematic part is when the [FAST] observable emits 9 before the [SLOW] observable emits 7. I would expect 7 to be emitted before 9, as whatever is emitted on the seventh second should come before what is emitted on the ninth second.
How should I modify the code to achieve the intended behavior? I have looked into the RxScala documentation and have started my search with topics such as the different interval functions and the Scheduler classes, but I'm not sure if it's the right place to search for the answer.
Upvotes: 3
Views: 83
Reputation: 992
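That looks like the way it should work. The printed numbers are n times the period, not the emission time: interval emits its first value (0) only after one full period, so [SLOW] 7 is the second slow item and is emitted at second 14, after [FAST] 9 at second 12. Here is a timeline listing the seconds and the events. You can verify with TestObserver and TestScheduler if those are available in RxScala. RxScala was EOL in 2019, so keep that in mind too.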
Secs  Event
-----------------
 1
 2
 3    [FAST] 0
 4
 5
 6    [FAST] 3
 7    [SLOW] 0
 8
 9    [FAST] 6
10
11
12    [FAST] 9
13
14    [SLOW] 7
15    [FAST] 12
16
17
18    [FAST] 15
19
20
21    [FAST] 18
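
The timeline above can also be reproduced without RxScala. What follows is only a rough sketch in plain Scala, not the library's implementation: it assumes that interval(p) emits n = 0, 1, 2, ... at seconds p, 2p, 3p, ..., and the names TimelineSketch, Emission, emissions and merged are made up for illustration. It models the merge by sorting on emission time, first with the labels from the question (n * period) and then with labels equal to the emission second ((n + 1) * period), which is probably the ordering the question expects to see.

```scala
// Pure-Scala model of the merged timeline; no RxScala dependency.
// Assumption: interval(p) emits n = 0, 1, 2, ... at seconds p, 2p, 3p, ...
// (the first value arrives only after one full period).
object TimelineSketch {
  // One emission: the second it fires at and the text it prints.
  final case class Emission(second: Long, text: String)

  // Emissions of a modelled interval(period) stream up to `horizon` seconds.
  // `label` turns the counter n into the number shown in the output.
  def emissions(tag: String, period: Long, horizon: Long)(label: Long => Long): Seq[Emission] =
    (0L until horizon / period).map(n => Emission((n + 1) * period, s"[$tag] ${label(n)}"))

  // Merge the two streams by emission time and keep the first `count` items.
  def merged(count: Int)(fastLabel: Long => Long, slowLabel: Long => Long): Seq[Emission] =
    (emissions("FAST", 3, 30)(fastLabel) ++ emissions("SLOW", 7, 30)(slowLabel))
      .sortBy(_.second)
      .take(count)

  // Labels as in the question: n * period.
  val asAsked: Seq[Emission] = merged(8)(_ * 3, _ * 7)

  // Labels equal to the actual emission second: (n + 1) * period.
  val byEmissionTime: Seq[Emission] = merged(8)(n => (n + 1) * 3, n => (n + 1) * 7)

  def main(args: Array[String]): Unit = {
    asAsked.foreach(e => println(f"${e.second}%2d s  ${e.text}"))
    println("---")
    byEmissionTime.foreach(e => println(f"${e.second}%2d s  ${e.text}"))
  }
}
```

In the original code the second variant would correspond to mapping with something like n => s"[FAST] ${(n + 1) * 3}" (and (n + 1) * 7 for the slow one), so that each label is the second at which the item is actually emitted. The emission order itself does not change, only the numbers printed.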
Upvotes: 2