Reputation: 31
I'm using RxJava on Android and trying to run multiple API calls and then do something once all of them have finished. My API calls all look like the code sample below: make the API call, write each record to the DB in onNext, and update a cache after all records have been written. I want to fire off both calls asynchronously and, once both have hit onCompleted, do something else. What's the proper way to do this in Rx? I don't think I need zip, as I don't need to tie the streams together. I was thinking of merge, but my two API calls return different types of Observable.
getUsers()
    .subscribeOn(Schedulers.io())
    .observeOn(Schedulers.io())
    .flatMap(Observable::from)
    .subscribe(new Subscriber<User>() {
        @Override
        public void onCompleted() {
            updateUserCache();
        }

        @Override
        public void onError(Throwable e) {
            Log.e(TAG, "Error loading users", e);
        }

        @Override
        public void onNext(User user) {
            insertUserToDB(user);
        }
    });

getLocations()
    .subscribeOn(Schedulers.io())
    .observeOn(Schedulers.io())
    .flatMap(Observable::from)
    .subscribe(new Subscriber<Location>() {
        @Override
        public void onCompleted() {
            updateLocationCache();
        }

        @Override
        public void onError(Throwable e) {
            Log.e(TAG, "Error loading Locations", e);
        }

        @Override
        public void onNext(Location location) {
            insertLocationToDB(location);
        }
    });
Upvotes: 2
Views: 1797
Reputation: 31
In case someone needs it, here's the code I used, based on R. Zagórski's suggestion:
List<Observable<?>> observableList = new ArrayList<>();

observableList.add(
    getUsers()
        .subscribeOn(Schedulers.io())
        .observeOn(Schedulers.io())
        .flatMap(Observable::from)
        .doOnNext(user -> insertUser(user))
        .toList()
);

observableList.add(
    getLocations()
        .subscribeOn(Schedulers.io())
        .observeOn(Schedulers.io())
        .flatMap(Observable::from)
        .doOnNext(location -> insertLocation(location))
        .toList()
);

Observable.zip(observableList, new FuncN<Object>() {
    @Override
    public Object call(Object... args) {
        // The zipped value is ignored; it only signals that every source emitted its list.
        return Observable.empty();
    }
}).subscribe(new Subscriber<Object>() {
    @Override
    public void onCompleted() {
        updateUserCache();
        updateLocationCache();
    }

    @Override
    public void onError(Throwable e) {
        // An error from either call ends up here.
    }

    @Override
    public void onNext(Object o) {
    }
});
Upvotes: 1
Reputation: 4077
.zip() is the proper way to do it.
You may want to make Retrofit return Single instead of Observable, though.
Upvotes: 0
Reputation: 20268
You are thinking correctly. You should use the zip operator.
Each of your functions should make its call, write to the database, and do everything else it needs. Treat the zip output function differently: when it is invoked, you can be sure that all of the Observables have completed successfully, so just complete your reactive stream.
Create a list of Observables:
List<Observable<?>> observableList = new ArrayList<>();

observableList.add(
    getUsers()
        .subscribeOn(Schedulers.io())
        .observeOn(Schedulers.io())
        .flatMap(Observable::from)
        // Write each record as it passes through the stream.
        .doOnNext(user -> insertUserToDB(user))
        .toList());

observableList.add(
    getLocations()
        .subscribeOn(Schedulers.io())
        .observeOn(Schedulers.io())
        .flatMap(Observable::from)
        .doOnNext(location -> insertLocationToDB(location))
        .toList());
Then zip all the Observables:
Observable.zip(observableList, new FuncN<Object>() {
    @Override
    public Object call(Object... args) {
        // Invoked once every Observable in the list has emitted its list.
        return Observable.empty();
    }
}).subscribe(new Subscriber<Object>() {
    @Override
    public void onCompleted() {
        updateUserCache();
        updateLocationCache();
    }

    @Override
    public void onError(Throwable e) {
    }

    @Override
    public void onNext(Object o) {
    }
});
This is pseudocode, but I hope you understand the idea.
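To see the "run both, then continue once both are done" idea in isolation, here is a minimal sketch that can be run without RxJava or Android. It deliberately swaps in the JDK's CompletableFuture for RxJava's zip, so it is an analogy rather than the RxJava API. The class name, loadUsers, loadLocations, and the log strings are hypothetical stand-ins for the API calls, DB inserts, and cache updates in the question.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

public class BothThenContinue {

    // Hypothetical stand-ins for getUsers() and getLocations().
    static List<String> loadUsers() {
        return Arrays.asList("alice", "bob");
    }

    static List<String> loadLocations() {
        return Arrays.asList("paris");
    }

    /** Runs both loads concurrently and returns a log of the steps performed. */
    static List<String> run() {
        // Thread-safe list, because both pipelines append from pool threads.
        List<String> log = new CopyOnWriteArrayList<>();

        // Each pipeline does its own work: load, then "insert" every record.
        CompletableFuture<Void> users = CompletableFuture
                .supplyAsync(BothThenContinue::loadUsers)
                .thenAccept(list -> list.forEach(u -> log.add("insert user " + u)));

        CompletableFuture<Void> locations = CompletableFuture
                .supplyAsync(BothThenContinue::loadLocations)
                .thenAccept(list -> list.forEach(l -> log.add("insert location " + l)));

        // Plays the role of the zip function: runs once, only after both
        // pipelines have finished successfully.
        users.runAfterBoth(locations, () -> log.add("update caches")).join();
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The order of the user and location inserts relative to each other is not fixed, but "update caches" is always logged last, which is the same guarantee the zip approach gives for onCompleted.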
Upvotes: 1