Reputation: 1044
I have a BehaviorSubject with three listeners, all subscribed before any emissions. I call .onNext() with two values: A followed by B.
Two of the listeners appropriately receive A and then B. But the third listener gets B, A. What could possibly explain this behavior? This is all on the same thread.
Here is some sample code (in Kotlin) that reproduces the results. Let me know if you need a Java version:
@Test
fun `rxjava test`() {
    val eventHistory1 = ArrayList<String>()
    val eventHistory2 = ArrayList<String>()
    val eventHistory3 = ArrayList<String>()
    val behaviorSubject = BehaviorSubject.create<String>()
    behaviorSubject.subscribe {
        eventHistory1.add(it)
    }
    behaviorSubject.subscribe {
        eventHistory2.add(it)
        if (it == "A") behaviorSubject.onNext("B")
    }
    behaviorSubject.subscribe {
        eventHistory3.add(it)
    }
    behaviorSubject.onNext("A")
    println(eventHistory1)
    println(eventHistory2)
    println(eventHistory3)
    assert(eventHistory1 == eventHistory2)
    assert(eventHistory2 == eventHistory3)
}
And here is the output from the test:
[A, B]
[A, B]
[B, A]
Upvotes: 0
Views: 216
Reputation: 70007
Subjects are not re-entrant, so calling onNext on a subject that is still in the middle of delivering an earlier onNext is undefined behavior. The Javadoc warns about this case:
Calling onNext(Object), onError(Throwable) and onComplete() is required to be serialized (called from the same thread or called non-overlappingly from different threads through external means of serialization). The Subject.toSerialized() method available to all Subjects provides such serialization and also protects against reentrance (i.e., when a downstream Observer consuming this subject also wants to call onNext(Object) on this subject recursively).
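In your particular case, the second observer calls onNext("B") while the subject is still delivering "A" to its observers. That nested call delivers "B" to all three observers, including the 3rd, before the outer call gets around to giving the 3rd observer its "A", hence the swapped order.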
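To make the ordering concrete, here is a minimal sketch that reproduces the effect without RxJava. NaiveSubject and runNaive are hypothetical names made up for this illustration; the class is a simplified stand-in, not how BehaviorSubject is actually implemented.

```kotlin
// Hypothetical stand-in for a subject; NOT RxJava's implementation.
class NaiveSubject<T> {
    private val observers = mutableListOf<(T) -> Unit>()

    fun subscribe(observer: (T) -> Unit) {
        observers.add(observer)
    }

    // Delivers the value to each observer in subscription order.
    // A nested onNext call made from inside an observer runs to
    // completion before this loop moves on to the next observer.
    fun onNext(value: T) {
        for (observer in observers.toList()) observer(value)
    }
}

// Mirrors the test from the question and returns the three histories.
fun runNaive(): List<List<String>> {
    val histories = List(3) { mutableListOf<String>() }
    val subject = NaiveSubject<String>()
    subject.subscribe { histories[0].add(it) }
    subject.subscribe {
        histories[1].add(it)
        if (it == "A") subject.onNext("B") // re-entrant emission
    }
    subject.subscribe { histories[2].add(it) }
    subject.onNext("A")
    return histories
}
```

Calling runNaive() yields [[A, B], [A, B], [B, A]]: the nested call hands "B" to the third observer before the outer loop reaches it with "A".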
Use toSerialized() on the subject to make sure this doesn't happen:
val behaviorSubject = BehaviorSubject.create<String>().toSerialized()
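For intuition about what protection against reentrance can look like, here is a rough sketch of one common approach: queue any emission that arrives while another is being delivered, and let the outermost call drain the queue. QueueingSubject and runQueued are hypothetical names for this illustration, and the code only shows the general idea; it is not RxJava's source and, unlike toSerialized(), it makes no attempt at thread safety.

```kotlin
// Hypothetical sketch of reentrance protection; NOT RxJava's code
// and not thread-safe.
class QueueingSubject<T> {
    private val observers = mutableListOf<(T) -> Unit>()
    private val pending = ArrayDeque<T>()
    private var emitting = false

    fun subscribe(observer: (T) -> Unit) {
        observers.add(observer)
    }

    fun onNext(value: T) {
        pending.addLast(value)
        if (emitting) return // an outer call is already draining the queue
        emitting = true
        try {
            // Deliver each queued value to every observer before
            // starting on the next value.
            while (pending.isNotEmpty()) {
                val next = pending.removeFirst()
                for (observer in observers.toList()) observer(next)
            }
        } finally {
            emitting = false
        }
    }
}

// Same scenario as the question, using the queueing subject.
fun runQueued(): List<List<String>> {
    val histories = List(3) { mutableListOf<String>() }
    val subject = QueueingSubject<String>()
    subject.subscribe { histories[0].add(it) }
    subject.subscribe {
        histories[1].add(it)
        if (it == "A") subject.onNext("B") // queued, not delivered yet
    }
    subject.subscribe { histories[2].add(it) }
    subject.onNext("A")
    return histories
}
```

With this version runQueued() yields [[A, B], [A, B], [A, B]], because "B" is only delivered after every observer has seen "A".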
Upvotes: 2