Mani
Mani

Reputation: 777

Compose multiple network calls RxJava - Android

Help in composing multiple network calls and accumulate the result in Rxjava. (I am using in an Android application.)

State 
 -- List<City> cityList;

 City
 - cityId;

RestCall 1
Observable<State> stateRequest = restService.getStates();

RestCall 2
Observable<CityDetail> cityRequest = restService.getCityDetail(cityId);

In UI i have to display list of cities after getting all the details of each city and then show in the listview. How do i achieve the parllel network calls and accumulate the result. ?

I want all the city detail results to be put in List in source State 'object'. As state object has some information which need to be dislayed as well.Is this possible ?

stateRequest ??? 
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<State>() {
    @Override
    public void onCompleted() {

    }

    @Override
    public void onError(Throwable e) {
    }

    @Override
    public void onNext(State result) {
    // Get city list and display
    }
});

I checked this example which shows how we can zip more tha one observable response. Below snippet shows 3 observables combined. But in my case i have to make 20 network calls parallel or sequential ( i mean in background but one after another). How do i achieve this. Any help or directions ?

https://gist.github.com/skehlet/9418379

Observable.zip(f3Observable, f4Observable, f5Observable, new Func3<String, Integer, Integer, Map<String, String>>() {
    @Override
    public Map<String, String> call(String s, Integer integer, Integer integer2) {
        Map<String, String> map = new HashMap<String, String>();
        map.put("f3", s);
        map.put("f4", String.valueOf(integer));
        map.put("f5", String.valueOf(integer2));
        return map;
    }

Upvotes: 2

Views: 2682

Answers (3)

Andrew
Andrew

Reputation: 2886

Take a look at flatMap(Function.., BiFunction..). Maybe that's what you need.

statesRepository.getStates()
   .flatMap(states-> Observable.fromIterable(states))
   .flatMap(
           state-> cityRepository.getStateCities(state),
           (state, cityList) -> {
               state.setCities(cityList);
               return state;
           })
   .subscribe(state-> showStateWithCity(state));

Upvotes: 0

dwursteisen
dwursteisen

Reputation: 11515

I think that your code can be simplified to something like this, as your use of the zip operator is close to the use of toList operator

 stateRequest
 .subscribe(State state ->  {
     Observable.from(state.getCityList())
               .flatMap(City city -> restService.getCityDetail(city.getId())
               .toList()
               .subscribe(List<City> cities -> {

                     state.clear();
                     state.addAll(cities);
               });
     });

As RxJava doesn't provide a throttle operator, you may build something similar like this :

Observable<City> limiter = Observable.zip(Observable.interval(1, SECONDS), aCity, (i, c) -> c);

Using this, limiter is an observable that will emit a city each second.

So, with your code, if you want to limit call to getCityDetail for example :

 Observable<Object> limiter = Observable.interval(1, SECONDS);
 stateRequest
 .subscribe(State state ->  {
     Observable.zip(limiter, Observable.from(state.getCityList()), (i, c) -> c)
               .flatMap(City city -> restService.getCityDetail(city.getId())
               .toList()
               .subscribe(List<City> cities -> {

                     state.clear();
                     state.addAll(cities);
               });
     });

Upvotes: 3

Mani
Mani

Reputation: 777

stateRequest
.flatMap(new Func1<State, Observable<State>>() {
    @Override
    public Observable<State> call(final State state) {

        List<Observable> cityObservablesList = new ArrayList<Observable>();

        for(City city: state.getCityList()) {
            cityObservablesList.add(restService.getCityDetail(city.getId());
        }

        Observable cityObservables = Observable.from(cityObservablesList);
        return Observables.zip(cityObservables, new FuncN<State>() {
            @Override
            public State call(Object... args) {
                List<City> cityList = state.getCityList();
                cityList.clear();
                for(Object object: args) {
                    cityList.add((City)object);
                }

                return state;
            }
        })
    })
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<State>() {
    @Override
    public void onCompleted() {

    }

    @Override
    public void onError(Throwable e) {
    }

    @Override
    public void onNext(State result) {
    // Get city list and display
    }
});

I got it working with the help of zip operator and Iterable for city list as first parameter. But i face another issue. Since the zip executes the job in parallel, 10-15 network calls are executed in parallel and server rejecting with Maximum Query Per Second error (QPS - 403). How do i instruct the zip operator to execute the tasks one after another ?

I did solve this issue by adding a delay [delay(c*200, TimeUnit.MILLISECONDS))] to city observable. But doesn't seem like a proper solution.

Any advise ?

Upvotes: 1

Related Questions