Reputation: 15345
I'm trying to grasp the difference between hot and cold observables and am trying out the Monifu library. My understanding is that the following code should result in only one of the subscribers getting the events emitted by the Observable, but that is not what happens!
scala> :paste
// Entering paste mode (ctrl-D to finish)
import monifu.reactive._
import scala.concurrent.duration._
import monifu.concurrent.Implicits.globalScheduler
val obs = Observable.interval(1.second).take(10)
val x = obs.foreach(a => println(s"from x ${a}"))
val y = obs.foreach(a => println(s"from y ${a}"))
// Exiting paste mode, now interpreting.
from x 0
from y 0
import monifu.reactive._
import scala.concurrent.duration._
import monifu.concurrent.Implicits.globalScheduler
obs: monifu.reactive.Observable[Long] = monifu.reactive.Observable$$anon$5@2c3c615d
x: Unit = ()
y: Unit = ()
scala> from x 1
from y 1
from x 2
from y 2
from x 3
from y 3
from x 4
from y 4
from x 5
from y 5
from x 6
from y 6
from x 7
from y 7
from x 8
from y 8
from x 9
from y 9
So, to me, this looks like the Observable is publishing events to all interested subscribers?
Upvotes: 2
Views: 376
Reputation: 8069
I'm the primary author of Monifu.
A cold observable is one whose subscribe function initiates a new data source for each subscriber (on each subscribe() call), whereas a hot observable shares the same data source between multiple subscribers.
As an example, consider a file to be the data source. Let's model a simple Observable that emits the lines from a file:
import java.io.File
import scala.util.control.NonFatal
import monifu.reactive._

def fromFile(file: File): Observable[String] = {
  // this is the subscribe function that
  // we are passing to create ;-)
  Observable.create { subscriber =>
    // executing things on our thread-pool
    subscriber.scheduler.execute {
      val source = try {
        Observable.fromIterable(scala.io.Source
          .fromFile(file).getLines().toIterable)
      }
      catch {
        // subscribe functions must be protected
        case NonFatal(ex) =>
          Observable.error(ex)
      }
      source.unsafeSubscribe(subscriber)
    }
  }
}
This function creates a cold observable: it opens a new file handle for each subscribed observer and then reads and emits the lines to that observer.
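To make the "new data source per subscriber" behaviour concrete without depending on any particular library version, here is a minimal sketch in plain Scala. `ToyCold` and `ColdDemo` are hypothetical names invented for this illustration and are not part of Monifu; the in-memory iterator stands in for the file. The same reasoning would be consistent with the question's output, where both `foreach` calls see every tick.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical toy type for illustration only; this is NOT Monifu's Observable.
final class ToyCold[A](openSource: () => Iterator[A]) {
  // Every subscribe() opens a fresh source and replays it to that one subscriber.
  def subscribe(onNext: A => Unit): Unit = openSource().foreach(onNext)
}

object ColdDemo {
  var opened = 0 // counts how many times the data source was started
  val seenByX = ListBuffer.empty[String]
  val seenByY = ListBuffer.empty[String]

  // The iterator is an assumed stand-in for reading lines from a file.
  val cold = new ToyCold[String](() => {
    opened += 1
    Iterator("line 1", "line 2", "line 3")
  })

  def run(): Unit = {
    cold.subscribe(line => seenByX += line)
    cold.subscribe(line => seenByY += line)
  }
}
```

After `ColdDemo.run()`, the source has been started once per subscriber, and each subscriber has received the full sequence independently.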
But we can turn it into a hot observable:
// NOTE: publish() turns a cold observable into a hot one
val hotObservable = fromFile(file).publish()
And then the difference will be when you do this:
val x = observable.subscribe()
val y = observable.subscribe()
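If the observable is hot:
- nothing is emitted until you call connect() on it
- after connect(), the same file is opened and both will receive exactly the same events

If the observable is cold:
- each subscriber gets its own file handle, as described above
- events are emitted right after subscribe(), so no need to wait for a connect()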
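The hot case can be sketched the same way in plain Scala. `ToyHot` and `HotDemo` are hypothetical names made up for this illustration and do not reflect Monifu's actual implementation of publish() and connect(); the sketch only assumes the behaviour described here, namely that subscribing registers a listener and connect() starts the single shared source.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical toy type for illustration only; this is NOT Monifu's API.
final class ToyHot[A](openSource: () => Iterator[A]) {
  private val subscribers = ListBuffer.empty[A => Unit]

  // Subscribing only registers the callback; nothing is emitted yet.
  def subscribe(onNext: A => Unit): Unit = {
    subscribers += onNext
    ()
  }

  // connect() opens the source once and fans each element out to every subscriber.
  def connect(): Unit =
    openSource().foreach(a => subscribers.foreach(onNext => onNext(a)))
}

object HotDemo {
  var opened = 0 // counts how many times the data source was started
  val seenByX = ListBuffer.empty[String]
  val seenByY = ListBuffer.empty[String]

  // The iterator is an assumed stand-in for reading lines from a file.
  val hot = new ToyHot[String](() => {
    opened += 1
    Iterator("line 1", "line 2")
  })

  def run(): Unit = {
    hot.subscribe(line => seenByX += line)
    hot.subscribe(line => seenByY += line)
    assert(seenByX.isEmpty && seenByY.isEmpty) // nothing emitted before connect()
    hot.connect()
  }
}
```

After `HotDemo.run()`, the source has been started exactly once and both subscribers hold the same elements, in contrast to a cold source, which would be started once per subscriber.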
Upvotes: 1