lisak
lisak

Reputation: 21961

Subject that emits events to subscribers in specific order with back-pressure

Imagine a pipe of subscribers that you emit event to and it visits one subscriber after another.

Having a PublishSubject and x subscribers/observables. Normally events are emitted to observers in a specific order but simultaneously regardless of when observers return. Is it possible to do this flow :

  1. emit event to observerA
  2. after osbserverA returns, emit the event to observerB
  3. after observerB returns, emit the event to observerC

I'm using RxScala and Monifu Rx implementations

Monifu even has a back-pressure implementation :

def onNext(elem: T): Future[Ack]

I'd like to see "And Result was : Changed !!" be printed out in this sample:

  val subject = PublishSubject[Int]()

  var result = "Not Changed"
  subject.subscribe { i =>
    Observable.timerOneTime(3.seconds, Continue).asFuture.map { x =>
      result = "Changed !!"
      x.get
    }
  }

  subject.subscribe { i =>
    Observable.timerOneTime(1.seconds, Continue).asFuture.map { x =>
      println("And Result was : " + result)
      x.get
    }
  }

  subject.onNext(1)

Is it possible in RxScala/RxJava or Monifu without extending Subject and overriding onNext implementation? These classes are declared final anyway so it would be rather hacking.

Upvotes: 2

Views: 325

Answers (1)

lisak
lisak

Reputation: 21961

I think the answer is a custom Subject implementation, something like this in Monifu that would feed the observers in a flatMap manner (ignoring the fact that PublishSubject is a final class) :

class PipeSubject extends PublishSubject[RxEvent] {
  override def onNext(elem: RxEvent): Future[Ack] = {
    if (!isCompleted) {
      val observers = subscriptions
      if (observers.nonEmpty)
        pipeThroughMany(observers, elem)
      else
        Continue
    }
    else
      Cancel
  }

 private[this] def pipeThroughMany(array: Array[Observer[T]], elem: T): Future[Continue] = {
    val length = array.length
    def >>>(idx: Int = 0): Future[Continue] = {
      val obs = array(idx)
      obs.onNext(elem).flatMap {
         case Continue =>
           if (idx+1 < length)
              >>>(idx+1)
           else
             Continue
         case _ =>
           removeSubscription(obs)
           Continue
      }
    }
    >>>()
  }
}

Upvotes: 1

Related Questions