Reputation: 1374
I have a service with which I have registered a callback, and now I want to expose it as a Flowable
with certain requirements/limitations:
Below is what I currently have:
class MyBroadcaster {
    private PublishProcessor<Packet> packets = PublishProcessor.create();
    private Flowable<Packet> backpressuredPackets = packets.onBackpressureLatest();

    public MyBroadcaster() {
        // this is actually different to my exact use but same conceptually
        registerCallback(packets::onNext);
    }

    public Flowable<Packet> observeAllPacketsOn(Scheduler scheduler) {
        return backpressuredPackets.observeOn(scheduler);
    }
}
I'm not sure if this actually fits my requirements. There's a note in the onBackpressureLatest javadoc regarding observeOn that I don't understand:
Note that due to the nature of how backpressure requests are propagated through subscribeOn/observeOn, requesting more than 1 from downstream doesn't guarantee a continuous delivery of onNext events
And I have other questions:
Does the onBackpressureLatest call make it so that the items are no longer multicasted?
Upvotes: 0
Views: 353
Reputation: 69997
I'm not sure if this actually fits my requirements.
It does not. Apply onBackpressureLatest (lossy) or onBackpressureBuffer (lossless), each followed by observeOn, in observeSomePacketsOn and observeAllPacketsOn, respectively.
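A minimal sketch of what that could look like, assuming RxJava 2 package names. The Packet class and the onPacket method are hypothetical placeholders for the real packet type and the registered service callback, which the question does not show:

```java
import io.reactivex.Flowable;
import io.reactivex.Scheduler;
import io.reactivex.processors.PublishProcessor;

// Placeholder for the question's Packet type (assumption).
class Packet {
    final int id;
    Packet(int id) { this.id = id; }
}

class MyBroadcaster {
    private final PublishProcessor<Packet> packets = PublishProcessor.create();

    // Hypothetical stand-in for the callback registered with the service.
    void onPacket(Packet packet) {
        packets.onNext(packet);
    }

    // Lossy: a slow subscriber skips ahead to the most recent packet.
    public Flowable<Packet> observeSomePacketsOn(Scheduler scheduler) {
        return packets.onBackpressureLatest().observeOn(scheduler);
    }

    // Lossless: packets are kept in an unbounded buffer until consumed.
    public Flowable<Packet> observeAllPacketsOn(Scheduler scheduler) {
        return packets.onBackpressureBuffer().observeOn(scheduler);
    }
}
```

The backpressure operator is applied inside each method rather than once in a shared field, so every subscriber gets its own lossy or lossless chain. Note that the no-argument onBackpressureBuffer() is unbounded, so a subscriber that never catches up will keep growing its buffer.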
Does the onBackpressureLatest call make it so that the items are no longer multicasted?
The multicasting is done by PublishProcessor; different subscribers each establish a channel to it independently, and the onBackpressureXXX and observeOn operators take effect on an individual-subscriber basis.
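As a rough illustration (RxJava 2 package names assumed; MulticastDemo is a made-up name), two subscribers can attach to the same PublishProcessor through their own onBackpressureLatest. One requests everything up front, the other requests nothing until after three items have been emitted:

```java
import io.reactivex.processors.PublishProcessor;
import io.reactivex.subscribers.TestSubscriber;
import java.util.Arrays;
import java.util.List;

class MulticastDemo {
    // Returns the values seen by the eager subscriber and by the late-requesting one.
    static List<List<Integer>> run() {
        PublishProcessor<Integer> source = PublishProcessor.create();

        // Requests Long.MAX_VALUE immediately, so nothing is dropped for it.
        TestSubscriber<Integer> eager = source.onBackpressureLatest().test();
        // Requests nothing at first, so its own operator keeps only the latest item.
        TestSubscriber<Integer> lazy = source.onBackpressureLatest().test(0L);

        source.onNext(1);
        source.onNext(2);
        source.onNext(3);

        // The late request receives whatever is latest at that moment.
        lazy.request(1);

        return Arrays.asList(eager.values(), lazy.values());
    }
}
```

Here the eager subscriber sees 1, 2, 3 while the lazy one sees only 3: both are fed by the same processor, and the dropping happens per subscriber.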
How can I test my requirements?
Subscribe through the lossy or lossless Flowable with a TestSubscriber (Flowable.test()), feed a known set of N Packets into packets, and check how many of them arrived via either TestSubscriber.assertValueCount() or TestSubscriber.values(). After a grace period, the lossy one should have between 1 and N values and the lossless one should have exactly N values.
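A sketch of such a check, assuming RxJava 2 and using Integer in place of Packet. Instead of sleeping for a grace period, this version completes the processor and waits for the terminal event with awaitDone. BackpressureCheck is a hypothetical name:

```java
import io.reactivex.processors.PublishProcessor;
import io.reactivex.schedulers.Schedulers;
import io.reactivex.subscribers.TestSubscriber;
import java.util.concurrent.TimeUnit;

class BackpressureCheck {
    // Returns { number of values seen losslessly, number of values seen lossily }.
    static int[] run(int n) {
        PublishProcessor<Integer> packets = PublishProcessor.create();

        TestSubscriber<Integer> lossless = packets
                .onBackpressureBuffer()
                .observeOn(Schedulers.computation())
                .test();
        TestSubscriber<Integer> lossy = packets
                .onBackpressureLatest()
                .observeOn(Schedulers.computation())
                .test();

        // Feed a known set of items, then signal the end of the stream.
        for (int i = 0; i < n; i++) {
            packets.onNext(i);
        }
        packets.onComplete();

        // Wait (up to 5 seconds) for each subscriber to receive the terminal event.
        lossless.awaitDone(5, TimeUnit.SECONDS).assertValueCount(n);
        lossy.awaitDone(5, TimeUnit.SECONDS);

        return new int[] { lossless.values().size(), lossy.values().size() };
    }
}
```

How many items the lossy subscriber ends up with depends on timing, so only the range 1 to N can be asserted for it.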
Bonus: If I have multiple such publishers (in the same class or elsewhere), what is the best way to make the same pattern reusable? Create my own Flowable with delegation/extra methods?
You could turn observeAllPacketsOn into a FlowableTransformer and, instead of a method call on MyBroadcaster, use compose, for example:
class MyTransformers {
    // The type parameter <T> has to be declared on the static method itself
    public static <T> FlowableTransformer<T, T> lossyObserveOn(Scheduler s) {
        return f -> f.onBackpressureLatest().observeOn(s);
    }
}

new MyBroadcaster().getPacketFlow()
    .compose(MyTransformers.lossyObserveOn(scheduler))
    .subscribe(/* ... */);
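The lossless variant could be packaged the same way. The following is a self-contained sketch under the assumption of RxJava 2 package names; losslessObserveOn is a hypothetical name, not something from the library:

```java
import io.reactivex.Flowable;
import io.reactivex.FlowableTransformer;
import io.reactivex.Scheduler;

class MyTransformers {
    // Lossy: a slow consumer only sees the most recent item.
    public static <T> FlowableTransformer<T, T> lossyObserveOn(Scheduler s) {
        return f -> f.onBackpressureLatest().observeOn(s);
    }

    // Lossless (hypothetical counterpart): items wait in an unbounded buffer.
    public static <T> FlowableTransformer<T, T> losslessObserveOn(Scheduler s) {
        return f -> f.onBackpressureBuffer().observeOn(s);
    }
}
```

Any Flowable can then opt into either behavior at the call site, for example source.compose(MyTransformers.losslessObserveOn(scheduler)), without the broadcaster class needing one method per variant.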
Upvotes: 2