I am implementing a custom Observable that should download a file from the network, split into multiple parts.
Here is my implementation:
private static class MultipartDownloadObservable extends Observable<String> implements Cancellable {
    @Override
    protected void subscribeActual(Observer<? super String> observer) {
        // Start loading
        subscriber = observer;
        progressBundle = new ProgressBundle<>();
        startDownloading();
    }

    @Override
    public void cancel() throws Exception {
        if (!lastDownloadingInfo.isFinish()) {
            okHttpClient.dispatcher().queuedCalls().stream().filter(call -> call.request().tag().equals(url)).forEach(Call::cancel);
        }
        if (subscriber != null) {
            subscriber.onComplete();
        }
    }
}
In my code I create an instance of this class and then call the dispose() method on the Disposable object returned from the subscribe() method:
downloadSubscription.dispose();
However, it throws an error:
java.io.InterruptedIOException: thread interrupted
How can I implement disposing of the Observable correctly? I've tried implementing the Cancellable interface, but its cancel() method is never called.
Upvotes: 0
Views: 2919
Reputation: 10152
Note that java.io.InterruptedIOException: thread interrupted does not come from Rx; it is a java.io exception. When you call dispose(), RxJava is forced to unsubscribe, and that interrupts the thread while the network request is still executing.
You can't really tell an Observable to cancel a network request, because RxJava has no knowledge of the code running inside the Observable. What you can do is use doOnUnsubscribe (named doOnDispose in RxJava 2, which the Disposable in the question indicates) to check whether the network request is still running and cancel it if so; then simply dispose when you want to cancel.
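To make the idea concrete, here is a minimal sketch in plain Java that does not use RxJava itself. The Disposable and Observer interfaces below are reduced stand-ins for the RxJava 2 types, and DownloadSource, blockingDownload and demo are hypothetical names invented for illustration. It shows two things: the source hands the observer a disposable whose dispose() stops the running work, and the resulting InterruptedIOException is treated as expected once the stream has been disposed. In real RxJava 2 code, the corresponding step would presumably be calling observer.onSubscribe(...) inside subscribeActual, or using Observable.create with emitter.setCancellable(...).

```java
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

class DisposeSketch {
    /** Stand-in for RxJava 2's Disposable (assumption: reduced for this sketch). */
    interface Disposable {
        void dispose();
        boolean isDisposed();
    }

    /** Stand-in for RxJava 2's Observer (assumption: reduced for this sketch). */
    interface Observer<T> {
        void onSubscribe(Disposable d);
        void onNext(T value);
        void onError(Throwable e);
        void onComplete();
    }

    /** Hypothetical source that "downloads" on its own thread. */
    static final class DownloadSource implements Disposable {
        private final AtomicBoolean disposed = new AtomicBoolean();
        private final CountDownLatch started = new CountDownLatch(1);
        private volatile Thread worker;

        void subscribe(Observer<? super String> observer) {
            worker = new Thread(() -> {
                started.countDown();
                try {
                    blockingDownload();
                    if (!isDisposed()) {
                        observer.onNext("part-1");
                        observer.onComplete();
                    }
                } catch (InterruptedIOException e) {
                    // Expected after dispose(); only a live observer hears about it.
                    if (!isDisposed()) {
                        observer.onError(e);
                    }
                }
            });
            // Hand out the disposable first; without it the observer has no
            // handle that reaches the cancellation logic.
            observer.onSubscribe(this);
            if (!isDisposed()) {
                worker.start();
            }
        }

        @Override
        public void dispose() {
            // Mark as disposed before interrupting, so the worker sees the flag.
            if (disposed.compareAndSet(false, true) && worker != null) {
                worker.interrupt();
            }
        }

        @Override
        public boolean isDisposed() {
            return disposed.get();
        }

        /** Simulates a blocking network read that fails when interrupted. */
        private static void blockingDownload() throws InterruptedIOException {
            try {
                Thread.sleep(5_000);
            } catch (InterruptedException e) {
                throw new InterruptedIOException("thread interrupted");
            }
        }
    }

    /** Subscribes, disposes mid-download, and returns the events the observer saw. */
    static List<String> demo() {
        List<String> events = Collections.synchronizedList(new ArrayList<>());
        DownloadSource source = new DownloadSource();
        source.subscribe(new Observer<String>() {
            public void onSubscribe(Disposable d) { events.add("onSubscribe"); }
            public void onNext(String v) { events.add("onNext"); }
            public void onError(Throwable e) { events.add("onError"); }
            public void onComplete() { events.add("onComplete"); }
        });
        try {
            source.started.await();   // wait until the worker is running
            source.dispose();         // interrupts the simulated download
            source.worker.join();     // let the worker finish its cleanup
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [onSubscribe]
    }
}
```

The observer receives no onError and no onComplete after disposal, which matches the usual expectation that a disposed stream goes silent. Note that the sketch interrupts a thread only because the simulated download has no cancel handle of its own; with OkHttp, cancelling the Call from the dispose callback would likely be the more direct route.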
Upvotes: 2