alexpfx

Reputation: 6700

Transform a Flowable of Single<User> into a Flowable of List<User>

I have this Rx stream that groups a list of UserScoreTO objects into User objects, one per id (a many-to-one relation).

public void execute() {
    getUsers()
            .flatMap(list -> Flowable.fromIterable(list))
            .groupBy(userScoreTO -> userScoreTO.id)
            .flatMap(groups -> Flowable.fromCallable(() -> groups.collect(User::new, (user, userscore) -> {
                user.id = userscore.id;
                user.name = userscore.name;
                user.totalScore += userscore.score;
            }))).subscribe(userSingle -> userSingle.subscribe(new SingleObserver<User>() {
        @Override
        public void onSubscribe(Disposable d) {
            System.out.println("onSubscribe");
        }

        @Override
        public void onSuccess(User user) {
            System.out.println(user);
        }

        @Override
        public void onError(Throwable e) {
            System.out.println(e);

        }
    }));

}

As you can see, to consume this Flowable I subscribe to it, and it emits Single<User> items, so I have to subscribe to each of those Singles as well. Although this works, it's a bit annoying. I would like to make only one subscription and consume a collection of users.

A few days ago I asked another question about this same code; the full code of that class is there.

Upvotes: 3

Views: 3127

Answers (1)

ESala

Reputation: 7058

Putting a flatMap at the end unwraps each Single<User> into the outer stream, which removes the need for that nested subscribe.

Example:

getUsers()
    .flatMap(list -> Flowable.fromIterable(list))
    .groupBy(userScoreTO -> userScoreTO.id)
    .flatMap(groups -> 
        Flowable.fromCallable(() -> 
            groups.collect(User::new, (user, userscore) -> {
                user.id = userscore.id;
                user.name = userscore.name;
                user.totalScore += userscore.score;
            }
    )))
    .flatMap(it -> it.toFlowable()) // <-- unwrap the singles
    .subscribe(user -> System.out.println(user));
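
If the goal is a single subscription that receives the whole collection, as the question title suggests, a possible variation is to skip the fromCallable wrapper, hand the Single returned by collect directly to flatMapSingle, and finish with toList(). The sketch below assumes RxJava 2 (the `io.reactivex` package); UserScoreTO, User, and getUsers() are hypothetical stand-ins reduced to the fields used in the question, and aggregate and sumScores are helper names introduced only for this example.

```java
import io.reactivex.Flowable;
import io.reactivex.Single;

import java.util.Arrays;
import java.util.List;

public class UserAggregation {

    // Hypothetical stand-in for the asker's transfer object.
    static class UserScoreTO {
        final int id;
        final String name;
        final int score;

        UserScoreTO(int id, String name, int score) {
            this.id = id;
            this.name = name;
            this.score = score;
        }
    }

    // Hypothetical stand-in for the asker's User class.
    static class User {
        int id;
        String name;
        int totalScore;

        @Override
        public String toString() {
            return "User{id=" + id + ", name=" + name + ", totalScore=" + totalScore + "}";
        }
    }

    // Hypothetical stand-in for getUsers(): emits one list of score rows.
    static Flowable<List<UserScoreTO>> getUsers() {
        return Flowable.just(Arrays.asList(
                new UserScoreTO(1, "ana", 10),
                new UserScoreTO(2, "bob", 5),
                new UserScoreTO(1, "ana", 7)));
    }

    // Folds every row of one group into a single User, summing the scores.
    static Single<User> sumScores(Flowable<UserScoreTO> group) {
        return group.collect(User::new, (user, row) -> {
            user.id = row.id;
            user.name = row.name;
            user.totalScore += row.score;
        });
    }

    // One User per id, gathered into a single list emitted on completion.
    static Single<List<User>> aggregate(Flowable<List<UserScoreTO>> source) {
        return source
                .flatMapIterable(list -> list)               // flatten each list into rows
                .groupBy(row -> row.id)                      // one group per user id
                .flatMapSingle(UserAggregation::sumScores)   // Single<User> -> User, no nested subscribe
                .toList();                                   // collect all users into one List<User>
    }

    public static void main(String[] args) {
        aggregate(getUsers()).subscribe(
                users -> System.out.println(users),
                error -> System.err.println(error));
    }
}
```

toList() returns a Single<List<User>>; appending .toFlowable() would turn it into a Flowable<List<User>> if that exact type is needed. The order of users in the list is not something this sketch guarantees.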

Upvotes: 4
