Reputation: 103
I have a DisposableSubscriber to a Flowable. The Flowable runs for some timeUntilTimeout, but in some situations I need to kill it earlier. Right now I call .dispose() on the DisposableSubscriber but the Flowable continues to emit events. Eventually the Flowable times out and .doOnCancel() is called.
I have the following code:
private Disposable mDisposableSubscription = null;
public void start() {
mDisposableSubscription = getFlowable()
.timeout(timeUntilTimeout, TimeUnit.MILLISECONDS)
.subscribeWith(new DisposableSubscriber<T>() {
@Override
public void onComplete() {
}
@Override
public void onError(Throwable throwable) {
dispose();
}
@Override
public void onNext(T t) {
// Do something
}
});
}
public void stop() {
// Log "dispose"
mDisposableSubscription.dispose();
}
private Flowable<T> getFlowable() {
return Flowable.create(new FlowableOnSubscribe<T>() {
public void subscribe(FlowableEmitter<T> emitter) {
startSomeAsyncProcess();
}
}).doOnCancel(new Action() {
public void run() {
// Log "do on cancel"
stopSomeAsyncProcess();
}
});
}
Calling stop() to dispose of the DisposableSubscriber before the Flowable times out means events emitted by the Flowable are no longer handled, but the Flowable continues emitting events and the async process continues running. I was under the impression that calling .dispose() downstream of the Flowable kills the Flowable by calling .doOnCancel(), but this does not appear to be the case. What am I missing?
Upvotes: 0
Views: 1987
Reputation: 7058
The flowable is getting disposed, but you are not checking it on your Flowable.create
function, so what happens is that the startSomeAsyncProcess()
ignores it and keeps going.
To solve the issue, you should check the emitter.isDisposed()
flag to know if you should stop emitting.
Example:
Flowable<T> getFlowable() {
return Flowable.create(new FlowableOnSubscribe<T>() {
public void subscribe(FlowableEmitter<T> emitter) {
while(!emitter.isDisposed()) {
emitter.onNext(...);
}
}
});
}
If that startSomeAsyncProcess()
function doesn't allow you to check the flag, surely there is some way to cancel it. Then you can attach a cancellable:
Flowable<T> getFlowable() {
return Flowable.create(new FlowableOnSubscribe<T>() {
public void subscribe(FlowableEmitter<T> emitter) {
startSomeAsyncProcess();
emitter.setCancellable(() -> stopSomeAsyncProcess());
// I don't remember if it's setCancellable() or setDisposable()
}
});
}
Update: the methods setCancellable(...)
and setDisposable(...)
should behave equally, they just take different arguments.
Upvotes: 1