Reputation: 834
I'm new to RxJava. I would like to download some data for each TempoAccount
entity from given collection and store it all in a map accountsWithProjects
. When the code of last onNext(TempoAccount tempoAccount)
is completed I'd like to call filterAccountsWithProjects(accountsWithProjects)
method. Is there some simple way to achieve it?
private void getProjectsForEachTempoAccount(Collection<TempoAccount> tempoAccounts) {
final Map<TempoAccount, Collection<TempoProject>> accountsWithProjects =
new HashMap<>(tempoAccounts.size());
Observable<TempoAccount> accountsObservable = Observable.from(tempoAccounts);
accountsObservable
.compose(ObservableUtils.applySchedulers())
.subscribe(new ObserverAdapter<TempoAccount>() {
@Override
public void onError(Throwable e) {
view.notifyAboutError(e.getMessage());
}
@Override
public void onNext(TempoAccount tempoAccount) {
jira.requestProjectsInfoForTempoAccount(String.valueOf(tempoAccount.getId()))
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new ObserverAdapter<Collection<TempoProject>>() {
@Override
public void onError(Throwable e) {
view.notifyAboutError(e.getMessage());
}
@Override
public void onNext(Collection<TempoProject> projects) {
accountsWithProjects.put(tempoAccount, projects);
}
});
}
@Override
public void onCompleted() {
filterAccountsWithProjects(accountsWithProjects);
}
});
}
Problem: In the code above filterAccountsWithProjects(accountsWithProjects)
is fired before all observables from onNext(TempoAccount tempoAccount)
are completed.
Edit:
I want to create an Observable of such a type: Observable<Map<TempoAccount, Collection<TempoProject>>
.
I have two observables given:
Observable<TempoAccount> accountsObservable = Observable.from(tempoAccounts)
Observable<Collection<TempoProject>> projectsForAccountObservable = jira.requestProjectsInfoForTempoAccount(TempoAccount account)
So my questions is: can I connnect them somehow and create the map having these two observables.
Upvotes: 0
Views: 463
Reputation: 7772
You should use the flatMap()
function on your original stream to do the things you are currently doing in the onNext()
. Also, you don't need to filter the stream in onComplete()
. You could use filter()
on the stream itself and deal with the problem in a more "Reactive" way.
Here is an example:
accountsObservable
.compose(ObservableUtils.applySchedulers())
.map(tempoAccount -> new Pair<TempoAccount, Collection<TempoProject>>(tempoAccount, fetchInfoAccountForTempoAccount(tempoAccount)))
.filter(pair -> hasProjects(pair))
.toMap(pair -> pair.first(), pair -> pair.second)
.subscribe(...)
EDIT:
Updated the suggested answer - you get the TempoAccount
s, then you map each account to a Pair
of account and collection of TempoProject
s. You filter the pairs to see if you have any projects and then you use toMap()
to create your desired result. Be aware, that to get toMap()
working, your observables have to call onComplete()
when the end of the stream is reached.
Upvotes: 1