Reputation: 2665
I have three Integer observers, as shown below:
First Observer:
private Observer<Integer> getFirstObserver() {
return new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
Log.d(LOG_TAG, "onNext First " + integer);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
};
}
Second Observer:
private Observer<Integer> getSecondObserver() {
return new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
Log.d(LOG_TAG, "onNext Second " + integer);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
};
}
Third Observer:
private Observer<Integer> getThirdObserver() {
return new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
Log.d(LOG_TAG, "onNext Third " + integer);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
};
}
Now, if I run the following code:
void asyncSubjectDemo1() {
Observable<Integer> observable = Observable.just(1, 2, 3, 4);
PublishSubject<Integer> asyncSubject = PublishSubject.create();
observable.subscribe(asyncSubject);
asyncSubject.subscribe(getFirstObserver());
asyncSubject.subscribe(getSecondObserver());
asyncSubject.subscribe(getThirdObserver());
}
Nothing is printed in Logcat, which is what I expect from the documentation:
PublishSubject emits to an observer only those items that are emitted by the source Observable(s) subsequent to the time of the subscription.
But if I add observeOn when creating the Observable, as below, and run it:
void asyncSubjectDemo1() {
Observable<Integer> observable = Observable.just(1, 2, 3, 4).observeOn(AndroidSchedulers.mainThread());
PublishSubject<Integer> asyncSubject = PublishSubject.create();
observable.subscribe(asyncSubject);
asyncSubject.subscribe(getFirstObserver());
asyncSubject.subscribe(getSecondObserver());
asyncSubject.subscribe(getThirdObserver());
}
I get the following output:
D/MY_LOG: onNext First 1
D/MY_LOG: onNext Second 1
D/MY_LOG: onNext Third 1
D/MY_LOG: onNext First 2
D/MY_LOG: onNext Second 2
D/MY_LOG: onNext Third 2
Why does the behavior differ between these two cases?
Upvotes: 1
Views: 1560
Reputation: 69997
Please read the Javadoc of PublishSubject: http://reactivex.io/RxJava/3.x/javadoc/io/reactivex/rxjava3/subjects/PublishSubject.html
"a PublishSubject doesn't retain/cache items, therefore, a new Observer won't receive any past items."
In the first case, you subscribe the PublishSubject to a synchronous source, so all items pass through the subject right then, before execution even reaches asyncSubject.subscribe(getFirstObserver());
In the second case, the source is scheduled, so subscribing the PublishSubject to it creates a window or race (depending on where the method executes). The asyncSubject.subscribe(getFirstObserver()); call and the ones after it therefore have a chance to subscribe to the subject in time and receive the items later on.
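To make the timing difference concrete, here is a minimal sketch in plain Java that does not use RxJava at all. ToySubject, synchronousSource, scheduledSource and the pendingTasks queue are hypothetical names made up for this illustration. The toy subject only mimics the "no caching" behavior quoted above, and the queue is a rough stand-in for work posted to a scheduler such as the main thread's, not how RxJava is actually implemented.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;

class PublishSubjectTimingSketch {

    // Toy stand-in for a PublishSubject (assumption: not RxJava's real class).
    // It forwards each item to the observers registered at that moment and caches nothing.
    static final class ToySubject<T> {
        private final List<Consumer<T>> observers = new ArrayList<>();

        void subscribe(Consumer<T> observer) {
            observers.add(observer);
        }

        void onNext(T item) {
            for (Consumer<T> observer : observers) {
                observer.accept(item);
            }
        }
    }

    // Case 1: the source pushes all items while it is being connected to the subject,
    // i.e. before any observer has been registered.
    static List<String> synchronousSource() {
        List<String> log = new ArrayList<>();
        ToySubject<Integer> subject = new ToySubject<>();

        for (int i = 1; i <= 4; i++) {
            subject.onNext(i); // nobody is listening yet, so the item is dropped
        }

        subject.subscribe(item -> log.add("First " + item));
        subject.subscribe(item -> log.add("Second " + item));
        return log;
    }

    // Case 2: the emission is only queued (as a scheduler would do) and runs later,
    // after the observers have had time to register.
    static List<String> scheduledSource() {
        List<String> log = new ArrayList<>();
        ToySubject<Integer> subject = new ToySubject<>();
        Deque<Runnable> pendingTasks = new ArrayDeque<>();

        pendingTasks.add(() -> {
            for (int i = 1; i <= 4; i++) {
                subject.onNext(i);
            }
        });

        subject.subscribe(item -> log.add("First " + item));
        subject.subscribe(item -> log.add("Second " + item));

        // The calling method is done; now the queued work gets its turn.
        while (!pendingTasks.isEmpty()) {
            pendingTasks.poll().run();
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(synchronousSource()); // prints []
        System.out.println(scheduledSource());
        // prints [First 1, Second 1, First 2, Second 2, First 3, Second 3, First 4, Second 4]
    }
}
```

The same reasoning suggests that, in the real code, subscribing the observers to the subject before calling observable.subscribe(asyncSubject) would let them see the items even with a synchronous source.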
Upvotes: 1