Reputation: 26271
I have a specific need to make sure an item is delivered to the subscriber. It would be nice if onNext returned a boolean indicating whether the item has been delivered, but unfortunately it returns void.
I have a collection of items backed by a BlockingDeque. New items are added to the queue and polled by subscribers. A subscriber may unsubscribe at any time, in which case the item may be lost. Here's my current implementation:
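return Observable.create((Subscriber<? super Item> subscriber) -> {
    while (true) {
        try {
            // poll() returns null when the timeout elapses with no item available
            Item item = items.poll(poolTimeout, poolUnit);
            if (subscriber.isUnsubscribed()) {
                // Put the item back at the head so FIFO order is preserved
                if (item != null) {
                    items.addFirst(item);
                }
                break;
            }
            if (item != null) {
                // Race: the subscriber may still unsubscribe before this call completes
                subscriber.onNext(item);
            }
        } catch (InterruptedException e) {
            if (subscriber.isUnsubscribed()) {
                break;
            }
        }
    }
    subscriber.onCompleted();
}).subscribeOn(Schedulers.io())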
...
Even though I check isUnsubscribed before calling onNext, the Subscriber may unsubscribe between the check and the item delivery. It's a common synchronization problem and it's easy to reproduce; for example, my test sometimes fails with an AssertionError when checking the size of the received collection:
java.lang.AssertionError:
Expected size:<1000> but was:<999> in:
I'm looking for a solution that allows me to deliver items in FIFO order with 100% certainty that no item is lost. Any ideas how to achieve this in RxJava?
Upvotes: 1
Views: 232
Reputation: 70017
If I understand your problem correctly, you need guaranteed end-to-end delivery with the option to piggyback on items rejected due to unsubscription?
Unfortunately, RxJava is not architected this way, since many operators and classes simply drop onNext values once they have been unsubscribed. I see some problems as well:
boolean onNext(T t) doesn't compose well across async boundaries. We had a similar problem with unsubscription.
I can only imagine a setup where there is a concurrent set of items. The source places an item into this set and calls onNext. The final subscriber, once it has successfully processed the item, removes it from the set. If this final subscriber unsubscribes, all remaining items can be considered undelivered. Note, however, that unsubscription is best effort, so the set may list as undelivered some events that will in fact be delivered shortly after unsubscription, unless you synchronize onNext with the unsubscription action. Here is some example code.
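A minimal sketch of the tracking idea above, written with plain JDK types rather than RxJava so that it stands alone; it is not the example originally linked from this answer. TrackedSource, emit, acknowledge and unsubscribe are hypothetical names, and a single lock stands in for "synchronizing onNext and the unsubscription action". It also assumes the final subscriber calls acknowledge only after it has finished processing an item.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.BlockingDeque;

/** Hypothetical helper: tracks emitted-but-unacknowledged items so none is lost. */
final class TrackedSource<T> {
    private final BlockingDeque<T> items;                  // backing queue from the question
    private final Deque<T> inFlight = new ArrayDeque<>();  // emitted, not yet acknowledged; oldest first
    private final Object lock = new Object();              // serializes emission, acknowledgement, unsubscription
    private boolean unsubscribed;

    TrackedSource(BlockingDeque<T> items) {
        this.items = items;
    }

    /** Source side: takes the next item and records it as in flight; null if none or unsubscribed. */
    T emit() {
        synchronized (lock) {
            if (unsubscribed) {
                return null;
            }
            T item = items.pollFirst();
            if (item != null) {
                inFlight.addLast(item);
            }
            return item;
        }
    }

    /**
     * Final subscriber side: call after the item has been processed.
     * Returns false if unsubscription already reclaimed the item, in which
     * case the caller should treat it as NOT delivered.
     */
    boolean acknowledge(T item) {
        synchronized (lock) {
            return inFlight.remove(item);
        }
    }

    /** Everything still in flight counts as undelivered and goes back to the head in FIFO order. */
    void unsubscribe() {
        synchronized (lock) {
            unsubscribed = true;
            while (!inFlight.isEmpty()) {
                // Re-insert newest first so the oldest item ends up at the head again
                items.addFirst(inFlight.pollLast());
            }
        }
    }
}
```

In an RxJava pipeline the source would call emit() before onNext, the last subscriber would call acknowledge(item) at the end of its own onNext, and unsubscribe() would be registered as an unsubscription action. The boolean returned by acknowledge plays the role of the wished-for boolean onNext: an item is either acknowledged or returned to the queue, never both.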
Upvotes: 1