Robert Lewis

Reputation: 1907

Behavior of Observables and Observers in RxJava2

It's confusing enough that the .subscribe() method returns void if you pass it an Observer, but Disposable when passed anything else. I realize this has to do with the Reactive-Streams spec, but still…

Observer provides the .onSubscribe( Disposable ) method, but as I read the ReactiveX Observable contract, this method may or may not be called when the Observer subscribes. Is this true for RxJava2? [It seems that it is only required to be called by Flowable, which uses it to notify the Subscriber that it's ready to accept requests.]

I've read that .subscribeWith( Observer ) somehow addresses this issue but I'm having trouble seeing how. Evidently you can pass DisposableObserver, which implements Disposable, but what exactly is the .dispose() method supposed to do?

Upvotes: 1

Views: 335

Answers (1)

akarnokd

Reputation: 70017

this method may or may not be called when the Observer subscribes. Is this true for RxJava2?

The protocol definitions in each RxJava base class are quite clear:

Flowable via Publisher:

onSubscribe onNext* (onError | onComplete)?

Observable:

onSubscribe onNext* (onError | onComplete)?

Single:

onSubscribe (onSuccess | onError)?

Maybe:

onSubscribe (onSuccess | onError | onComplete)?

Completable:

onSubscribe (onError | onComplete)?

onSubscribe is mandatory for every source, even never(), which signals nothing else afterwards.
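To make the grammar above concrete, here is a minimal sketch in plain Java. The nested types SketchObserver, SketchDisposable and SketchSource are hypothetical stand-ins that only mirror the shape of the RxJava 2 interfaces; they are not the library's classes, and the two sources are simplified models of never() and just(), not their real implementations.

```java
import java.util.ArrayList;
import java.util.List;

class ProtocolSketch {
    // Hypothetical stand-ins mirroring the shape of RxJava 2's types (assumption, not the library).
    interface SketchDisposable {
        void dispose();
        boolean isDisposed();
    }

    interface SketchObserver<T> {
        void onSubscribe(SketchDisposable d);
        void onNext(T value);
        void onError(Throwable e);
        void onComplete();
    }

    interface SketchSource<T> {
        void subscribe(SketchObserver<? super T> observer);
    }

    // A trivial disposable whose only state is a flag.
    static SketchDisposable flagDisposable() {
        return new SketchDisposable() {
            private volatile boolean disposed;
            @Override public void dispose() { disposed = true; }
            @Override public boolean isDisposed() { return disposed; }
        };
    }

    // Models never(): onSubscribe is still delivered, then nothing else.
    static <T> SketchSource<T> never() {
        return observer -> observer.onSubscribe(flagDisposable());
    }

    // Models just(value): onSubscribe, one onNext, then onComplete.
    static <T> SketchSource<T> just(T value) {
        return observer -> {
            SketchDisposable d = flagDisposable();
            observer.onSubscribe(d);
            if (!d.isDisposed()) {
                observer.onNext(value);
            }
            if (!d.isDisposed()) {
                observer.onComplete();
            }
        };
    }

    // Subscribes a recording observer and returns the signals in arrival order.
    static <T> List<String> record(SketchSource<T> source) {
        List<String> events = new ArrayList<>();
        source.subscribe(new SketchObserver<T>() {
            @Override public void onSubscribe(SketchDisposable d) { events.add("onSubscribe"); }
            @Override public void onNext(T value) { events.add("onNext(" + value + ")"); }
            @Override public void onError(Throwable e) { events.add("onError"); }
            @Override public void onComplete() { events.add("onComplete"); }
        });
        return events;
    }

    public static void main(String[] args) {
        System.out.println(record(never()));  // [onSubscribe]
        System.out.println(record(just(1)));  // [onSubscribe, onNext(1), onComplete]
    }
}
```

In both cases onSubscribe arrives first, which is the part of the contract the question asks about.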

I've read that .subscribeWith( Observer ) somehow addresses this issue

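The definition is S subscribeWith(S observer) where S extends Observer<? super T>. It subscribes the observer and then returns that same instance with its concrete type intact. If the observer you pass also implements Disposable, as DisposableObserver does, the returned value is a handle you can dispose later.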
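The "subscribe, then return the argument" shape can be sketched in plain Java as follows. SketchObserver, SketchSource and CountingObserver are hypothetical stand-ins invented for illustration, and subscribeWith is written here as a static helper rather than as the instance method RxJava provides.

```java
class SubscribeWithSketch {
    // Hypothetical stand-in for an observer type; not RxJava's Observer.
    interface SketchObserver<T> {
        void onNext(T value);
    }

    // Hypothetical stand-in for a source whose subscribe() returns void.
    interface SketchSource<T> {
        void subscribe(SketchObserver<? super T> observer);
    }

    // Same shape as the quoted signature: subscribe, then hand back the argument.
    static <T, S extends SketchObserver<? super T>> S subscribeWith(SketchSource<T> source, S observer) {
        source.subscribe(observer);
        return observer;
    }

    // An observer subclass with extra state the caller wants to reach afterwards.
    static final class CountingObserver implements SketchObserver<Object> {
        int count;
        @Override public void onNext(Object value) { count++; }
    }

    static CountingObserver demo() {
        SketchSource<String> source = o -> { o.onNext("a"); o.onNext("b"); };
        // The result is statically typed as CountingObserver, not just SketchObserver.
        return subscribeWith(source, new CountingObserver());
    }

    public static void main(String[] args) {
        System.out.println(demo().count); // 2
    }
}
```

Because the concrete type is preserved, the call can be chained or assigned without a cast, which is what makes it convenient with observers that are also Disposable.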

but what exactly is the .dispose() method supposed to do?

It disposes the Disposable that was sent through Observer.onSubscribe, and it does so in a thread-safe manner. Analogously, DisposableSubscriber cancels the Subscription received through Subscriber.onSubscribe.
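One common way to get a thread-safe dispose() is to keep the upstream handle in an AtomicReference and swap in a sentinel. The sketch below illustrates that idea in plain Java under that assumption; UpstreamHolder, SketchDisposable and DISPOSED are hypothetical names, and this is not a copy of RxJava's DisposableObserver.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

class DisposeSketch {
    // Hypothetical stand-in for RxJava's Disposable.
    interface SketchDisposable {
        void dispose();
    }

    // Sentinel stored once dispose() has been called.
    static final SketchDisposable DISPOSED = () -> { };

    // Holds the upstream handle received in onSubscribe.
    static class UpstreamHolder {
        private final AtomicReference<SketchDisposable> upstream = new AtomicReference<>();

        // If a handle is already stored or dispose() already ran, dispose the new handle at once.
        void onSubscribe(SketchDisposable d) {
            if (!upstream.compareAndSet(null, d)) {
                d.dispose();
            }
        }

        // Safe from any thread and idempotent: only one caller sees the real handle.
        void dispose() {
            SketchDisposable previous = upstream.getAndSet(DISPOSED);
            if (previous != null && previous != DISPOSED) {
                previous.dispose();
            }
        }

        boolean isDisposed() {
            return upstream.get() == DISPOSED;
        }
    }

    // dispose() twice after onSubscribe: returns how often the upstream was disposed.
    static int disposeTwice() {
        AtomicInteger calls = new AtomicInteger();
        UpstreamHolder holder = new UpstreamHolder();
        holder.onSubscribe(calls::incrementAndGet);
        holder.dispose();
        holder.dispose();
        return calls.get();
    }

    // dispose() before onSubscribe: the late upstream handle is disposed on arrival.
    static int disposeBeforeSubscribe() {
        AtomicInteger calls = new AtomicInteger();
        UpstreamHolder holder = new UpstreamHolder();
        holder.dispose();
        holder.onSubscribe(calls::incrementAndGet);
        return calls.get();
    }

    public static void main(String[] args) {
        System.out.println(disposeTwice());           // 1
        System.out.println(disposeBeforeSubscribe()); // 1
    }
}
```

The atomic swap covers both orderings: disposing after the handle arrives and disposing before it arrives each reach the upstream exactly once.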

Upvotes: 1
