Aarjav
Aarjav

Reputation: 1374

Proper handling of Backpressure and Concurrency with RxJava2

I have a service with which I have registered a call back, and now I want to expose it as Flowable with certain requirements/limitations:

  1. Thread receiving callback should not be blocked (work should be handed off to a different thread/scheduler specified by the observer)
  2. There should not be any exceptions thrown due to consumers being slow down stream
  3. Multiple consumers can subscribe to it independent of each other
  4. consumers can choose to buffer all the items so that none of them are lost, however they should not be buffered in the 'producer' class

Below is what I have currently

class MyBroadcaster {
    private PublishProcessor<Packet> packets = PublishProcessor.create();
    private Flowable<Packet> backpressuredPackets = packets.onBackpressureLatest();

    public MyBroadcaster() {
       //this is actually different to my exact use but same conceptually
       registerCallback(packets::onNext);
    }

    public Flowable<Packet> observeAllPacketsOn(Scheduler scheduler) {
        return backpressuredPackets.observeOn(scheduler);
    }
}

I'm not sure if this actually fits my requirements. There's a note on the onBackpressureLatest javadoc regarding observeOn that I don't understand:

Note that due to the nature of how backpressure requests are propagated through subscribeOn/observeOn, requesting more than 1 from downstream doesn't guarantee a continuous delivery of onNext events

And I have other questions:

Upvotes: 0

Views: 353

Answers (1)

akarnokd
akarnokd

Reputation: 69997

I'm not sure if this actually fits my requirements.

It does not. Apply either onBackpressureLatest or onBackpressureBuffer followed by observeOn in the observeSomePacketsOn and observeAllPacketsOn respectively.

Does the onBackpressureLatest call make it so that the items are no longer multicasted?

The multicasting is done by PublishProcessor and different subscribers will establish a channel to it independently where the onBackpressureXXX and observeOn operators take effect on an individual subscriber basis.

How can I test my requirements?

Subscribe through the lossy or lossless Flowable with a TestSubscriber (Flowable.test()), feed a known set of Packets into packets and see if all of them arrived either via TestSubscriber.assertValueCount() or TestSubscriber.values(). The lossy one should be 1 .. N and the lossless one should have N values after a grace period.

Bonus: If I have multiple such publishers (in same class or elsewhere) , what is the best way to make the same pattern reusable. Create my own Flowable with delegation/extra methods?

You could turn the observeAllPacketsOn into a FlowableTransformer and instead of a method call on MyBroadcaster, use compose, for example:

class MyTransformers {
    public static FlowableTransformer<T, T> lossyObserveOn(Scheduler s) {
        return f -> f.onBackpressureLatest().observeOn(s);
    }
}

new MyBroadcaster().getPacketFlow()
.compose(MyTransformers.lossyObserveOn(scheduler))
.subscribe(/* ... */);

Upvotes: 2

Related Questions