Reputation: 14409
I'm making a simple weather app to learn RxAndroid and I'm faced with the following issue.
I first load cities I'm interested in and then ask for the weather of each one of them.
getCitiesUseCase returns an Observable<List<City>> that I load from the database. I send that list of cities to my view to display them and then ask for the weather of each city individually (flatMap) inside the subscriber.
Subscription subscription = getCitiesUseCase.execute().flatMap(new Func1<List<City>, Observable<City>>() {
    @Override
    public Observable<City> call(List<City> cities) {
        citiesView.addCities(cities);
        return Observable.from(cities);
    }
}).subscribe(new Subscriber<City>() {
    @Override
    public void onCompleted() {
        subscriptions.remove(this);
        this.unsubscribe();
    }

    @Override
    public void onError(Throwable e) {
        Log.e(this.getClass().getSimpleName(), e.toString());
    }

    @Override
    public void onNext(City city) {
        getCityWeatherUseCase.setLatLon(city.getLat().toString(), city.getLon().toString(), city.getId());
        getCityWeather(city);
    }
});
subscriptions.add(subscription);
Now the getCityWeather() method looks like this:
private void getCityWeather(final City city) {
    subscriptions.add(getCityWeatherUseCase.execute().subscribe(new Subscriber<CityWeather>() {
        @Override
        public void onCompleted() {
        }

        @Override
        public void onError(Throwable e) {
            Log.e("error", e.toString());
        }

        @Override
        public void onNext(CityWeather cityWeather) {
            city.setCityWeather(cityWeather);
            citiesView.updateCity(city);
        }
    }));
}
Everything works as expected, but subscribing to an observable inside a subscriber doesn't feel right. I know RxJava lets you compose observables to avoid this kind of thing, but I really don't know how to improve my code further. Keep in mind that I need a city in order to ask for its weather. Merry Christmas!
Upvotes: 0
Views: 74
Reputation: 2281
One approach could be the following. (I'm using Retrolambda, so wherever you see ->, just replace it with an anonymous inner class.)
Note that I'm using flatMap to spin up the weather data requests, rather than Observable.concat like your question suggests. The reason is that, with flatMap, your scheduler (e.g. io()) can handle these requests in parallel and send the results through as they become available. With Observable.concat, the requests would be serialized and forced to happen one at a time, nullifying the benefits of a thread pool like io().
private class City {
    public String name;

    public City(String name) {
        this.name = name;
    }

    public void setWeather(Weather weather) { /*...*/ }
}

private class Weather {
    public String status;

    public Weather(String status) {
        this.status = status;
    }
}

private Observable<Weather> getWeather(City city) {
    // call your weather API here..
    return Observable.just(new Weather("Sunny"));
}

@Test
public void test() {
    Observable<List<City>> citiesObs = Observable.create(new Observable.OnSubscribe<List<City>>() {
        @Override
        public void call(Subscriber<? super List<City>> subscriber) {
            // do work
            final List<City> cities = new ArrayList<>();
            cities.add(new City("Paris"));
            cities.add(new City("Tokyo"));
            cities.add(new City("Oslo"));
            // send results
            if (!subscriber.isUnsubscribed()) {
                subscriber.onNext(cities);
                subscriber.onCompleted();
            }
        }
    });

    Observable<City> obs = citiesObs
            // inject a side effect
            .doOnNext(list -> {
                // pass `list` to your view here
            })
            // turn Observable<Iterable<T>> into Observable<T>
            .flatMapIterable(list -> list)
            // Map a city to an observable that fetches Weather data.
            // Your scheduler can take care of these at once.
            .flatMap(city -> {
                return getWeather(city)
                        // another side effect
                        .doOnNext(weather -> {
                            city.setWeather(weather);
                        })
                        // map back to city, so downstream receives the updated City
                        .map($ -> city);
            });

    TestSubscriber<City> sub = TestSubscriber.create();
    obs.subscribe(sub);
    sub.awaitTerminalEvent();
    sub.assertValueCount(3);
}
Also note that in order to take advantage of io(), you'd need to add a call to subscribeOn(Schedulers.io()) to tell the observable to begin doing work on the io thread pool. For the weather requests to actually overlap, that call generally has to sit on the inner observable returned by getWeather(city); inner observables that emit synchronously on a single thread are still processed one after another. When you want to pass control to another thread, for example for your view, you could insert an observeOn(AndroidSchedulers.mainThread()) before your side effect (or mapping). If you want to bounce control back to the background thread(s) for your weather calls, you could then add another call to observeOn(Schedulers.io()) right before you flatMap to getWeather(City).
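To make the point about serialized versus parallel requests concrete without pulling in RxJava, here is a minimal plain-Java sketch. It is only an analogy and not RxJava code: a loop stands in for the one-at-a-time behaviour of concat, and CompletableFuture on a fixed thread pool stands in for flatMap on io(). The class WeatherFanOut, the method fetchWeather and the 50 ms delay are all made up for illustration. One difference to keep in mind is that this sketch collects results in submission order, whereas flatMap emits them in completion order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

final class WeatherFanOut {
    // Number of hypothetical weather calls running right now,
    // and the highest value that number has reached.
    static final AtomicInteger inFlight = new AtomicInteger();
    static final AtomicInteger maxInFlight = new AtomicInteger();

    // Hypothetical stand-in for a blocking weather API call.
    static String fetchWeather(String city) {
        int now = inFlight.incrementAndGet();
        maxInFlight.accumulateAndGet(now, Math::max);
        try {
            Thread.sleep(50); // simulated network latency (assumed value)
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            inFlight.decrementAndGet();
        }
        return city + ": Sunny";
    }

    // One request at a time, similar in spirit to concat.
    static List<String> fetchSequentially(List<String> cities) {
        List<String> results = new ArrayList<>();
        for (String city : cities) {
            results.add(fetchWeather(city));
        }
        return results;
    }

    // All requests handed to the pool up front, similar in spirit to
    // flatMap with inner work on io(). Results are joined in submission
    // order here, which differs from flatMap's completion order.
    static List<String> fetchConcurrently(List<String> cities, ExecutorService pool) {
        List<CompletableFuture<String>> futures = new ArrayList<>();
        for (String city : cities) {
            futures.add(CompletableFuture.supplyAsync(() -> fetchWeather(city), pool));
        }
        List<String> results = new ArrayList<>();
        for (CompletableFuture<String> future : futures) {
            results.add(future.join());
        }
        return results;
    }

    public static void main(String[] args) {
        List<String> cities = Arrays.asList("Paris", "Tokyo", "Oslo");
        ExecutorService pool = Executors.newFixedThreadPool(3);
        try {
            fetchSequentially(cities);
            System.out.println("sequential, max in flight: " + maxInFlight.getAndSet(0));
            fetchConcurrently(cities, pool);
            System.out.println("concurrent, max in flight: " + maxInFlight.getAndSet(0));
        } finally {
            pool.shutdown();
        }
    }
}
```

The sequential run never has more than one call in flight, so its total time grows with the number of cities. The pooled run can overlap calls up to the pool size, which is the benefit the flatMap version is after.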
Upvotes: 1