purcha

Reputation: 371

Thread sleep into Schedulers.io - RxJava

I'm testing RxJava at the moment and I'm confused.
I put a method that calls Thread.sleep(5000) onto Schedulers.io():

CompositeDisposable compositeDisposable = new CompositeDisposable();
compositeDisposable.add(longOperation()
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribeWith(new DisposableCompletableObserver() {
            @Override
            public void onComplete() {
                customToast("long operation done");
            }

            @Override
            public void onError(Throwable e) {

            }
        }));

I read that long operations should be moved onto Schedulers.io() so they don't freeze the UI thread, but in this case the UI still freezes. What is wrong?
longOperation() has Thread.sleep(5000) inside.

Edit: after wrapping the call in fromCallable():

private void doSomething() throws InterruptedException {
        CompositeDisposable compositeDisposable = new CompositeDisposable();
        compositeDisposable.add(Observable.fromCallable(()-> longOperation())
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribeWith(new DisposableCompletableObserver() {
            @Override
            public void onComplete() {
                customToast("long operation done");
            }

            @Override
            public void onError(Throwable e) {

            }
        }));
    }

  private Completable longOperation() throws InterruptedException {
        Thread.sleep(5000);
        return Completable.complete();
    }

Upvotes: 0

Views: 705

Answers (1)

EpicPandaForce

Reputation: 81539

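You should be using Completable.defer() so that longOperation() actually executes on the scheduler. As written, longOperation() is called (and sleeps) on the calling thread while you're still building the chain; subscribeOn() only affects what happens at subscription time, and by then the Completable has already been created.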

private void doSomething() throws InterruptedException {
    CompositeDisposable compositeDisposable = new CompositeDisposable();
    compositeDisposable.add(Completable.defer(new Callable<CompletableSource>() {
        @Override
        public CompletableSource call() throws Exception {
            return longOperation();
        }
    }).subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribeWith(new DisposableCompletableObserver() {
                @Override
                public void onComplete() {
                    customToast("long operation done");
                }

                @Override
                public void onError(Throwable e) {

                }
            }));
}

private Completable longOperation() throws InterruptedException {
    Thread.sleep(5000);
    return Completable.complete();
}
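
The eager-versus-deferred difference can also be seen without RxJava. Below is a rough sketch in plain Java, assuming a single-thread executor as a stand-in for Schedulers.io(); the class name EagerVsDeferred, the helpers runEager and runDeferred, and the 200 ms sleep are made up for illustration and are not part of RxJava or the original code.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class EagerVsDeferred {

    // Hypothetical stand-in for longOperation(): blocks briefly, then
    // returns the name of the thread that did the blocking.
    static String longOperation() {
        try {
            Thread.sleep(200); // shortened from 5000 ms for the demo
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return Thread.currentThread().getName();
    }

    // Eager: longOperation() runs here, on the caller's thread, before
    // anything is handed to the worker. The worker only passes the
    // already-computed value back. This mirrors
    // longOperation().subscribeOn(...).
    static String runEager(ExecutorService worker) throws Exception {
        String result = longOperation();
        Callable<String> task = () -> result;
        Future<String> future = worker.submit(task);
        return future.get();
    }

    // Deferred: only a Callable is handed over, so longOperation() is
    // invoked later, on the worker thread. This mirrors
    // Completable.defer(...).subscribeOn(...).
    static String runDeferred(ExecutorService worker) throws Exception {
        Callable<String> task = EagerVsDeferred::longOperation;
        Future<String> future = worker.submit(task);
        return future.get();
    }

    public static void main(String[] args) throws Exception {
        ExecutorService worker =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "worker"));
        try {
            System.out.println("caller:   " + Thread.currentThread().getName());
            System.out.println("eager:    " + runEager(worker));
            System.out.println("deferred: " + runDeferred(worker));
        } finally {
            worker.shutdown();
        }
    }
}
```

The eager variant reports the caller's thread and the deferred one reports the worker thread, which is the same reason the sleep ends up on the UI thread in the question's first snippet.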

Upvotes: 2
