Reputation: 3
Background: I am looking to invoke a web-service inside of call method of OnSubscribe. There is a custom Subscriber class which is a subscriber of this Observable as well. (Following code is an approximation of same)
Issue: Seeing that OnSubscribe.call method gets called twice.
The relation between 'Observable.create(...)', subscribe(...) and finally converting an observable toBlocking is not clear to me. It seems below line adds behavior and somehow calls OnSubscribe.call again. The toBlocking call I am assuming ultimately needs to be called - just before the result is due to be returned by the webserver (servlet/controller which are non Rx in nature).
observable.toBlocking().forEach(i-> System.out.println(i));
Complete Code Below
public static void main(String args[]){
Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> observer) {
System.out.println("In OnSubscribe Create: "+observer.isUnsubscribed());
try {
if (!observer.isUnsubscribed()) {
for (int i = 1; i < 5; i++) {
observer.onNext(i);
}
observer.onCompleted();
}
} catch (Exception e) {
observer.onError(e);
}
}
});
observable.subscribe(new Subscriber<Integer>() {
@Override
public void onNext(Integer item) {
System.out.println("Next: " + item);
}
@Override
public void onError(Throwable error) {
System.err.println("Error: " + error.getMessage());
}
@Override
public void onCompleted() {
System.out.println("Sequence complete.");
}
});
observable.toBlocking().forEach(i-> System.out.println(i));
}
Output
In OnSubscribe Create: false
Next: 1
Next: 2
Next: 3
Next: 4
Sequence complete.
In OnSubscribe Create: false
For Each Printing 1
For Each Printing 2
For Each Printing 3
For Each Printing 4
Answering own question: The way I was processing results via forEach was wrong. That in itself is a subscriber and hence ends up calling the call method again. Correct way to do what I was doing was via a CountDownLatch.
This also clarifies my other doubt - in the context of having a point where we finally wait for the work to complete just before returning a respsonse in a controller/servlet would mean to use a latch with timeout.
Code below.
public static void main(String args[]) throws Exception {
CountDownLatch latch = new CountDownLatch(4);
Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> observer) {
System.out.println("In OnSubscribe Create: "+observer.isUnsubscribed());
try {
if (!observer.isUnsubscribed()) {
for (int i = 1; i < 5; i++) {
observer.onNext(i);
latch.countDown();
}
observer.onCompleted();
}
} catch (Exception e) {
observer.onError(e);
}
}
});
observable.subscribe(new Subscriber<Integer>() {
@Override
public void onNext(Integer item) {
System.out.println("Next: " + item);
}
@Override
public void onError(Throwable error) {
System.err.println("Error: " + error.getMessage());
}
@Override
public void onCompleted() {
System.out.println("Sequence complete.");
}
});
latch.await();
System.out.println(" Wait for Latch Over");
}
Upvotes: 0
Views: 1840
Reputation: 28351
toBlocking
can be dumbed down to making a subscription with a latch, hence your two subscriptions.
But I think that you were right in using toBlocking().forEach()
, and should keep that part as the solution.
The problem then is your second subscription, made by the call to subscribe
. Is there a real need for this subscribe
? If it's just about logging and not actually modifying the flow of data, you can use doOnNext
, doOnError
and doOnCompleted
instead.
If your use case is a bit more complex than reflected in your question and you actually do need to wait on two "parallel" asynchronous processings, then you may need to join and wait using a CountDownLatch
(or similar)... Keep in mind that if you want parallelism, RxJava is agnostic in that regard and you need to drive it using Schedulers
, subscribeOn
/observeOn
.
Upvotes: 0
Reputation: 30335
This might not be all that relevant since you already solved the problem in your case, but creating observables using raw OnSubscribe is error prone in general. The better approach is using one of the existing helper classes: SyncOnSubscribe/AsyncOnSubscribe (since RxJava 1.0.15) or AbstractOnSubscribe (up to RxJava v1.1.0). They give you a ready made framework that manages the observable's state and backpressure, leaving you to deal (mostly) with your own logic.
Upvotes: 2