Raj

Reputation: 1083

Stop Observable from loop

I have recently been working with RxJava 2 and have been testing Observable.interval():

subscription = Observable.interval(1, TimeUnit.MILLISECONDS, Schedulers.io()).observeOn(AndroidSchedulers.mainThread());

subscription.subscribe(new Observer<Long>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(Long aLong) {
                //binding.appBar.mainContent.msg.setText(aLong+"");
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });

The Observable is started in the activity's onCreate() method, and I log the output in onNext(). I also have a Stop button; when it is clicked, I want to stop the emissions.

Even after the Stop button is clicked, the log keeps going:

stop.setOnClickListener(new View.OnClickListener() {
            @Override
            public void onClick(View v) {
                if (subscription != null) {
                    subscription.unsubscribeOn(Schedulers.io());
                }
            }
        });

Upvotes: 1

Views: 1830

Answers (1)

azizbekian

Reputation: 62189

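You have subscribed with an Observer, which means you have to keep a reference to the Disposable passed to the onSubscribe(Disposable) callback and later call Disposable#dispose() on that object. Calling unsubscribeOn() does not stop anything: it is an operator that returns a new Observable and only specifies the Scheduler on which disposal will run.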



    private Disposable disposable;

    ...

    Observable.interval(1, TimeUnit.MILLISECONDS, Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(
            new Observer<Long>() {
              @Override public void onSubscribe(Disposable d) {
                disposable = d;
              }

            // other callbacks here

            });

    // later, e.g. in the Stop button's click listener
    disposable.dispose();


Alternatively, you can change your subscription to the following, where subscribe() returns the Disposable directly:



    Disposable disposable = Observable.interval(1, TimeUnit.MILLISECONDS, Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Consumer<Long>() {
          @Override public void accept(Long aLong) throws Exception {
            // onNext
          }
        }, new Consumer<Throwable>() {
          @Override public void accept(Throwable throwable) throws Exception {
            // onError
          }
        }, new Action() {
          @Override public void run() throws Exception {
            // onComplete
          }
        });

    disposable.dispose();
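
To see why holding on to the handle matters, here is a rough JDK-only model of the same contract. It is not RxJava's implementation: SimpleDisposable, interval() and runAndStop() are hypothetical names made up for this sketch, and it assumes a ScheduledExecutorService is an acceptable stand-in for the interval source. The point is only that the ticks stop once dispose() is called on the object returned at subscription time.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongConsumer;

// JDK-only model of "keep the handle, then dispose it".
// Every type and method here is a hypothetical stand-in, not an RxJava class.
class IntervalDisposeSketch {

    /** Hypothetical stand-in for RxJava's Disposable. */
    interface SimpleDisposable {
        void dispose();
        boolean isDisposed();
    }

    /** Emits 0, 1, 2, ... to onNext every periodMillis until the returned handle is disposed. */
    static SimpleDisposable interval(long periodMillis, LongConsumer onNext) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        AtomicLong counter = new AtomicLong();
        ScheduledFuture<?> task = scheduler.scheduleAtFixedRate(
                () -> onNext.accept(counter.getAndIncrement()),
                periodMillis, periodMillis, TimeUnit.MILLISECONDS);
        return new SimpleDisposable() {
            @Override public void dispose() {
                task.cancel(false);   // no further ticks are scheduled
                scheduler.shutdown(); // release the worker thread
            }
            @Override public boolean isDisposed() {
                return task.isCancelled();
            }
        };
    }

    /** Sleeps without forcing callers to handle a checked exception. */
    static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    /** Runs the interval, disposes it, and returns the tick count shortly after disposal and again later. */
    static long[] runAndStop(long periodMillis, long runMillis) {
        AtomicLong seen = new AtomicLong();
        SimpleDisposable disposable = interval(periodMillis, tick -> seen.incrementAndGet());
        sleep(runMillis);
        disposable.dispose();         // the equivalent of the Stop button
        sleep(periodMillis * 5);      // let any in-flight tick finish
        long afterDispose = seen.get();
        sleep(runMillis);
        long later = seen.get();
        return new long[] { afterDispose, later };
    }

    public static void main(String[] args) {
        long[] counts = runAndStop(10, 200);
        System.out.println("ticks shortly after dispose: " + counts[0]);
        System.out.println("ticks a while later:         " + counts[1]);
    }
}
```

Both printed counts should be equal, since nothing is emitted after dispose(). In the Activity from the question, the same idea would mean storing the Disposable in a field and calling dispose() from the Stop button's click listener.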


Upvotes: 3
