Samuel Urbanowicz
Samuel Urbanowicz

Reputation: 834

RxJava callback for last onNext() execution finished

I'm new to RxJava. I would like to download some data for each TempoAccount entity from given collection and store it all in a map accountsWithProjects. When the code of last onNext(TempoAccount tempoAccount) is completed I'd like to call filterAccountsWithProjects(accountsWithProjects) method. Is there some simple way to achieve it?

private void getProjectsForEachTempoAccount(Collection<TempoAccount> tempoAccounts) {
    final Map<TempoAccount, Collection<TempoProject>> accountsWithProjects =
            new HashMap<>(tempoAccounts.size());
    Observable<TempoAccount> accountsObservable = Observable.from(tempoAccounts);
    accountsObservable
            .compose(ObservableUtils.applySchedulers())
            .subscribe(new ObserverAdapter<TempoAccount>() {
                @Override
                public void onError(Throwable e) {
                    view.notifyAboutError(e.getMessage());
                }

                @Override
                public void onNext(TempoAccount tempoAccount) {
                    jira.requestProjectsInfoForTempoAccount(String.valueOf(tempoAccount.getId()))
                            .subscribeOn(Schedulers.io())
                            .observeOn(AndroidSchedulers.mainThread())
                            .subscribe(new ObserverAdapter<Collection<TempoProject>>() {
                                @Override
                                public void onError(Throwable e) {
                                    view.notifyAboutError(e.getMessage());
                                }

                                @Override
                                public void onNext(Collection<TempoProject> projects) {
                                    accountsWithProjects.put(tempoAccount, projects);
                                }

                            });
                }

                @Override
                public void onCompleted() {
                    filterAccountsWithProjects(accountsWithProjects);
                }
            });
}

Problem: In the code above filterAccountsWithProjects(accountsWithProjects) is fired before all observables from onNext(TempoAccount tempoAccount) are completed.

Edit:

I want to create an Observable of such a type: Observable<Map<TempoAccount, Collection<TempoProject>>. I have two observables given:

  1. Observable<TempoAccount> accountsObservable = Observable.from(tempoAccounts)

  2. Observable<Collection<TempoProject>> projectsForAccountObservable = jira.requestProjectsInfoForTempoAccount(TempoAccount account)

So my questions is: can I connnect them somehow and create the map having these two observables.

Upvotes: 0

Views: 463

Answers (1)

Danail Alexiev
Danail Alexiev

Reputation: 7772

You should use the flatMap() function on your original stream to do the things you are currently doing in the onNext(). Also, you don't need to filter the stream in onComplete(). You could use filter() on the stream itself and deal with the problem in a more "Reactive" way.

Here is an example:

 accountsObservable
        .compose(ObservableUtils.applySchedulers())
        .map(tempoAccount -> new Pair<TempoAccount, Collection<TempoProject>>(tempoAccount, fetchInfoAccountForTempoAccount(tempoAccount)))
        .filter(pair -> hasProjects(pair))
        .toMap(pair -> pair.first(), pair -> pair.second)
        .subscribe(...)

EDIT:

Updated the suggested answer - you get the TempoAccounts, then you map each account to a Pair of account and collection of TempoProjects. You filter the pairs to see if you have any projects and then you use toMap() to create your desired result. Be aware, that to get toMap() working, your observables have to call onComplete() when the end of the stream is reached.

Upvotes: 1

Related Questions