Peter K.
Peter K.

Reputation: 587

RxJava : Consumers of Hot Observables

I am trying to get to grip with the intricacies of RxJava, but hit a newbie problem:

I am trying to create a hot observable from a cold one, and subscribe two consumers which process pushed events at different speeds. Here is a code snippet:

ConnectableObservable<Long> ob = Observable.interval(200, TimeUnit.MILLISECONDS)
    .publish();
ob.connect();    

Consumer<Long> withSleep = (Long t) -> {
    System.out.println("Second : " + t);
    sleep(1);
};

Consumer<Long> noSleep = (Long t) -> {
    System.out.println("First : " + t);
};

sleep(2);

ob.observeOn(Schedulers.newThread()).subscribe(noSleep);

ob.observeOn(Schedulers.newThread()).subscribe(withSleep);

sleep(5);

The sleep(2) is just to see whether the observable already started firing. And indeed, this prints initially as expected.

  1. First : 10
  2. Second : 10
  3. First : 11
  4. First : 12
  5. First : 13
  6. First : 14
  7. Second : 11
  8. First : 15
  9. First : 16
  10. First : 17

But then the second consumer (the one with the longer processing time, simulated by the 1 second sleep), picks up the event in sequence (output line 7), rather than the current event (no. 14) as I would expect from a hot observable. Isn't the idea of a hot observable to just keep firing, irrespective of subscribers, and subscribers to pick up whatever is pushed at the very moment (assuming no specific, explicit backpressure strategy)?

What would I need to change to have the second consumer simply pick up whatever is produced at the moment (i.e. displaying 14 instead of 11 in the example above)?

Any help would be greatly appreciated.

Upvotes: 0

Views: 147

Answers (1)

akarnokd
akarnokd

Reputation: 69997

Hot or cold, operators such as publish and observeOn stay continuous once subscribed, thus you'll get all events even if processing takes longer than the emission rate.

To avoid processing old entries in the second case, you have to chain operators that drop events and don't buffer too much either:

ob.toFlowable(BackpressureStrategy.DROP)
  .delay(0, TimeUnit.MILLISECONDS, Schedulers.newThread())
  .rebatchRequests(1)
  .subscribe(withSleep);

or

ob.toFlowable(BackpressureStrategy.LATEST)
  .delay(0, TimeUnit.MILLISECONDS, Schedulers.newThread())
  .rebatchRequests(1)
  .subscribe(withSleep);

Upvotes: 2

Related Questions