Reputation: 777
Help composing multiple network calls and accumulating the results in RxJava (I am using it in an Android application).
State
-- List<City> cityList;
City
- cityId;
RestCall 1
Observable<State> stateRequest = restService.getStates();
RestCall 2
Observable<CityDetail> cityRequest = restService.getCityDetail(cityId);
In the UI I have to display the list of cities in a ListView after getting all the details of each city. How do I achieve the parallel network calls and accumulate the result?
I want all the city detail results to be put in the List in the source State object, as the State object has some information which needs to be displayed as well. Is this possible?
stateRequest ???
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Subscriber<State>() {
        @Override
        public void onCompleted() {
        }
        @Override
        public void onError(Throwable e) {
        }
        @Override
        public void onNext(State result) {
            // Get city list and display
        }
    });
I checked this example, which shows how to zip more than one observable response. The snippet below combines 3 observables. But in my case I have to make 20 network calls, either in parallel or sequentially (I mean in the background, but one after another). How do I achieve this? Any help or directions?
https://gist.github.com/skehlet/9418379
Observable.zip(f3Observable, f4Observable, f5Observable, new Func3<String, Integer, Integer, Map<String, String>>() {
    @Override
    public Map<String, String> call(String s, Integer integer, Integer integer2) {
        Map<String, String> map = new HashMap<String, String>();
        map.put("f3", s);
        map.put("f4", String.valueOf(integer));
        map.put("f5", String.valueOf(integer2));
        return map;
    }
});
Upvotes: 2
Views: 2682
Reputation: 2886
Take a look at flatMap(Function.., BiFunction..). Maybe that's what you need.
statesRepository.getStates()
    .flatMap(states -> Observable.fromIterable(states))
    .flatMap(
        state -> cityRepository.getStateCities(state),
        (state, cityList) -> {
            state.setCities(cityList);
            return state;
        })
    .subscribe(state -> showStateWithCity(state));
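The shape of that two-argument flatMap (start a lookup per parent, then fold the result back into the parent) can also be tried out without RxJava. The following is only a rough sketch using `java.util.concurrent.CompletableFuture` as a stand-in, not the RxJava operator itself; `State`, `fetchCities`, and the sample data are hypothetical names made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

class StateCityCombiner {
    // Hypothetical model standing in for the State class from the question.
    static class State {
        final String name;
        List<String> cities = new ArrayList<>();
        State(String name) { this.name = name; }
    }

    // Hypothetical stand-in for cityRepository.getStateCities(state): an async lookup.
    static CompletableFuture<List<String>> fetchCities(State state) {
        return CompletableFuture.supplyAsync(() ->
                Arrays.asList(state.name + "-city-1", state.name + "-city-2"));
    }

    // Start one lookup per state, then merge each result back into its state.
    static List<State> loadAll(List<State> states) {
        List<CompletableFuture<State>> pending = states.stream()
                .map(state -> fetchCities(state).thenApply(cities -> {
                    state.cities = cities; // the "combiner" step
                    return state;
                }))
                .collect(Collectors.toList());
        // All lookups were started above; join() only waits for them to finish.
        return pending.stream().map(CompletableFuture::join).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        for (State s : loadAll(Arrays.asList(new State("A"), new State("B")))) {
            System.out.println(s.name + " -> " + s.cities);
        }
    }
}
```

The returned list keeps the order of the input states, because each future is joined in the order it was created.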
Upvotes: 0
Reputation: 11515
I think that your code can be simplified to something like this, as your use of the zip operator is close to the use of the toList operator:
stateRequest
    .subscribe(state -> {
        Observable.from(state.getCityList())
            .flatMap(city -> restService.getCityDetail(city.getId()))
            .toList()
            // assumes getCityDetail emits City objects, as in the original snippet
            .subscribe(cities -> {
                state.getCityList().clear();
                state.getCityList().addAll(cities);
            });
    });
RxJava's throttle operators (throttleFirst, throttleLast) drop items rather than delay them, so to space out emissions without losing any you may build something similar like this:
Observable<City> limiter = Observable.zip(Observable.interval(1, SECONDS), aCity, (i, c) -> c);
Using this, limiter is an observable that will emit a city each second.
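To see the pacing idea in isolation, here is a rough sketch that uses only `java.util.concurrent` instead of RxJava: a scheduled tick hands out one item per period, much like zipping a list with `interval`. It is a stand-in under stated assumptions, not the RxJava operator; `PacedEmitter` and `emitPaced` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

class PacedEmitter {
    // Hands one item to the consumer per tick and blocks until the list is exhausted.
    static <T> void emitPaced(List<T> items, long periodMillis, Consumer<T> consumer) {
        ScheduledExecutorService ticker = Executors.newSingleThreadScheduledExecutor();
        Iterator<T> it = items.iterator();
        CountDownLatch done = new CountDownLatch(1);
        ticker.scheduleAtFixedRate(() -> {
            // Stays true if the consumer throws, so the caller is never left waiting.
            boolean finished = true;
            try {
                if (it.hasNext()) {
                    consumer.accept(it.next());
                }
                finished = !it.hasNext();
            } finally {
                if (finished) {
                    done.countDown();
                }
            }
        }, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            ticker.shutdownNow();
        }
    }

    public static void main(String[] args) {
        List<String> seen = new ArrayList<>();
        emitPaced(Arrays.asList("Paris", "Lyon", "Nice"), 200, seen::add);
        System.out.println(seen);
    }
}
```

As with `interval`, the first item only arrives after one full period, so a list of n items takes roughly n periods to drain.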
So, with your code, if you want to limit calls to getCityDetail, for example:
Observable<Long> limiter = Observable.interval(1, SECONDS);
stateRequest
    .subscribe(state -> {
        // Pair each city with a tick so that one request starts per second.
        Observable.zip(limiter, Observable.from(state.getCityList()), (i, c) -> c)
            .flatMap(city -> restService.getCityDetail(city.getId()))
            .toList()
            .subscribe(cities -> {
                state.getCityList().clear();
                state.getCityList().addAll(cities);
            });
    });
Upvotes: 3
Reputation: 777
stateRequest
    .flatMap(new Func1<State, Observable<State>>() {
        @Override
        public Observable<State> call(final State state) {
            // Build one detail request per city in the state.
            List<Observable> cityObservablesList = new ArrayList<Observable>();
            for (City city : state.getCityList()) {
                cityObservablesList.add(restService.getCityDetail(city.getId()));
            }
            Observable cityObservables = Observable.from(cityObservablesList);
            return Observable.zip(cityObservables, new FuncN<State>() {
                @Override
                public State call(Object... args) {
                    List<City> cityList = state.getCityList();
                    cityList.clear();
                    for (Object object : args) {
                        cityList.add((City) object);
                    }
                    return state;
                }
            });
        }
    })
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Subscriber<State>() {
        @Override
        public void onCompleted() {
        }
        @Override
        public void onError(Throwable e) {
        }
        @Override
        public void onNext(State result) {
            // Get city list and display
        }
    });
I got it working with the help of the zip operator and an Iterable for the city list as the first parameter. But I face another issue: since zip executes the jobs in parallel, 10-15 network calls run at the same time and the server rejects them with a Maximum Queries Per Second error (QPS - 403). How do I instruct the zip operator to execute the tasks one after another?
I did solve this issue by adding a delay [delay(c*200, TimeUnit.MILLISECONDS)] to each city observable, but that doesn't seem like a proper solution.
Any advice?
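For reference, the behaviour being asked for (at most N calls in flight, results collected in order) can be sketched with a plain fixed-size thread pool. This is not RxJava, only a stand-in built on `java.util.concurrent` to make the idea concrete; in RxJava itself, `concatMap` is the operator that subscribes to inner observables one at a time. `BoundedFetcher`, `fetchAll`, and `peakInFlight` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

class BoundedFetcher {
    // Highest number of calls seen running at the same time during the last fetchAll.
    static final AtomicInteger peakInFlight = new AtomicInteger();

    // Runs call(key) for every key with at most maxConcurrent calls in flight.
    static <K, V> List<V> fetchAll(List<K> keys, Function<K, V> call, int maxConcurrent) {
        peakInFlight.set(0);
        ExecutorService pool = Executors.newFixedThreadPool(maxConcurrent);
        AtomicInteger inFlight = new AtomicInteger();
        try {
            List<Future<V>> futures = new ArrayList<>();
            for (K key : keys) {
                futures.add(pool.submit(() -> {
                    int now = inFlight.incrementAndGet();
                    peakInFlight.accumulateAndGet(now, Math::max);
                    try {
                        return call.apply(key);
                    } finally {
                        inFlight.decrementAndGet();
                    }
                }));
            }
            // Collect in submission order, regardless of completion order.
            List<V> results = new ArrayList<>();
            for (Future<V> f : futures) {
                results.add(f.get());
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // maxConcurrent = 1 means strictly one call after another.
        List<String> details = fetchAll(Arrays.asList(1, 2, 3), id -> "detail-" + id, 1);
        System.out.println(details + ", peak in flight: " + peakInFlight.get());
    }
}
```

With `maxConcurrent` set to 1 the calls run strictly one after another; raising it trades latency against the server's query limit.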
Upvotes: 1