user4671628
user4671628

Reputation:

How to handle disposing in Observable RxJava

I am implementing Observable it should just download a file from network divided into multiple parts.

Here is my implementation

 private static class MultipartDownloadObservable extends Observable<String> implements Cancellable {
     @Override
        protected void subscribeActual(Observer<? super String> observer) {
            // Start loading
            subscriber = observer;
            progressBundle = new ProgressBundle<>();
            startDownloading();
        }

        @Override
        public void cancel() throws Exception {
            if (!lastDownloadingInfo.isFinish()) {
                okHttpClient.dispatcher().queuedCalls().stream().filter(call -> call.request().tag().equals(url)).forEach(Call::cancel);
            }
            if (subscriber != null) {
                subscriber.onComplete();
            }
        }
}

In my code I am creating an instance of this class and then call the dispose method on a Disposable object return from subscribe method.

 downloadSubscription.dispose();

However it throws an error.

java.io.InterruptedIOException: thread interrupted

How can I implement disposing of Observable correctly ? I've tried to implement Canceble interface, however the cancel method is not called.

Upvotes: 0

Views: 2919

Answers (1)

nhoxbypass
nhoxbypass

Reputation: 10152

Note that java.io.InterruptedIOException: thread interrupted not come from rx. It's an exception of java.io. For your dispose() action, rxJava is forced to un-subscribe and that interrupted thread during network request execution.

You can't really tell an observable to cancel a network request as rxJava has no knowledge of the code that is running inside of the Observable. What you can do is use the doOnUnsubscribe to check whether the network request is still running and if it is cancel it, then just unsubscribe when you want to cancel.

Upvotes: 2

Related Questions