Reputation: 6700
I have this Rx Stream that was used to group a list of UserScoreTO into a single User object (many-to-one relation).
public void execute() {
    getUsers()
        .flatMap(list -> Flowable.fromIterable(list))
        .groupBy(userScoreTO -> userScoreTO.id)
        .flatMap(groups -> Flowable.fromCallable(() -> groups.collect(User::new, (user, userscore) -> {
            user.id = userscore.id;
            user.name = userscore.name;
            user.totalScore += userscore.score;
        })))
        .subscribe(userSingle -> userSingle.subscribe(new SingleObserver<User>() {
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("onSubscribe");
            }

            @Override
            public void onSuccess(User user) {
                System.out.println(user);
            }

            @Override
            public void onError(Throwable e) {
                System.out.println(e);
            }
        }));
}
As you can see, to consume this Flowable I subscribe to it and receive the Singles it emits, so I then have to subscribe to each of those Singles too. Although this works, it's a bit annoying. I would like to make only one subscription and consume a collection of users.
A few days ago I asked another question about this same code. The full code of that class is there.
Upvotes: 3
Views: 3127
Reputation: 7058
Putting a flatMap at the end removes the need for that nested subscribe.
Example:
getUsers()
    .flatMap(list -> Flowable.fromIterable(list))
    .groupBy(userScoreTO -> userScoreTO.id)
    .flatMap(groups ->
        Flowable.fromCallable(() ->
            groups.collect(User::new, (user, userscore) -> {
                user.id = userscore.id;
                user.name = userscore.name;
                user.totalScore += userscore.score;
            })))
    .flatMap(it -> it.toFlowable()) // <-- unwrap the singles
    .subscribe(user -> System.out.println(user));
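A possible further simplification, as a sketch assuming RxJava 2 (the io.reactivex package): flatMapSingle subscribes to each group's Single directly, so the fromCallable wrapper and the toFlowable unwrap may not be needed, and toList() gathers the users into the single collection the question asks for. UserScoreTO and User below are minimal stand-ins for the question's classes, and the class name GroupScoresSketch and the method totals are hypothetical.

```java
import io.reactivex.Flowable;
import io.reactivex.Single;

import java.util.Arrays;
import java.util.List;

public class GroupScoresSketch {

    // Hypothetical stand-in for the question's UserScoreTO.
    static class UserScoreTO {
        final long id;
        final String name;
        final int score;

        UserScoreTO(long id, String name, int score) {
            this.id = id;
            this.name = name;
            this.score = score;
        }
    }

    // Hypothetical stand-in for the question's User.
    static class User {
        long id;
        String name;
        int totalScore;

        @Override
        public String toString() {
            return "User{id=" + id + ", name=" + name + ", totalScore=" + totalScore + "}";
        }
    }

    // Groups the scores by user id, sums each group into one User,
    // then collects every User into a single list.
    static Single<List<User>> totals(Flowable<List<UserScoreTO>> source) {
        return source
            .flatMap(list -> Flowable.fromIterable(list))
            .groupBy(userScoreTO -> userScoreTO.id)
            // flatMapSingle subscribes to the Single returned by collect,
            // so no nested subscribe is required.
            .flatMapSingle(group -> group.collect(User::new, (user, userscore) -> {
                user.id = userscore.id;
                user.name = userscore.name;
                user.totalScore += userscore.score;
            }))
            .toList();
    }

    public static void main(String[] args) {
        Flowable<List<UserScoreTO>> source = Flowable.just(Arrays.asList(
            new UserScoreTO(1, "ann", 10),
            new UserScoreTO(2, "bob", 5),
            new UserScoreTO(1, "ann", 7)));

        // One subscription; the order of users in the list is not guaranteed.
        totals(source).subscribe(
            users -> System.out.println(users),
            error -> System.out.println(error));
    }
}
```

If a stream of individual users is preferred over a list, dropping the toList() call leaves a Flowable of User that can be subscribed to exactly like the last line of the answer's example.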
Upvotes: 4