Reputation: 9925
Assuming i have a hot observable of lists of some items;
Observable<List<Item>> observable = ...;
I need to convert it to single items stream and perform some operation on each item, like filtering, after that i should convert it back to list and process it in onNext
method of subscriber:
observable.flatMap(Observable::from)
.filter(Item::isFiltered)
.toList()
.subscribe(this::onNext, this::onError)
public void onNext(List<Item> items) {...}
From the first look it seems ok, but it isn't, because our observable is hot, so toList() won't be ever executed (because it waits for source observable completion) and whole stream stucks.
How can i resolve this problem? Also please note that near the filter
could be any amount of additional operations over single item.
Upvotes: 4
Views: 1931
Reputation: 2487
You can do all of your operations on single items and your final toList
operator on the Observable you create in flatMap
.. That way, you receive onComplete call and toList
will collect and transform the items.
observable.flatMap(list -> {
return Observable.from(list)
.filter(Item::isFiltered)
.toList()
})
.subscribe(this::onNext, this::onError)
Upvotes: 3