masztalski
masztalski

Reputation: 212

Cancel execuction of Observable (RxJava 1)

I have created Observable with subscripiton:

private Subscription mSearchSubscription = null;

mSearchSubscription = Observable.fromCallable(() -> {
    many network requests
}).subscribe(doing sth);

When my activity is destroyed I am executing following line:

 if (mSearchSubscription != null){
        mSearchSubscription.unsubscribe();
    }

According to informations found in internet this line should cancel all code execution from this block. But it doesn't :( All network requests inside this block are wrapped Observables with subscribe so when one request is finished next one is executing and so on. As I noticed after unsubscribe is executed and activity is destroyed code inside Observable is still executing.

Is there any better way to stop it? I am using in this project RxJava1

Update: I have MVC architecture with controller Injected into Activity. By overriding onDestroy from activity I call method in controller to unsubscribe. This is whole code. Every network request inside block fromCallable(()-> ) is like this mWebService.getSth(body parameters).subscribe(doSth) (in interface this return Observable ) . I am not expecting to stop during execution of internal observable, but stop executing this block of code, just like return. This is wrapped because of many requests needed to be executed one by one and return result to activity when everything is ready.

Upvotes: 0

Views: 491

Answers (1)

akarnokd
akarnokd

Reputation: 69997

You haven't given much details on how and why you do that, so my best guess-suggestion is to use create:

Observable.create(emitter -> {
    CompositeSubscription resources = new CompositeSubscription();
    emitter.setSubscription(resources);

    if (resources.isUnsubscribed()) {
        return;
    }

    // add these if you want to cancel the inner sources
    // resources.add(

    mWebService.getFirst(/* params */).subscribe(/* postprocess */);

    // );

    if (resources.isUnsubscribed()) {
        return;
    }
    mWebService.getSecond(/* params */).subscribe(/* postprocess */);

    if (resources.isUnsubscribed()) {
        return;
    }

    mWebService.getThird(/* params */).subscribe(/* postprocess */);

    emitter.onNext(/* the result object*/
    emitter.onComplete();
}, Emitter.BackpressureMode.BUFFER)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(/* work with the data */);

Upvotes: 1

Related Questions