Umang Bhola

Reputation: 97

Zip operator in RxJava is not working with Retrofit

I am trying to use the zip operator in RxJava on Android to run three API calls in parallel and get their results together. However, the zip operator does not produce a result. The code for my sample problem is as follows:

Code for my gradle file

compile 'com.squareup.retrofit2:retrofit:2.0.2'
compile 'com.squareup.retrofit2:converter-gson:2.0.2'
compile 'com.squareup.retrofit2:adapter-rxjava:2.0.2'

I have also included this in my gradle file

exclude 'META-INF/rxjava.properties'

Code for my Retrofit Client

retrofit = new Retrofit.Builder().baseUrl(BASE_URL)
    .addCallAdapterFactory(RxJavaCallAdapterFactory.create())
    .addConverterFactory(GsonConverterFactory.create())
    .client(client).build();

Code for my API Interface

public interface ApiInterface {
    @GET("/users/{UUID}/count.json")
    Observable<Count> getCountInfo(@Path("UUID") String UUID, @Query("store_id") String sort);
    @GET("v1/users/{UUID}.json")
    Observable<GetStatus> getState(@Path("UUID") String UUID);
    @GET("v1/user/{UUID}/points.json")
    Observable<Response> getResponse(@Path("UUID") String UUID);
}

Code for my Observables is

Retrofit repo = APIClient.getClient(baseUrl);
Observable<Count> userObservable = repo.create(ApiInterface.class)
    .getCountInfo(userid, "1")
    .subscribeOn(Schedulers.newThread())
    .observeOn(Schedulers.io());
Observable<GetStatus> eventsObservable = APIClient.getClient(baseUrl)
    .create(ApiInterface.class)
    .getState(userid)
    .subscribeOn(Schedulers.newThread())
    .observeOn(Schedulers.io());
Observable<Response> eventsObservable1 = APIClient
    .getClient(baseUrl)
    .create(ApiInterface.class)
    .getResponse(userid)
    .subscribeOn(Schedulers.newThread())
    .observeOn(Schedulers.io());

Code for my combined observable and zip operator is:

Observable<CommonResponse> combined = Observable.zip(userObservable, eventsObservable, eventsObservable1,
new Func3<Count, GetStatus, Response, CommonResponse>() {
    @Override
    public CommonResponse call(Count count, GetStatus uStatus, Response lResponse) {
        return new CommonResponse(count, uStatus, lResponse);
    }
});
combined.subscribe(new Subscriber<CommonResponse>() {
    @Override
    public void onCompleted() {
    }
    @Override
    public void onError(Throwable e) {
    }
    @Override
    public void onNext(CommonResponse o) {
        LOG.info("Count Value is " + o.getCount());
        /**
        ***
        */
    }
});

The problem is that the statements inside onNext of the combined observable are never executed. What could be the reason? Specifically, I wanted to ask:

  1. Is there any issue in my dependencies?
  2. Should AndroidSchedulers.mainThread() be used instead of Schedulers.io()?

Upvotes: 3

Views: 2589

Answers (1)

cdehning

Reputation: 245

For the .zip() operator to emit anything, every zipped observable has to emit at least once. If one of your observables emits an error (which zip passes on to your onError) or does not emit at all, you will never receive an onNext event.
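
To make that rule concrete, here is a small dependency-free model of how zip pairs items. It is only a sketch of the pairing rule, not RxJava's implementation, and ZipModel and zip3 are made-up names for illustration. Lists stand in for observables, so errors and threading are left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Dependency-free model of zip's pairing rule. This is NOT RxJava's
// implementation; ZipModel and zip3 are hypothetical names.
final class ZipModel {

    // Combines the i-th item of every source and stops at the shortest source.
    static List<String> zip3(List<String> a, List<String> b, List<String> c) {
        int n = Math.min(a.size(), Math.min(b.size(), c.size()));
        List<String> out = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            out.add(a.get(i) + "|" + b.get(i) + "|" + c.get(i));
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> count = Arrays.asList("count");
        List<String> status = Arrays.asList("status");
        List<String> points = Arrays.asList("points");
        List<String> silent = Collections.emptyList(); // a source that never emits

        System.out.println(zip3(count, status, points)); // [count|status|points]
        System.out.println(zip3(count, status, silent)); // []
    }
}
```

The second call prints an empty list because one source never produced an item, which is the situation where onNext is never called.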

  • To check for error emissions, add logging or a breakpoint to onError in your subscribe call
  • To check for missing emissions, add doOnNext and doOnCompleted calls with logging to each of your zipped Observables and see which one does not emit
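
The two checks above might look roughly like this with RxJava 1.x (the version that adapter-rxjava 2.0.2 builds on). ZipDebug, logged and log are hypothetical helper names, and the three sources are stand-ins for the Retrofit calls, with the third one failing on purpose (the "HTTP 404" message is made up).

```java
import java.util.ArrayList;
import java.util.List;
import rx.Observable;
import rx.Subscriber;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.functions.Func3;

// Sketch for RxJava 1.x (needs io.reactivex:rxjava on the classpath).
// ZipDebug, logged and log are hypothetical names for illustration.
final class ZipDebug {

    static final List<String> LOG = new ArrayList<>();

    static void log(String line) {
        LOG.add(line);
        System.out.println(line);
    }

    // Wraps a source so that every event it produces is logged under a name.
    static <T> Observable<T> logged(final String name, Observable<T> source) {
        return source
            .doOnNext(new Action1<T>() {
                @Override public void call(T item) { log(name + " onNext"); }
            })
            .doOnError(new Action1<Throwable>() {
                @Override public void call(Throwable e) { log(name + " onError: " + e.getMessage()); }
            })
            .doOnCompleted(new Action0() {
                @Override public void call() { log(name + " onCompleted"); }
            });
    }

    public static void main(String[] args) {
        // Stand-ins for the three API calls; the third one fails.
        Observable<String> count = logged("count", Observable.just("3"));
        Observable<String> status = logged("status", Observable.just("active"));
        Observable<String> points = logged("points",
            Observable.<String>error(new RuntimeException("HTTP 404")));

        Observable.zip(count, status, points, new Func3<String, String, String, String>() {
            @Override public String call(String c, String s, String p) {
                return c + "/" + s + "/" + p;
            }
        }).subscribe(new Subscriber<String>() {
            @Override public void onCompleted() { log("zip onCompleted"); }
            @Override public void onError(Throwable e) { log("zip onError: " + e.getMessage()); }
            @Override public void onNext(String value) { log("zip onNext: " + value); }
        });
    }
}
```

With one source failing, no "zip onNext" line is logged. The failure shows up in the "points onError" and "zip onError" lines instead, which is why an empty onError hides it.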

Cheers!

Upvotes: 1
