Sam

Reputation: 31

Do something after multiple observables have completed

I'm using RxJava on Android and trying to run multiple API calls and then do something once both have finished. My API calls all look similar to the sample below: make the API call, write each record to the DB in onNext, and, after all records have been written, update a cache. I want to fire off both calls asynchronously and then, after both have hit onCompleted, do something else. What's the proper way to do this in Rx? I don't think I need zip, since I don't need to tie the different streams together. I was thinking of merge, but my two API calls return different types of Observable. Please let me know. Thanks.

    getUsers()
            .subscribeOn(Schedulers.io())
            .observeOn(Schedulers.io())
            .flatMap(Observable::from)
            .subscribe(new Subscriber<User>() {
                @Override
                public void onCompleted() {
                   updateUserCache();
                }

                @Override
                public void onError(Throwable e) {
                    Log.e(TAG, "Error loading users", e);
                }

                @Override
                public void onNext(User user) {
                    insertUserToDB(user);
                }
            });

    getLocations()
            .subscribeOn(Schedulers.io())
            .observeOn(Schedulers.io())
            .flatMap(Observable::from)
            .subscribe(new Subscriber<Location>() {
                @Override
                public void onCompleted() {
                   updateLocationCache();
                }

                @Override
                public void onError(Throwable e) {
                    Log.e(TAG, "Error loading Locations", e);
                }

                @Override
                public void onNext(Location location) {
                    insertLocationToDB(location);
                }
            });  

Upvotes: 2

Views: 1797

Answers (3)

Sam

Reputation: 31

In case someone needs it, here's the code I used, based on R. Zagórski's suggestion:

    List<Observable<?>> observableList = new ArrayList<>();
    observableList.add(
            getUsers()
                .subscribeOn(Schedulers.io())
                .observeOn(Schedulers.io())
                .flatMap(Observable::from)
                .doOnNext(user->insertUser(user))
                .toList()
    );
    observableList.add(
            getLocations()
                .subscribeOn(Schedulers.io())
                .observeOn(Schedulers.io())
                .flatMap(Observable::from)
                .doOnNext(location->insertLocation(location))
                .toList()
    );

    Observable.zip(observableList, new FuncN<Object>() {
        @Override
        public Object call(Object... args) {
            // The zipped value is ignored; only completion of both sources matters.
            return args;
        }
    }).subscribe(new Subscriber<Object>() {
        @Override
        public void onCompleted() {
            updateUserCache();
            updateLocationCache();
        }

        @Override
        public void onError(Throwable e) {
            // Handle a failure from either source here.
        }

        @Override
        public void onNext(Object o) {
            // Nothing to do; the work happens in doOnNext and onCompleted.
        }
    });

Upvotes: 1

Maxim Volgin

Reputation: 4077

.zip() is the proper way to do it.

You may want to make Retrofit return Single instead of Observable, though.
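
To make the shape of this advice concrete without needing RxJava on the classpath, here is a minimal plain-JDK sketch. It is an analogy, not RxJava code: CompletableFuture stands in for Single (one result per call) and thenCombine stands in for zip. The names fetchUsers, fetchLocations and loadAll, and the sample data, are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

// Plain-JDK analogy, NOT RxJava: CompletableFuture stands in for Single,
// and thenCombine stands in for zip. All names here are hypothetical.
class CombineSketch {
    // Records side effects so the ordering can be inspected afterwards.
    static final List<String> log = new CopyOnWriteArrayList<>();

    // Stand-in for an API call that yields one list of user names.
    static CompletableFuture<List<String>> fetchUsers() {
        return CompletableFuture.supplyAsync(() -> Arrays.asList("ann", "bob"));
    }

    // Stand-in for an API call that yields one list of location ids.
    static CompletableFuture<List<Integer>> fetchLocations() {
        return CompletableFuture.supplyAsync(() -> Arrays.asList(10, 20, 30));
    }

    // Starts both calls, stores each record, then runs one final step.
    static String loadAll() {
        CompletableFuture<Integer> users = fetchUsers().thenApply(list -> {
            list.forEach(u -> log.add("insert user " + u));
            return list.size();
        });
        CompletableFuture<Integer> locations = fetchLocations().thenApply(list -> {
            list.forEach(l -> log.add("insert location " + l));
            return list.size();
        });
        // The combiner runs only after both futures have completed normally.
        return users.thenCombine(locations, (u, l) -> {
            log.add("update caches");
            return u + " users, " + l + " locations";
        }).join();
    }

    public static void main(String[] args) {
        System.out.println(loadAll());
        System.out.println(log);
    }
}
```

The two element types differ (String and Integer), which is the situation from the question: the combining step does not need the sources to share a type, it only needs both of them to have finished.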

Upvotes: 0

R. Zagórski

Reputation: 20268

You are thinking correctly. You should use the zip operator.

Each of your functions should make its call, write to the database and do everything else you need. Treat the zip output function differently: when it is invoked, you can be sure that all the Observables have completed successfully, so just complete your reactive stream.

Create a list of Observables:

    List<Observable<?>> observableList = new ArrayList<>();
    observableList.add(
            getUsers()
                .subscribeOn(Schedulers.io())
                .observeOn(Schedulers.io())
                .flatMap(Observable::from)
                // Write each user as it is emitted.
                .doOnNext(user -> insertUserToDB(user))
                // Collapse the stream into one item, emitted on completion.
                .toList());

    observableList.add(
            getLocations()
                .subscribeOn(Schedulers.io())
                .observeOn(Schedulers.io())
                .flatMap(Observable::from)
                // Write each location as it is emitted.
                .doOnNext(location -> insertLocationToDB(location))
                .toList());

Then zip all the Observables:

    Observable.zip(observableList, new FuncN<Object>() {
        @Override
        public Object call(Object... args) {
            // The zipped value is ignored; only completion of all sources matters.
            return args;
        }
    }).subscribe(new Subscriber<Object>() {
        @Override
        public void onCompleted() {
            updateUserCache();
            updateLocationCache();
        }

        @Override
        public void onError(Throwable e) {
            // Handle a failure from any source here.
        }

        @Override
        public void onNext(Object o) {
            // Nothing to do; the work happens in doOnNext and onCompleted.
        }
    });

This is only a sketch, but I hope you get the idea.
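
Why each source ends in toList() may be easier to see with a toy model. zip calls its function once per tuple of items and stops as soon as one source runs out, so zipping the raw item streams would invoke the function several times and could cut the longer stream short. After toList(), each source emits exactly one item, on completion, so the function runs once, after both are done. The sketch below models this pairing rule over plain lists; it is not RxJava, and ZipModel, zip and toList are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.function.BiFunction;

// Toy model of zip's pairing rule over plain lists; this is NOT RxJava.
class ZipModel {
    // Pairs the i-th items of both inputs and stops when either runs out.
    static <A, B, R> List<R> zip(List<A> left, List<B> right,
                                 BiFunction<A, B, R> combine) {
        List<R> out = new ArrayList<>();
        Iterator<A> a = left.iterator();
        Iterator<B> b = right.iterator();
        while (a.hasNext() && b.hasNext()) {
            out.add(combine.apply(a.next(), b.next()));
        }
        return out;
    }

    // Models toList(): the whole sequence becomes a single item.
    static <T> List<List<T>> toList(List<T> items) {
        return Collections.singletonList(items);
    }

    public static void main(String[] args) {
        List<String> users = Arrays.asList("ann", "bob");
        List<Integer> locations = Arrays.asList(10, 20, 30);

        // Item by item: the combiner runs twice and location 30 is never paired.
        System.out.println(zip(users, locations, (u, l) -> u + "@" + l));
        // prints [ann@10, bob@20]

        // After toList: the combiner runs exactly once, with both full lists.
        System.out.println(zip(toList(users), toList(locations),
                (us, ls) -> us.size() + " users, " + ls.size() + " locations"));
        // prints [2 users, 3 locations]
    }
}
```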

Upvotes: 1
