Reputation: 1083
I have recently been working with RxJava 2 and testing Observable.interval():
subscription = Observable.interval(1, TimeUnit.MILLISECONDS, Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread());
subscription.subscribe(new Observer<Long>() {
    @Override
    public void onSubscribe(Disposable d) {
    }
    @Override
    public void onNext(Long aLong) {
        //binding.appBar.mainContent.msg.setText(aLong+"");
    }
    @Override
    public void onError(Throwable e) {
    }
    @Override
    public void onComplete() {
    }
});
The Observable is started after the activity's onCreate method, and I log the output in onNext(). I also have a Stop button; when it is clicked, I want to stop the subscription flow.
However, even after the Stop button is clicked, the log keeps going.
stop.setOnClickListener(new View.OnClickListener() {
    @Override
    public void onClick(View v) {
        if (subscription != null) {
            subscription.unsubscribeOn(Schedulers.io());
        }
    }
});
Upvotes: 1
Views: 1830
Reputation: 62189
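Calling unsubscribeOn(Schedulers.io()) does not stop anything: it only returns a new Observable whose disposal will run on the given scheduler.
You have subscribed with an Observer, which means you have to keep a reference to the actual Disposable from the onSubscribe(Disposable) callback, and later call Disposable#dispose() on that object.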
private Disposable disposable;
...
Observable.interval(1, TimeUnit.MILLISECONDS, Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Observer<Long>() {
        @Override public void onSubscribe(Disposable d) {
            // Keep the Disposable so the flow can be stopped later
            disposable = d;
        }
        // other callbacks here
    });
...
// Later, e.g. in the Stop button's click listener:
disposable.dispose();
Alternatively, you can change your subscription to the following, where subscribe() returns the Disposable directly:
Disposable disposable = Observable.interval(1, TimeUnit.MILLISECONDS, Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Consumer<Long>() {
        @Override public void accept(Long aLong) throws Exception {
            // onNext
        }
    }, new Consumer<Throwable>() {
        @Override public void accept(Throwable throwable) throws Exception {
            // onError
        }
    }, new Action() {
        @Override public void run() throws Exception {
            // onComplete
        }
    });
...
// Later, when the flow should stop:
disposable.dispose();
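If it helps to see the idea in isolation, here is a minimal plain-JDK sketch of the same pattern: the periodic source hands back a handle, and ticks stop only once you call dispose() on that handle. It deliberately does not use RxJava; `Handle`, `interval` and `ticksAtAndAfterDispose` are hypothetical names made up for this illustration, standing in roughly for Disposable and Observable.interval(...).subscribe(...). RxJava's real implementation is different.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongConsumer;

class IntervalSketch {

    /** Hypothetical stand-in for RxJava's Disposable (not the real interface). */
    interface Handle {
        void dispose();
    }

    /** Hypothetical stand-in for Observable.interval(...).subscribe(onNext). */
    static Handle interval(ScheduledExecutorService executor, long periodMs, LongConsumer onNext) {
        AtomicLong counter = new AtomicLong();
        // Emit 0, 1, 2, ... every periodMs milliseconds on the executor's thread.
        ScheduledFuture<?> task = executor.scheduleAtFixedRate(
                () -> onNext.accept(counter.getAndIncrement()),
                periodMs, periodMs, TimeUnit.MILLISECONDS);
        // The caller must keep this handle; it is the only way to stop the ticks.
        return () -> task.cancel(false);
    }

    /** Returns { ticks seen once disposal has finished, ticks seen 50 ms later }. */
    static long[] ticksAtAndAfterDispose() {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        AtomicLong seen = new AtomicLong();
        Handle handle = interval(executor, 1, tick -> seen.incrementAndGet());
        try {
            Thread.sleep(100);       // let some ticks arrive
            handle.dispose();        // what the Stop button should do
            executor.shutdown();     // release the worker thread
            executor.awaitTermination(1, TimeUnit.SECONDS); // wait for any in-flight tick
            long atDispose = seen.get();
            Thread.sleep(50);        // give stray ticks a chance to show up
            return new long[] { atDispose, seen.get() };
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        long[] counts = ticksAtAndAfterDispose();
        System.out.println("ticks at dispose: " + counts[0]
                + ", ticks 50 ms later: " + counts[1]);
    }
}
```

The two counts come out equal because nothing is emitted after dispose(). In the RxJava code above, the equivalent step is calling disposable.dispose() from the Stop button's click listener.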
Upvotes: 3