Vineet Pandita

Reputation: 3

OnSubscribe.call method invoked twice

Background: I want to invoke a web service inside the call method of an OnSubscribe. A custom Subscriber class also subscribes to this Observable. (The following code is an approximation of the real code.)

Issue: The OnSubscribe.call method is invoked twice.

The relationship between Observable.create(...), subscribe(...), and finally converting an Observable with toBlocking() is not clear to me. The line below seems to add behavior and somehow invokes OnSubscribe.call again. I assume toBlocking() ultimately has to be called just before the result is returned by the web server (the servlet/controller, which is not Rx-aware).

observable.toBlocking().forEach(i -> System.out.println("For Each Printing " + i));

Complete Code Below

public static void main(String[] args) {
    Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() {
        @Override
        public void call(Subscriber<? super Integer> observer) {
            System.out.println("In OnSubscribe Create: " + observer.isUnsubscribed());
            try {
                if (!observer.isUnsubscribed()) {
                    for (int i = 1; i < 5; i++) {
                        observer.onNext(i);
                    }
                    observer.onCompleted();
                }
            } catch (Exception e) {
                observer.onError(e);
            }
        }
    });

    // Explicit subscription with a custom Subscriber.
    observable.subscribe(new Subscriber<Integer>() {
        @Override
        public void onNext(Integer item) {
            System.out.println("Next: " + item);
        }

        @Override
        public void onError(Throwable error) {
            System.err.println("Error: " + error.getMessage());
        }

        @Override
        public void onCompleted() {
            System.out.println("Sequence complete.");
        }
    });

    // Blocking iteration over the same observable.
    observable.toBlocking().forEach(i -> System.out.println("For Each Printing " + i));
}

Output

In OnSubscribe Create: false
Next: 1
Next: 2
Next: 3
Next: 4
Sequence complete.
In OnSubscribe Create: false
For Each Printing 1
For Each Printing 2
For Each Printing 3
For Each Printing 4

Answering my own question: the way I was processing results via forEach was wrong. forEach is itself a subscriber, so it ends up invoking the call method again. The correct way to do what I wanted was to use a CountDownLatch.

This also clarifies my other doubt: when we need a point where we finally wait for the work to complete just before returning a response from a controller/servlet, that means using a latch with a timeout.

Code below.

public static void main(String args[]) throws Exception {
    CountDownLatch latch = new CountDownLatch(4);
    Observable<Integer> observable =  Observable.create(new Observable.OnSubscribe<Integer>() {
        @Override
        public void call(Subscriber<? super Integer> observer) {
            System.out.println("In OnSubscribe Create: "+observer.isUnsubscribed());
            try {
                if (!observer.isUnsubscribed()) {
                    for (int i = 1; i < 5; i++) {
                        observer.onNext(i);
                        latch.countDown();
                    }
                    observer.onCompleted();
                }
            } catch (Exception e) {
                observer.onError(e);
            }
        }
    });

    observable.subscribe(new Subscriber<Integer>() {
        @Override
        public void onNext(Integer item) {
            System.out.println("Next: " + item);
        }

        @Override
        public void onError(Throwable error) {
            System.err.println("Error: " + error.getMessage());
        }

        @Override
        public void onCompleted() {
            System.out.println("Sequence complete.");
        }
    });

    latch.await();
    System.out.println(" Wait for Latch Over");
}

Upvotes: 0

Views: 1840

Answers (2)

Simon Baslé

Reputation: 28351

toBlocking() essentially boils down to making a subscription and waiting on a latch, hence your two subscriptions.
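To make that concrete, here is a minimal plain-Java sketch of the idea. It is not RxJava's actual implementation: ColdSource, subscribe, blockingForEach and producerRuns are hypothetical names invented for illustration, and the only assumption carried over from the question is that the producer logic runs once per subscription.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntConsumer;

public class ColdSourceDemo {

    /** Hypothetical stand-in for a cold Observable: the producer runs once per subscription. */
    static final class ColdSource {
        final AtomicInteger producerRuns = new AtomicInteger();

        // Plays the role of subscribe(...): every call re-runs the producer logic.
        void subscribe(IntConsumer onNext, Runnable onCompleted) {
            producerRuns.incrementAndGet();
            for (int i = 1; i < 5; i++) {
                onNext.accept(i);
            }
            onCompleted.run();
        }

        // Plays the role of toBlocking().forEach(...): subscribe, then wait on a latch.
        void blockingForEach(IntConsumer action) {
            CountDownLatch done = new CountDownLatch(1);
            subscribe(action, done::countDown);
            try {
                done.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt flag
            }
        }
    }

    /** Subscribes once explicitly and once via the blocking helper; returns the producer run count. */
    static int run() {
        ColdSource source = new ColdSource();
        source.subscribe(i -> System.out.println("Next: " + i),
                         () -> System.out.println("Sequence complete."));
        source.blockingForEach(i -> System.out.println("For Each Printing " + i));
        return source.producerRuns.get();
    }

    public static void main(String[] args) {
        System.out.println("Producer runs: " + run()); // prints "Producer runs: 2"
    }
}
```

In this model the blocking helper is just one more subscriber, which is why the producer runs a second time.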

But I think that you were right in using toBlocking().forEach(), and should keep that part as the solution.

The problem then is your second subscription, made by the call to subscribe. Is there a real need for this subscribe? If it's just about logging and not actually modifying the flow of data, you can use doOnNext, doOnError and doOnCompleted instead.

If your use case is more complex than your question reflects and you really do need to wait on two "parallel" asynchronous processes, then you may need to join and wait using a CountDownLatch (or similar). Keep in mind that RxJava is agnostic about parallelism: if you want it, you need to drive it yourself using Schedulers with subscribeOn/observeOn.
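As a rough illustration of the join-and-wait case, the sketch below uses only java.util.concurrent rather than RxJava Schedulers. LatchJoinDemo, runBoth and the toy summing tasks are hypothetical names and workloads chosen for the example; the point is one latch count per task, released in a finally block, and a bounded wait instead of an unbounded await().

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class LatchJoinDemo {

    /**
     * Runs two independent tasks on a thread pool and waits for both,
     * for at most timeoutMillis. Returns true if both finished in time.
     */
    static boolean runBoth(AtomicInteger sum, long timeoutMillis) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        CountDownLatch latch = new CountDownLatch(2); // one count per task
        try {
            for (int task = 1; task <= 2; task++) {
                final int base = task * 10;
                pool.submit(() -> {
                    try {
                        // Toy workload standing in for an asynchronous call.
                        for (int i = 1; i < 5; i++) {
                            sum.addAndGet(base + i);
                        }
                    } finally {
                        latch.countDown(); // release the waiter even if the task fails
                    }
                });
            }
            // Bounded wait: false means the timeout elapsed first.
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        AtomicInteger sum = new AtomicInteger();
        boolean finished = runBoth(sum, 5000);
        System.out.println("Finished in time: " + finished + ", sum = " + sum.get());
    }
}
```

Counting down once per task on completion, rather than once per emitted item, keeps the latch size independent of how many items each task produces.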

Upvotes: 0

Malt

Reputation: 30335

This might not be all that relevant since you have already solved the problem in your case, but creating observables from a raw OnSubscribe is error-prone in general. The better approach is to use one of the existing helper classes: SyncOnSubscribe/AsyncOnSubscribe (since RxJava 1.0.15) or AbstractOnSubscribe (up to RxJava v1.1.0). They give you a ready-made framework that manages the observable's state and backpressure, leaving you to deal (mostly) with your own logic.

Upvotes: 2
