Zackline

Reputation: 834

RxJava - Nested Observables? (Retrofit)

I need an authentication token to create my Retrofit service, and I currently obtain that token through an Observable. The result is a rather ugly nested Observable construct:

Observable<MyService> observable = application.getMyService();
    observable.observeOn(AndroidSchedulers.mainThread())
            .subscribeOn(application.defaultSubscribeScheduler())
            .subscribe(new Subscriber<MyService>() {
                @Override
                public void onCompleted() {

                }

                @Override
                public void onError(Throwable e) {
                    Log.e(TAG, "Error creating service: ", e);
                }

                @Override
                public void onNext(MyService myService) {
                    subscription = myService.searchStuff(searchFor)
                            .observeOn(AndroidSchedulers.mainThread())
                            .subscribeOn(application.defaultSubscribeScheduler())
                            .subscribe(new Subscriber<AResponseWrapper>() {
                                @Override
                                public void onCompleted() {

                                }

                                @Override
                                public void onError(Throwable error) {
                                    Log.e(TAG, "Error loading stuff: ", error);
                                }

                                @Override
                                public void onNext(AResponseWrapper wrapper) {
                                    MainPresenter.this.stuff = wrapper.getStuff();
                                }
                            });
                }
            });

I can't help but feel that this is not how it should be done. Am I right?

Upvotes: 2

Views: 1843

Answers (1)

Zackline

Reputation: 834

Observable.flatMap is what I was looking for. It maps each emitted item to another Observable and flattens the results into a single stream, so no nested subscription is needed:

Observable<MyService> observable = application.getMyService();
    subscription = observable
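            // Run the flatMap function (and with it the network call) off the main thread.
            .observeOn(application.defaultSubscribeScheduler())
            .subscribeOn(application.defaultSubscribeScheduler())
            .flatMap(service -> service.searchStuff(searchFor))
            // Deliver the results to the subscriber on the main thread.
            .observeOn(AndroidSchedulers.mainThread())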
            .subscribe(new Subscriber<AResponseWrapper>() {
                @Override
                public void onCompleted() {
                    if (stuff.size() < 1) {
                        mainView.showMessage(R.string.no_stuff_found);
                    } else {
                        mainView.showStuff(stuff);
                    }
                }

                @Override
                public void onError(Throwable error) {
                    Log.e(TAG, "Error loading stuff: ", error);
                }

                @Override
                public void onNext(AResponseWrapper wrapper) {
                    MainPresenter.this.stuff = wrapper.getStuff();
                }
            });

Note that I first observe on the IO scheduler, and only after the flatMap do I observe on the main thread. Otherwise the service.searchStuff call (at least I think it's that part) would be executed on the main thread, yielding a NetworkOnMainThreadException.
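The effect of the ordering can be tried without Android or RxJava. The sketch below is only an analogy, not the RxJava API: it uses the JDK's CompletableFuture, where thenComposeAsync plays roughly the role of observeOn followed by flatMap, and thenApplyAsync stands in for the final observeOn before the subscriber. The executors IO and UI and the methods getService and searchStuff are hypothetical stand-ins for the schedulers and the Retrofit calls above.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class FlatMapThreadingSketch {
    // Hypothetical stand-ins for the IO scheduler and the Android main thread.
    static final ExecutorService IO = Executors.newSingleThreadExecutor(r -> daemon(r, "io"));
    static final ExecutorService UI = Executors.newSingleThreadExecutor(r -> daemon(r, "ui"));

    // Daemon threads so the JVM can exit once main() returns.
    static Thread daemon(Runnable r, String name) {
        Thread t = new Thread(r, name);
        t.setDaemon(true);
        return t;
    }

    // Stand-in for application.getMyService(): asynchronously yields a "service".
    static CompletableFuture<String> getService() {
        return CompletableFuture.supplyAsync(() -> "service", IO);
    }

    // Stand-in for service.searchStuff(query): records the thread it was invoked on.
    static CompletableFuture<String> searchStuff(String service, String query) {
        String caller = Thread.currentThread().getName();
        return CompletableFuture.completedFuture(query + " searched on " + caller);
    }

    // Chains both steps without nesting and reports where each step ran.
    static String run(String query) {
        return getService()
                // Analogous to observeOn(io) + flatMap: the function runs on the IO thread.
                .thenComposeAsync(service -> searchStuff(service, query), IO)
                // Analogous to observeOn(mainThread) before subscribe.
                .thenApplyAsync(result -> result + " | delivered on "
                        + Thread.currentThread().getName(), UI)
                .join();
    }

    public static void main(String[] args) {
        System.out.println(run("foo")); // prints "foo searched on io | delivered on ui"
    }
}
```

Passing UI instead of IO to thenComposeAsync makes the search step report "ui", which mirrors what happens when observeOn(AndroidSchedulers.mainThread()) is placed before the flatMap.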

Thanks to @ahmed-ashraf-g who pointed me to this answer.

Upvotes: 1
