Reputation: 2839
I've got a problem with understanding why the following code doesn't work. Am I doing something wrong or is it some kind of bug in RxJava2 implementation?
private Disposable savedDisposable;
@Test
public void test() {
final TestObserver<Integer> observer = new TestObserver<>();
Observable<Integer> t = Observable.just(10)
.delay(100, TimeUnit.MILLISECONDS)
.doOnSubscribe(disposable -> savedDisposable = disposable);
t.subscribe(observer);
savedDisposable.dispose(); //this doesn't work
//observer.dispose(); //this works
assertTrue(observer.isDisposed());
}
Upvotes: 12
Views: 1885
Reputation: 69997
To answer the posted question:
You are disposing in the middle thus the end Disposable
can't know about its upstream has been disposed because dispose()
calls always travel upstream.
There are the DisposableObserver
, ResourceObserver
, subscribeWith
and the lambda-subscribe()
methods that will get you a Disposable
object at the very end which you can dispose via dispose()
.
On the issue list though, it turned out the OP wanted an Observer
and Disposable
to be present on the consumer type and discovered that this can be achieved via constrained generics, for example:
public static <T, K extends Observer<T> & Disposable> K subscribe(
Observable<T> o, K observer) {
o.subscribe(observer);
return observer;
}
Upvotes: 13