Marcin Kunert
Marcin Kunert

Reputation: 6285

RxJava: How to wait for all subscriptions to complete?

I am fairly new to rxJava, trying stuff by my own. I would like to get some advice if I'm doing it right.

Usecase: On the first run of my app, after a successful login I have to download and save in a local database several dictionaries for the app to run with. The user has to wait till the downloading process finishes.

Current solution: I am using retrofit 2 with rxjava adapter in order to get the data. I am bundling all Observables into one using the zip operator. After all downloads are done the callback triggers and saving into database begins.

Nothing speaks better than some code:

Observable<List<OrderType>> orderTypesObservable = backendService.getOrderTypes();
Observable<List<OrderStatus>> orderStatusObservable = mockBackendService.getOrderStatuses();
Observable<List<Priority>> prioritiesObservable = backendService.getPriorities();

return Observable.zip(orderTypesObservable,
        orderStatusObservable,
        prioritiesObservable,
        (orderTypes, orderStatuses, priorities) -> {
            orderTypeDao.deleteAll();
            orderTypeDao.insertInTx(orderTypes);
            orderStatusDao.deleteAll();
            orderStatusDao.insertInTx(orderStatuses);
            priorityDao.deleteAll();
            priorityDao.insertInTx(priorities);

            return null;
        });

Questions:

Should I use the zip operator or is there a better one to fit my cause?

It seems a bit messy doing it this way. This is only a part of the code, I have currently 12 dictionaries to load. Is there a way to refactor it?

I would like to insert a single dictionary data as soon as it finishes downloading and have a retry mechanism it the download fails. How can I achieve that?

Upvotes: 7

Views: 8286

Answers (2)

Maksim Ostrovidov
Maksim Ostrovidov

Reputation: 11058

I think in your case it's better to use Completable, because for you matter only tasks completion.

Completable getAndStoreOrderTypes = backendService.getOrderTypes()
    .doOnNext(types -> *store to db*)
    .toCompletable();

Completable getAndStoreOrderStatuses = backendService.getOrderStatuses()
    .doOnNext(statuses -> *store to db*)
    .toCompletable();

Completable getAndStoreOrderPriorities = backendService.getOrderPriorities()
    .doOnNext(priorities -> *store to db*)
    .toCompletable();

return Completable.merge(getAndStoreOrderTypes, 
                         getAndStoreOrderStatuses, 
                         getAndStoreOrderPriorities);

If you need serial execution - use Completable.concat() instead of merge()

a retry mechanism if the download fails

Use handy retry() operator

Upvotes: 7

Sergey Zabelnikov
Sergey Zabelnikov

Reputation: 1955

It is not good, to throw null value object into Rx Stream (in zip your return null, it is bad). Try to not doing that.

In your case, you have 1 api call and 2 actions to save response into the database, so you can create the chain with flatMap.

It will look like:

backendService.getOrderTypes()
.doOnNext(savingToDatabaseLogic) 
.flatMap(data -> mockBackendService.getOrderStatuses())
.doOnNext(...)
.flatMap(data -> backendService.getPriorities())
.doOnNext(...)

if you want to react on error situation, in particular, observable, you can add onErrorResumeNext(exception->Observable.empty()) and chain will continue even if something happened

Also, you can create something like BaseDao, which can save any Dao objects.

Upvotes: 1

Related Questions