krupal.agile

Reputation: 815

How to wrap observable inside another observable in RxJava?

I want to wrap a Retrofit API call in another method where I can additionally show/hide a loader, check for network connectivity, etc. Since my API returns an Observable, this is what I ended up with:

private <T> Observable<T> request(final Observable<T> apiCall, final ViewManager viewManager) {
    return Observable.create(new Action1<Emitter<T>>() {
        @Override
        public void call(final Emitter<T> emitter) {
            if (!NetworkUtils.isConnected(context)) {
                emitter.onError(new ConnectException("network not connected"));
                return;
            }
            viewManager.showLoader();
            apiCall.subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(new Observer<T>() {
                        @Override
                        public void onCompleted() {
                            viewManager.hideLoader();
                            emitter.onCompleted();
                        }

                        @Override
                        public void onError(Throwable e) {
                            viewManager.hideLoader();
                            emitter.onError(e);
                        }

                        @Override
                        public void onNext(T response) {
                            emitter.onNext(response);
                        }
                    });
        }
    }, Emitter.BackpressureMode.BUFFER);
}

Is this a standard way of dealing with the problem? How do you wrap an observable inside another observable? Can anyone guide me?

Upvotes: 2

Views: 4062

Answers (1)

yosriz

Reputation: 10267

The idiomatic way with Reactive Extensions is to use composition, which is one of the great strengths of Rx.

First, let's define the desired behavior using operators. What you want is something like this:

apiCall
  .observeOn(AndroidSchedulers.mainThread())
  .startWith(Observable.defer(() -> {
      if (!NetworkUtils.isConnected(context)) {
          return Observable.error(new ConnectException("network not connected"));
      } else {
          return Observable.empty();
      }
  }))
  .doOnSubscribe(() -> viewManager.showLoader())
  .doOnCompleted(() -> viewManager.hideLoader())
  .doOnError(throwable -> viewManager.hideLoader());
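
The `Observable.defer` wrapper matters in the chain above: it makes the connectivity check run each time the chain is subscribed to, rather than once when the chain is assembled. A rough plain-Java analogy follows, using `java.util.function.Supplier` instead of RxJava; `DeferDemo`, `connected` and `check` are hypothetical names, and `connected` merely stands in for `NetworkUtils.isConnected(context)`:

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

public class DeferDemo {
    // Hypothetical stand-in for NetworkUtils.isConnected(context).
    static final AtomicBoolean connected = new AtomicBoolean(false);

    static String check() {
        return connected.get() ? "ok" : "network not connected";
    }

    public static void main(String[] args) {
        // Evaluated immediately, like a check done while building the chain.
        String eager = check();
        // Evaluated on every get(), like a check wrapped in defer().
        Supplier<String> deferred = DeferDemo::check;

        // Connectivity changes after the chain was "assembled".
        connected.set(true);

        System.out.println(eager);          // prints "network not connected" (stale)
        System.out.println(deferred.get()); // prints "ok" (current)
    }
}
```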

Now, to apply this to any network apiCall Observable, you can use the compose() operator and encapsulate the logic in a Transformer:

class CustomTransformer<T> implements Observable.Transformer<T, T> {

    private final ViewManager viewManager;
    private final Context context;

    CustomTransformer(ViewManager viewManager, Context context) {
        this.viewManager = viewManager;
        this.context = context;
    }

    @Override
    public Observable<T> call(Observable<T> apiCall) {
        return apiCall
                .observeOn(AndroidSchedulers.mainThread())
                .startWith(Observable.defer(() -> {
                    if (!NetworkUtils.isConnected(context)) {
                        return Observable.error(new ConnectException("network not connected"));
                    } else {
                        return Observable.empty();
                    }
                }))
                .doOnSubscribe(() -> viewManager.showLoader())
                .doOnCompleted(() -> viewManager.hideLoader())
                .doOnError(throwable -> viewManager.hideLoader());
    }
}

Then you can compose it with any network Observable:

someRetrofitQuery
   .compose(new CustomTransformer<>(viewManager, context))
    ...
   .subscribe();
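
To see why `compose()` keeps the chain readable, it may help to note that it does little more than hand the upstream source to the transformer and return whatever comes back. The sketch below mimics that with a toy `Source` type built only on the JDK; it is not RxJava, and `ComposeDemo`, `Source`, `run` and `withLoader` are hypothetical names used for illustration only:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.Supplier;

public class ComposeDemo {
    // Toy stand-in for an Observable: a lazily evaluated source of one value.
    static final class Source<T> {
        private final Supplier<T> producer;

        Source(Supplier<T> producer) {
            this.producer = producer;
        }

        // Like compose(): pass the whole source to the transformer.
        <R> Source<R> compose(Function<Source<T>, Source<R>> transformer) {
            return transformer.apply(this);
        }

        T run() {
            return producer.get();
        }
    }

    // Reusable wrapper: logs "showLoader" before the upstream runs and
    // "hideLoader" afterwards, whether the upstream succeeds or throws.
    static <T> Function<Source<T>, Source<T>> withLoader(List<String> log) {
        return upstream -> new Source<>(() -> {
            log.add("showLoader");
            try {
                return upstream.run();
            } finally {
                log.add("hideLoader");
            }
        });
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        Source<String> query = new Source<>(() -> {
            log.add("request");
            return "response";
        });

        String result = query.compose(withLoader(log)).run();
        // prints "[showLoader, request, hideLoader] -> response"
        System.out.println(log + " -> " + result);
    }
}
```

The wrapper is written once and can be applied to any `Source`, which is the same benefit the Transformer gives for Retrofit observables.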

Upvotes: 2
