alla

Reputation: 549

Invoke onNext in different threads (RxJava Android)

I receive an object (a City) and want to handle it on two different threads: show the result on the mobile screen (UI thread) and save the city to the DB (background thread). This is my code:

    private CompositeSubscription subscriptions;
    ...
    @Override
    public void onAddButtonClick(String cityName) {
        Observable<City> obs = repository.getCity(cityName);
        if (obs != null) {
            Subscription subscription = obs
                    .subscribeOn(backgroundThread)
                    .observeOn(mainThread)
                    .subscribe(city -> {
                        view.cityExists();
                        repository.saveCityToDb(city);
                    });

            subscriptions.add(subscription);
        } else {
            view.showCouldNotFindCity();
        }
    }

    @Override
    public void subscribe() {
        subscriptions = new CompositeSubscription();
    }

    @Override
    public void unsubscribe() {
        subscriptions.clear();
    }

I want view.cityExists() to run on the UI thread and repository.saveCityToDb(city) to run on the background thread. How can I do that?

The method getCity(...) returns an Observable<City>:

    public Observable<City> getCity(String name) {
        return fileRepository.getCityFromFile(name);
    }

Upvotes: 1

Views: 669

Answers (2)

R. Zagórski

Reputation: 20258

You might be looking for the flatMap operator. It maps each emitted item to another Observable, and when it is placed before observeOn, that work runs on the same thread the source Observable emits on (here, the background thread).

    Subscription subscription = obs
            .subscribeOn(backgroundThread)
            .flatMap(repository::saveCityToDb)
            .observeOn(mainThread)
            .subscribe(city -> view.cityExists());

Remember that the function repository::saveCityToDb must return an Observable for the chain to continue. I would suggest having it return Observable.just(city).
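
To illustrate why the mapped function has to return something, here is a minimal sketch that does not use RxJava at all. It relies on the JDK's CompletableFuture, whose thenCompose plays a role similar to flatMap: the function passed to it must return another future that carries the value forward. The String city, the in-memory db list and this version of saveCityToDb are hypothetical stand-ins, not the asker's repository.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

class FlatMapReturnSketch {
    // Hypothetical in-memory "database"; the real repository is not shown in the question.
    static final List<String> db = new CopyOnWriteArrayList<>();

    // Performs the save, then hands the same city on,
    // much like returning Observable.just(city) from a flatMap function.
    static CompletableFuture<String> saveCityToDb(String city) {
        db.add(city);
        return CompletableFuture.completedFuture(city);
    }

    static String saveThenPassOn(String cityName) {
        return CompletableFuture
                .supplyAsync(() -> cityName)                     // stands in for repository.getCity(...)
                .thenCompose(FlatMapReturnSketch::saveCityToDb)  // flatMap-like step
                .join();                                         // block for the result (sketch only)
    }

    public static void main(String[] args) {
        System.out.println(saveThenPassOn("Kyiv") + " saved: " + db.contains("Kyiv"));
    }
}
```

If saveCityToDb returned nothing, there would be no value for the next stage to receive, which is the same reason the RxJava version needs an Observable back.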

Upvotes: 0

Tassos Bassoukos

Reputation: 16142

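This should do the trick. Everything above observeOn runs on the background thread, and everything below it runs on the main thread:

    Subscription subscription = obs
            .subscribeOn(backgroundThread)
            // runs on the background thread
            .doOnNext(repository::saveCityToDb)
            .observeOn(mainThread)
            // runs on the main thread
            .subscribe(city -> view.cityExists());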
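
As a rough, dependency-free model of how the position relative to the thread switch decides where a step runs, the sketch below uses plain JDK executors instead of RxJava schedulers. The two executors, their thread names, the log list and the class itself are assumptions made for illustration. thenApplyAsync and thenAcceptAsync stand in for the operators placed before and after the switch.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ThreadHopSketch {

    // Runs load -> save -> show and records which thread executed each step.
    static List<String> run(String cityName) {
        // Hypothetical stand-ins for the question's backgroundThread and mainThread.
        ExecutorService background =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "background"));
        ExecutorService mainUi =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "main-ui"));
        List<String> log = new CopyOnWriteArrayList<>();
        try {
            CompletableFuture
                    // Source work, comparable to what subscribeOn moves off the caller.
                    .supplyAsync(() -> {
                        log.add("load:" + Thread.currentThread().getName());
                        return cityName;
                    }, background)
                    // Side effect before the switch: still on the background thread.
                    .thenApplyAsync(city -> {
                        log.add("save:" + Thread.currentThread().getName());
                        return city;
                    }, background)
                    // After the switch: delivered on the "main" thread.
                    .thenAcceptAsync(
                            city -> log.add("show:" + Thread.currentThread().getName()),
                            mainUi)
                    .join();
        } finally {
            background.shutdown();
            mainUi.shutdown();
        }
        return log;
    }

    public static void main(String[] args) {
        run("Kyiv").forEach(System.out::println);
    }
}
```

The steps registered on the background executor never touch the main one, so a slow save would not block the UI in this model.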

Upvotes: 2
