codependent
codependent

Reputation: 24442

RxJava - ConnectableObservable can't notify its observers more than 128 times when using observeOn and subscribeOn simultaneously

I have an application that uses a ConnectableObservable that runs for a long time. Mysteriously after some time its observer stopped getting notifications in its onNext() method.

I have written the following test that simplifies the example. It's just a ConnectableObservable with an infinite loop, with one subscriber using both observeOn and subscribeon. After 128 s.onNext(1) calls it stops notifying the observer.

@Test
public void testHotObservable() throws InterruptedException{

    CountDownLatch latch = new CountDownLatch(1);

    ConnectableObservable<Integer> observable = Observable.<Integer>create( (s) -> {
        while(true){
            try {
                Thread.sleep(500);
            } catch (Exception e) {
                e.printStackTrace();
            }
            s.onNext(1);
        }
    })
    .observeOn(Schedulers.io())
    .subscribeOn(Schedulers.io())
    .publish();

    Observer<Integer> observer = new Observer<Integer>() {
        @Override
        public void onNext(Integer i) {
            System.out.println("got "+i);
        }
        @Override
        public void onCompleted() {
            System.out.println("completed");
        }

        @Override
        public void onError(Throwable e) {
            e.printStackTrace();
        }
    };

    observable.subscribe(observer);
    observable.connect();

    latch.await();
}

This is what I've seen debugging RxJava's code I have found out the reason why it doesn't call the Observer's onNext() method but I don't understand it:

1.- s.onNext(1); is called:

2.- The execution gets to rx.internal.operators.OperatorObserveOn.ObserveOnSubscriber.pollQueue():

void pollQueue() {
    int emitted = 0;
    final AtomicLong localRequested = this.requested;
    final AtomicLong localCounter = this.counter;
    do {
        localCounter.set(1);
        long produced = 0;
        long r = localRequested.get();            
        for (;;) {
            ...
            System.out.println("R: "+r);
            if (r > 0) {
                Object o = queue.poll();
                if (o != null) {
                    child.onNext(on.getValue(o));
                    r--;

The problem is the value of r. The first time it executes its value is always 128. After each call it decrements by 1 (r--). This means that ConnectableObservable can only notify its observers 128 times when using both observeOn and subscribeOn. If I remove subscribeOn, r's value starts over each iteration and it works.

UPDATE:

I found a solution: the problem was caused by the order of the .observerOn().subscribeOn(). If I reverse it to .subscribeOn().observeOn() it works (I can see that the value of r is always reset to 128).

Anyway I'd appreciate an explanation.

Upvotes: 0

Views: 410

Answers (1)

akarnokd
akarnokd

Reputation: 69997

Many async operators use internal, fixed size buffers and rely on subscribers requesting requently. In your case, something doesn't request properly which I can't say what it is. I suggest trying your use case with standard components to see what could be wrong, i.e., you can replace your custom Observable with a PublishSubject + sample:

Subject<Integer, Integer> source = PublishSubject.<Integer>create().toSerialized();

ConnectableObservable<Integer> co = source.sample(
    500, TimeUnit.MILLISECONDS, Schedulers.io())
.onBackpressureBuffer().publish();

co.subscribe(yourSubscriber);
co.connect();

source.onNext(1);

Upvotes: 1

Related Questions