Reputation: 1307
I have a SyncService.
@Override
public int onStartCommand(Intent intent, int flags, final int startId) {
Timber.i("Starting sync...");
...
RxUtil.unsubscribe(mSubscription);
mSubscription = mDataManager.syncEvents()
.subscribeOn(Schedulers.io())
.subscribe(new Observer<Event>() {
@Override
public void onCompleted() {
Timber.i("Synced successfully!");
stopSelf(startId);
}
@Override
public void onError(Throwable e) {
Timber.w(e, "Error syncing.");
stopSelf(startId);
}
@Override
public void onNext(Event event) {
}
});
return START_STICKY;
}
Observable events = mDataManager.syncEvents() is an API call.
I want to do a parallel call:
Single userInfo = mDataManager.getUserInfo()
and call stopSelf(startId); after these two calls will finish.
How can I do it?
I tried RxJava Fetching Observables In Parallel but this is a little different case.
I think I have to use .zip or .merge method. But in my case one method call returns Observable (list on Events) and second Single (one UserInfo object).
I created z result class which could be a result of .zip method, but I don't know how to fill it:
public class SyncResponse {
List<Event> events;
UserInfo userInfo;
...
}
Upvotes: 3
Views: 3423
Reputation: 1162
Since you have just two observables you can combine them using android Pair to combine results.
mDataManager.syncEvents()
.zipWith(mDataManager.getUserInfo().toObservable(), Pair::create)
.subscribeOn(Schedulers.io())
.subscribe(pair -> {
Event event = pair.first;
UserInfo userInfo = pair.second;
// your code here
Timber.i("Synced successfully!");
stopSelf(startId);
}, throwable -> {
Timber.w(e, "Error syncing.");
stopSelf(startId);
});
You can do the same without java8/retrolmbda but why
If you need collect all events till completed and combine them all with single user info it would be little bit complicated
Observable<ArrayList<String>> eventsObservable = mDataManager.syncEvents()
.collect(ArrayList::new, ArrayList::add);
mDataManager.getUserInfo()
.zipWith(eventsObservable.toSingle(), Pair::create)
.subscribeOn(Schedulers.io())
.subscribe(pair -> {
UserInfo userInfo = pair.first;
List<Event> event = pair.second;
// your code here
Timber.i("Synced successfully!");
stopSelf(startId);
}, throwable -> {
Timber.w(e, "Error syncing.");
stopSelf(startId);
});
Upvotes: 2
Reputation: 1673
You can use observable.zip and single.toObservable to accomplish this. I believe you will need to also do your thread scheduling within the zip call on each observable separately to do them in parallel.
Observable.zip(call1.subscribeOn(Schedulers.io()), call2.toObservable().subscribeOn(Schedulers.io(), zip-function)
Since both calls are different wrapped types.
The zip-function is where you would use your class to combine your results.
It should be noted that since one of your observables is a single, this method with zip will only ever produce one result at most, as the single will send an onComplete event right after its first onNext event.
If you know your list of events will complete, you can use toList on call1 to buffer them and emit them as a single, collected list of events.
Upvotes: 0