tomrozb

Reputation: 26271

Make sure item is delivered to onNext listener in RxJava

I need to make sure that every item is delivered to the subscriber. It would be nice if onNext returned a boolean indicating whether the item was delivered, but unfortunately it returns void.

I have a collection of items backed by a BlockingDeque. New items are added to the queue and polled by subscribers. A subscriber may unsubscribe at any time, in which case an item may be lost. Here's my current implementation:

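return Observable.create((Subscriber<? super Item> subscriber) -> {
    while (true) {
        try {
            // poll returns null when the timeout elapses with no item available
            Item item = items.poll(poolTimeout, poolUnit);
            if (item == null) {
                if (subscriber.isUnsubscribed()) {
                    break;
                }
                continue;
            }
            if (!subscriber.isUnsubscribed()) {
                // race: the subscriber may unsubscribe right here, and the item is lost
                subscriber.onNext(item);
            } else {
                // not delivered: put the item back at the head to keep FIFO order
                items.addFirst(item);
                break;
            }
        } catch (InterruptedException e) {
            if (subscriber.isUnsubscribed()) {
                break;
            }
        }
    }
    subscriber.onCompleted();
}).subscribeOn(Schedulers.io())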
   ...

Even though I check isUnsubscribed before calling onNext, the subscriber may still unsubscribe between the check and the delivery of the item. It's a common synchronization problem and it's easy to reproduce: my test sometimes fails with an AssertionError when checking the size of the received collection:

java.lang.AssertionError:
Expected size:<1000> but was:<999> in:

I'm looking for a solution that lets me deliver items in FIFO order with 100% certainty that no item is lost. Any ideas how to achieve this in RxJava?

Upvotes: 1

Views: 232

Answers (1)

akarnokd

Reputation: 70017

If I understand your problem correctly, you need guaranteed end-to-end delivery, with the option to recover items that were rejected due to unsubscription?

Unfortunately, RxJava is not architected this way, since many operators and classes simply drop onNext values once they have been unsubscribed. I see some other problems as well:

  • Which subscriber can tell if the item was consumed successfully?
  • boolean onNext(T t) doesn't compose well across async boundaries. We had a similar problem with unsubscription.
  • Returning a boolean implies the need to block if downstream uses observeOn.

The only setup I can imagine uses a concurrent set of items. The source places an item into this set and then calls onNext. The final subscriber, once it has successfully processed the item, removes it from the set. If this final subscriber unsubscribes, all items remaining in the set can be considered undelivered. Note, however, that unsubscription is best effort, so the set may list as undelivered some events that will still be delivered successfully shortly after unsubscription, unless you synchronize onNext with the unsubscription action. Here is some example code.
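
The example linked from the answer is not reproduced in this text. As a rough illustration of the idea only, here is a minimal sketch in plain Java with no RxJava dependency. InFlightTracker and its methods track, deliver and close are hypothetical names made up for this sketch and are not part of RxJava. It assumes items are distinct according to equals/hashCode, and it uses a single lock so that processing an item and closing the tracker cannot interleave, which is one possible reading of "synchronize onNext and an unsubscription action".

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;

/** Hypothetical helper (not an RxJava class): tracks items emitted but not yet processed. */
final class InFlightTracker<T> {
    private final Object lock = new Object();
    // LinkedHashSet keeps insertion order, so leftovers can be re-queued in FIFO order.
    private final Set<T> pending = new LinkedHashSet<>();
    private boolean closed;

    /** Source side: record the item before emitting it. Returns false once closed. */
    boolean track(T item) {
        synchronized (lock) {
            if (closed) {
                return false;
            }
            pending.add(item);
            return true;
        }
    }

    /**
     * Consumer side: process the item and acknowledge it under the same lock as close(),
     * so an item is either fully processed or reported as undelivered, never both.
     */
    boolean deliver(T item, Consumer<? super T> handler) {
        synchronized (lock) {
            if (closed || !pending.contains(item)) {
                return false;
            }
            handler.accept(item);   // if this throws, the item stays pending
            pending.remove(item);
            return true;
        }
    }

    /** Unsubscription side: stop further deliveries and return leftovers, oldest first. */
    List<T> close() {
        synchronized (lock) {
            closed = true;
            List<T> undelivered = new ArrayList<>(pending);
            pending.clear();
            return undelivered;
        }
    }
}
```

In an RxJava 1.x setup along the lines of the question, the source would presumably call track(item) just before subscriber.onNext(item), the final subscriber's onNext would go through deliver, and close() would run in an unsubscription action, with the returned items pushed back onto the deque by iterating the list in reverse and calling items.addFirst on each. The trade-off is that close() blocks while a handler is running, so a slow consumer delays unsubscription.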

Upvotes: 1
