Reputation: 1171
I have a stream of objects emitted from an Observable.
Each time an item is emitted, I want to map it to a list of all items emitted so far.
I know I can do this by keeping a global list and updating it manually, but I was wondering whether there is an operator that does this out of the box.
a -> [a]
b -> [a,b]
c -> [a,b,c]
a -> [a,b,c,a]
d -> [a,b,c,a,d]
f -> [a,b,c,a,d,f]
b -> [a,b,c,a,d,f,b]
g -> [a,b,c,a,d,f,b,g]
When I use toList(), it only emits a single list once the stream completes.
EDIT 1: Here is how my flow looks right now:
source.itemStream()
    .doOnNext(item -> handleItem())
    .subscribe()
What I would prefer is something more along the lines of:
source.itemStream()
    .someOperator() // this will map to a stream of all items so far.
    .doOnNext(item -> handleItem())
    .subscribe()
Or, alternatively:
source.itemStream()
    .compose(x -> listBuildingFunction(x)) // this will map to a stream of all items so far.
    .doOnNext(item -> handleItem())
    .subscribe()
Upvotes: 0
Views: 121
Reputation: 1171
Using akarnokd's answer, I did the following (in Kotlin):
private fun toPartialList(): ObservableTransformer<Item, List<Item>> {
    return ObservableTransformer { observable ->
        // defer creates a fresh list for each subscriber
        Observable.defer {
            val tempList = ArrayList<Item>()
            observable.map { item ->
                tempList.add(item)
                // emit a copy so later additions do not change earlier emissions
                return@map ArrayList(tempList)
            }
        }
    }
}
I then used it in my stream:
source.itemStream()
    .compose(toPartialList())
    .doOnNext { handleItems(it) }
    .subscribe()
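A note on why the list is created inside Observable.defer: the transformer function runs once when the stream is assembled, so a list created directly in it would be shared by every subscriber. Below is a rough plain-Java sketch of that difference, without RxJava. The class and method names are made up for illustration, and a Supplier stands in for "one call per subscription".

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical illustration only; not part of RxJava.
final class PerSubscriberState {

    // The list is created once, when the pipeline is built.
    // Every "subscription" (each call to get()) appends to the same list.
    static Supplier<Function<String, List<String>>> shared() {
        List<String> seen = new ArrayList<>();
        return () -> item -> {
            seen.add(item);
            return new ArrayList<>(seen);
        };
    }

    // The list is created inside the supplier, so each "subscription"
    // starts with its own empty list. This mirrors what defer does.
    static Supplier<Function<String, List<String>>> deferred() {
        return () -> {
            List<String> seen = new ArrayList<>();
            return item -> {
                seen.add(item);
                return new ArrayList<>(seen);
            };
        };
    }

    public static void main(String[] args) {
        Supplier<Function<String, List<String>>> s = shared();
        s.get().apply("a");
        System.out.println(s.get().apply("b")); // [a, b]: second subscriber sees the first one's item

        Supplier<Function<String, List<String>>> d = deferred();
        d.get().apply("a");
        System.out.println(d.get().apply("b")); // [b]: second subscriber starts fresh
    }
}
```

If the stream is only ever subscribed to once, both variants behave the same; the deferred form just avoids surprises on resubscription.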
Upvotes: 1
Reputation: 69997
Collect the items into a list and emit a copy of that list:
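Observable.defer(() -> {
    // one list per subscriber, created when the subscription starts
    List<Integer> seen = new LinkedList<>();
    return Observable.range(1, 10)
        .map(v -> {
            seen.add(v);
            // emit a snapshot, not the mutable list itself
            return new ArrayList<>(seen);
        });
})
.subscribe(System.out::println);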
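The copy in the map step matters: if the same mutable list were emitted each time, anything downstream that holds on to an earlier emission would see it grow. Here is a minimal plain-Java sketch of that effect, without RxJava. RunningSnapshots and its methods are hypothetical names used only for this illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of RxJava: mimics the map step on a plain list.
final class RunningSnapshots {

    // Records an independent copy after each item, like new ArrayList<>(seen).
    static <T> List<List<T>> withCopy(List<T> items) {
        List<T> seen = new ArrayList<>();
        List<List<T>> emitted = new ArrayList<>();
        for (T item : items) {
            seen.add(item);
            emitted.add(new ArrayList<>(seen)); // snapshot, unaffected by later adds
        }
        return emitted;
    }

    // Records the same mutable list every time, so earlier entries
    // end up reflecting all later additions.
    static <T> List<List<T>> withoutCopy(List<T> items) {
        List<T> seen = new ArrayList<>();
        List<List<T>> emitted = new ArrayList<>();
        for (T item : items) {
            seen.add(item);
            emitted.add(seen); // same reference each time
        }
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println(withCopy(List.of("a", "b", "c")));
        // [[a], [a, b], [a, b, c]]
        System.out.println(withoutCopy(List.of("a", "b", "c")));
        // [[a, b, c], [a, b, c], [a, b, c]]
    }
}
```

The cost is one list copy per item, so building all the snapshots is quadratic in the number of items; for short streams that is usually acceptable.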
Upvotes: 1