Herrbert74
Herrbert74

Reputation: 2715

How to get a list after groupBy in RxJava 2?

I have a list of transactions. Each transaction has currency and amount information among others. I want to create a list of holdings, so the current amount held by currency. I started with groupBy() and continued with reduce. It seems I have to subscribe before I can do anything with the results, because this gives me an error:

Observable.fromIterable(transactions)
            .groupBy(Transaction::getCurrency)
            .flatMap(t -> t.reduce(new Holding(t.getKey()), (holding, transaction) -> holding.addTransaction(transaction.getAmount()))

It says "no instance of type variable R exist so that Single conforms to ObservableSource< ? extends R>".

On the other hand if I try this:

Observable.fromIterable(transactions)
            .groupBy(Transaction::getCurrency)
            .subscribe((GroupedObservable<String, Transaction> r) -> r.reduce(new Holding(r.getKey()), (holding, transaction) -> holding.addTransaction(transaction.getAmount()))
                    .toObservable()
                    .subscribe(t -> {
                                //t is a single Holding.
                            }
                    ));

I cannot get a list, because I already subscribed to the grouped stream. I could add it up, but I'm pretty sure there is a more elegant solution, but I cannot figure it out.

Solution based on akarnokd's answer:

Observable.fromIterable(transactions)
            .groupBy(Transaction::getCurrency)
            .flatMapSingle(Observable::toList)
            .map(Holding::new)
            .toList()
            .subscribe(holdings -> {
                whatever(holdings);
            });

Upvotes: 2

Views: 3069

Answers (3)

Raja Jawahar
Raja Jawahar

Reputation: 6952

This will work for sure

public Single<Map<Integer, List<Category>>> getSubCategoryListById(List<Category> categoryList) {

    return Flowable.just(categoryList)
        .flatMapIterable(new Function<List<Category>, Iterable<Category>>() {
          @Override public Iterable<Category> apply(List<Category> categories) throws Exception {
            return categories;
          }
        })
        .filter(new Predicate<Category>() {
          @Override public boolean test(Category category) throws Exception {
            return category.parent_id != 0;
          }
        })
        .groupBy(new Function<Category, Integer>() {
          @Override public Integer apply(Category category) throws Exception {
            return category.category_id;
          }
        })
        .flatMapSingle(new Function<GroupedFlowable<Integer, Category>, Single<List<Category>>>() {
          @Override public Single<List<Category>> apply(
              GroupedFlowable<Integer, Category> integerCategoryGroupedFlowable) throws Exception {
            return integerCategoryGroupedFlowable.toList();
          }
        })
        .toMap(new Function<List<Category>, Integer>() {
          @Override public Integer apply(List<Category> categories) throws Exception {
            return categories.get(0).category_id;
          }
        });
  }

Upvotes: 2

akarnokd
akarnokd

Reputation: 69997

(From my comment to the post):

Try flatMapSingle in the upper case. Also, subscribing from within an onNext handler is a bad practice as you lose the composition properties of RxJava.

Upvotes: 4

GVillani82
GVillani82

Reputation: 17429

As the documentation says, the reduce function

applies a function to each item emitted by an Observable, sequentially, and emit the final value.

This is way you get a single value (actually for each Observable of the group you get a single item).

You can defer your reduce operation after you get a list. You could probably replace your first long subscribe with this:

.subscribe(group -> group.toList()

Then you get some Observables based on the number of groups that you have, each emitting a single List of your predefined type.

NOTE: not sure about it, but probably you can replace the first subscribe with a flatMap that transforms your GroupedObservable into an Observable that emit a list of items.

Upvotes: 0

Related Questions