Reputation: 21961
Imagine a pipe of subscribers: you emit an event into it, and the event visits one subscriber after another.
Say I have a PublishSubject and x subscribers (observers). Normally, events are emitted to the observers in a specific order, but effectively simultaneously, regardless of when each observer finishes. Is it possible to get the pipe-like flow instead, where each observer receives the event only after the previous one has finished with it?
I'm using the RxScala and Monifu Rx implementations.
Monifu even has a back-pressure implementation, where onNext returns a Future:
def onNext(elem: T): Future[Ack]
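To illustrate what that signature implies, here is a minimal sketch using only the Scala standard library. The Ack, Continue, Cancel and SimpleObserver definitions below are simplified stand-ins written for this example, not Monifu's actual classes, and feed and demo are hypothetical helper names. The producer sends the next element only after the previous Future[Ack] has completed:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

object BackPressureSketch {
  // Stand-ins for the acknowledgement values (assumption: not Monifu's own types).
  sealed trait Ack
  case object Continue extends Ack
  case object Cancel extends Ack

  // Simplified observer: the returned Future completes when the observer
  // is ready for the next element.
  trait SimpleObserver[T] {
    def onNext(elem: T): Future[Ack]
  }

  // Sends elements one at a time, waiting for each acknowledgement before
  // sending the next. Returns how many elements were delivered.
  def feed[T](elems: List[T], observer: SimpleObserver[T], sent: Int = 0): Future[Int] =
    elems match {
      case Nil => Future.successful(sent)
      case head :: tail =>
        observer.onNext(head).flatMap {
          case Continue => feed(tail, observer, sent + 1)
          case Cancel   => Future.successful(sent + 1) // stop after this element
        }
    }

  // The observer accepts elements below 3 and cancels when it sees 3.
  def demo(): (Int, List[Int]) = {
    val seen = scala.collection.mutable.ListBuffer.empty[Int]
    val observer = new SimpleObserver[Int] {
      def onNext(elem: Int): Future[Ack] = Future {
        seen += elem
        if (elem < 3) Continue else Cancel
      }
    }
    val delivered = Await.result(feed(List(1, 2, 3, 4, 5), observer), 5.seconds)
    (delivered, seen.toList)
  }
}
```

In this sketch, elements 4 and 5 are never sent, because the observer answers Cancel for 3.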
I'd like to see "And Result was : Changed !!" be printed out in this sample:
val subject = PublishSubject[Int]()
var result = "Not Changed"

subject.subscribe { i =>
  Observable.timerOneTime(3.seconds, Continue).asFuture.map { x =>
    result = "Changed !!"
    x.get
  }
}

subject.subscribe { i =>
  Observable.timerOneTime(1.seconds, Continue).asFuture.map { x =>
    println("And Result was : " + result)
    x.get
  }
}

subject.onNext(1)
Is this possible in RxScala/RxJava or Monifu without extending Subject and overriding its onNext implementation? These classes are declared final anyway, so that would be rather hacky.
Upvotes: 2
Views: 325
Reputation: 21961
I think the answer is a custom Subject implementation. Something like the following in Monifu would feed the observers one after another, chaining their onNext results with flatMap (ignoring the fact that PublishSubject is a final class):
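class PipeSubject extends PublishSubject[RxEvent] {
  // Feeds the event through the observers one by one instead of
  // handing it to all of them at once.
  override def onNext(elem: RxEvent): Future[Ack] = {
    if (!isCompleted) {
      val observers = subscriptions
      if (observers.nonEmpty)
        pipeThroughMany(observers, elem)
      else
        Continue
    }
    else
      Cancel
  }

  private[this] def pipeThroughMany(array: Array[Observer[RxEvent]], elem: RxEvent): Future[Continue] = {
    val length = array.length

    // Calls onNext on the observer at idx and moves on to idx + 1
    // only once that observer's Future[Ack] has completed.
    def >>>(idx: Int = 0): Future[Continue] =
      if (idx >= length)
        Continue
      else {
        val obs = array(idx)
        obs.onNext(elem).flatMap {
          case Continue =>
            >>>(idx + 1)
          case _ =>
            // This observer cancelled: drop it, but keep feeding the rest.
            removeSubscription(obs)
            >>>(idx + 1)
        }
      }

    >>>()
  }
}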
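For anyone who wants to try the idea without touching Monifu internals, here is a rough, library-free sketch of the same chaining with plain scala.concurrent.Future. The Subscriber type, pipeThrough and run are names made up for this illustration, and Thread.sleep stands in for the timers in the question (shortened to milliseconds):

```scala
import java.util.concurrent.atomic.AtomicReference
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

object PipeSketch {
  // Hypothetical subscriber type: handles an event asynchronously.
  type Subscriber[T] = T => Future[Unit]

  // Delivers elem to each subscriber in order. A subscriber is invoked only
  // after the Future returned by the previous subscriber has completed.
  def pipeThrough[T](subscribers: Seq[Subscriber[T]], elem: T): Future[Unit] =
    subscribers.foldLeft(Future.successful(())) { (previous, subscriber) =>
      previous.flatMap(_ => subscriber(elem))
    }

  // Mirrors the sample from the question: a slow first subscriber changes
  // the state, and a faster second subscriber reads it afterwards.
  def run(): String = {
    val result = new AtomicReference("Not Changed")
    val observed = new AtomicReference("")

    val slow: Subscriber[Int] = _ => Future {
      Thread.sleep(300) // blocking sleep as a stand-in for a timer
      result.set("Changed !!")
    }
    val fast: Subscriber[Int] = _ => Future {
      Thread.sleep(100)
      observed.set("And Result was : " + result.get)
    }

    Await.result(pipeThrough(Seq(slow, fast), 1), 5.seconds)
    observed.get
  }
}
```

Because the second subscriber only starts once the first one's Future has completed, PipeSketch.run() returns "And Result was : Changed !!" even though the second subscriber's own delay is shorter.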
Upvotes: 1