Reputation: 2715
I have a list of transactions. Each transaction has a currency and an amount, among other fields. I want to create a list of holdings, that is, the current amount held per currency. I started with groupBy() and continued with reduce. It seems I have to subscribe before I can do anything with the results, because this gives me an error:
Observable.fromIterable(transactions)
    .groupBy(Transaction::getCurrency)
    .flatMap(t -> t.reduce(new Holding(t.getKey()), (holding, transaction) -> holding.addTransaction(transaction.getAmount())));
It says "no instance of type variable R exist so that Single conforms to ObservableSource< ? extends R>".
On the other hand if I try this:
Observable.fromIterable(transactions)
    .groupBy(Transaction::getCurrency)
    .subscribe((GroupedObservable<String, Transaction> r) -> r.reduce(new Holding(r.getKey()), (holding, transaction) -> holding.addTransaction(transaction.getAmount()))
        .toObservable()
        .subscribe(t -> {
            // t is a single Holding.
        }
    ));
This way I cannot get a list, because I have already subscribed to the grouped stream. I could add the results up myself, but I'm pretty sure there is a more elegant solution that I cannot figure out.
Solution based on akarnokd's answer:
Observable.fromIterable(transactions)
    .groupBy(Transaction::getCurrency)
    .flatMapSingle(Observable::toList)
    .map(Holding::new)
    .toList()
    .subscribe(holdings -> {
        whatever(holdings);
    });
Upvotes: 2
Views: 3069
Reputation: 6952
This should work (the same groupBy, flatMapSingle and toList pattern, shown here with Flowable on my own Category type):
public Single<Map<Integer, List<Category>>> getSubCategoryListById(List<Category> categoryList) {
    return Flowable.just(categoryList)
        .flatMapIterable(new Function<List<Category>, Iterable<Category>>() {
            @Override public Iterable<Category> apply(List<Category> categories) throws Exception {
                return categories;
            }
        })
        .filter(new Predicate<Category>() {
            @Override public boolean test(Category category) throws Exception {
                return category.parent_id != 0;
            }
        })
        .groupBy(new Function<Category, Integer>() {
            @Override public Integer apply(Category category) throws Exception {
                return category.category_id;
            }
        })
        .flatMapSingle(new Function<GroupedFlowable<Integer, Category>, Single<List<Category>>>() {
            @Override public Single<List<Category>> apply(
                    GroupedFlowable<Integer, Category> integerCategoryGroupedFlowable) throws Exception {
                return integerCategoryGroupedFlowable.toList();
            }
        })
        .toMap(new Function<List<Category>, Integer>() {
            @Override public Integer apply(List<Category> categories) throws Exception {
                return categories.get(0).category_id;
            }
        });
}
Upvotes: 2
Reputation: 69997
(From my comment to the post):
Try flatMapSingle instead of flatMap in the first snippet. Also, subscribing from within an onNext handler is bad practice, as you lose the composition properties of RxJava.
Upvotes: 4
Reputation: 17429
As the documentation says, the reduce function
applies a function to each item emitted by an Observable, sequentially, and emits the final value.
This is why you get a single value (more precisely, a single item for each Observable of the group).
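To make the "single final value per group" behaviour concrete, here is a minimal plain-Java sketch of the same seed-plus-accumulator fold, applied per currency. It deliberately does not use RxJava, so it only illustrates what the grouped reduce computes, not how the operators are wired. The Transaction class below is a hypothetical stand-in for the one in the question, and storing amounts as a long is an assumption made for brevity.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class ReducePerGroupSketch {

    // Hypothetical minimal stand-in for the question's Transaction class.
    static final class Transaction {
        final String currency;
        final long amount; // assumption: amounts stored as whole minor units

        Transaction(String currency, long amount) {
            this.currency = currency;
            this.amount = amount;
        }
    }

    // For each currency, start from a seed of 0 and apply the accumulator
    // (running + amount) to every transaction in order. Only the final
    // running total per currency is kept, one value per group.
    static Map<String, Long> holdingsByCurrency(List<Transaction> transactions) {
        Map<String, Long> holdings = new LinkedHashMap<>();
        for (Transaction t : transactions) {
            long running = holdings.getOrDefault(t.currency, 0L); // seed for a new group
            holdings.put(t.currency, running + t.amount);         // accumulator step
        }
        return holdings;
    }

    public static void main(String[] args) {
        List<Transaction> transactions = Arrays.asList(
                new Transaction("EUR", 100),
                new Transaction("USD", 75),
                new Transaction("EUR", 50));
        System.out.println(holdingsByCurrency(transactions));
    }
}
```

Each currency ends up with exactly one total, which mirrors why reduce on a group yields a single item rather than a stream of intermediate values.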
You can defer your reduce operation until after you have a list. You could probably replace your first long subscribe with this:
.subscribe(group -> group.toList())
Then you get one result per group (in RxJava 2, toList() returns a Single), each emitting a single List of your predefined type.
NOTE: I am not sure about it, but you can probably replace the first subscribe with a flatMap that transforms each GroupedObservable into an Observable that emits a list of items.
Upvotes: 0