Vasili Fedotov

Reputation: 1171

RxJava 2: how to emit a list of all items emitted so far on each emission

I have a stream of objects emitted from an Observable.

Each time an item is emitted, I want to map it to a list of all items emitted so far.

I know I can do this by keeping a global list and updating it manually, but I was wondering whether there is an operator that does this out of the box.

a -> [a]
b -> [a,b]
c -> [a,b,c]
a -> [a,b,c,a]
d -> [a,b,c,a,d]
f -> [a,b,c,a,d,f]
b -> [a,b,c,a,d,f,b]
g -> [a,b,c,a,d,f,b,g]

When I use toList(), it emits only a single list, once the stream completes.

EDIT 1: Here is how my flow looks right now:

source.itemStream()
     .doOnNext( item -> handleItem())
     .subscribe()

and what I would prefer is more along the lines of:

source.itemStream()
     .someOperator() // this will map to a stream of all items so far.
     .doOnNext( item -> handleItem())
     .subscribe()

or alternatively

source.itemStream()
     .compose(x -> listBuildingFunction(x)) // this will map to a stream of all items so far.
     .doOnNext( item -> handleItem())
     .subscribe()

Upvotes: 0

Views: 121

Answers (2)

Vasili Fedotov

Reputation: 1171

Using akarnokd's answer, I did the following (in Kotlin):

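import io.reactivex.Observable
import io.reactivex.ObservableTransformer

private fun toPartialList(): ObservableTransformer<Item, List<Item>> {
    return ObservableTransformer { observable ->
        // defer creates a fresh list for every subscriber
        Observable.defer {
            val tempList = ArrayList<Item>()
            observable.map { item ->
                tempList.add(item)
                // emit a copy so earlier emissions are not mutated later
                return@map ArrayList(tempList)
            }
        }
    }
}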

and then used it in my stream:

source.itemStream()
     .compose(toPartialList()) 
     .doOnNext { handleItems(it) }
     .subscribe()

Upvotes: 1

akarnokd

Reputation: 69997

Collect the items into a list and emit a copy of that list:

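// defer runs this lambda per subscriber, so each subscription gets its own list
Observable.defer(() -> {
    List<Integer> seen = new LinkedList<>();
    return Observable.range(1, 10)
           .map(v -> {
                seen.add(v);
                // emit a copy; the shared list keeps growing
                return new ArrayList<>(seen);
           });
})
.subscribe(System.out::println);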
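
To see why the copy matters, here is a minimal plain-Java sketch with no RxJava involved. The class and method names (PrefixSnapshots, snapshots, sharedReferences) are hypothetical and exist only for this illustration. It runs the same accumulate-and-emit loop twice, once copying the list on each step and once handing out the same list instance every time.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper class, for illustration only.
final class PrefixSnapshots {

    // One independent snapshot per input item: [a], [a, b], [a, b, c], ...
    static <T> List<List<T>> snapshots(List<T> items) {
        List<T> seen = new ArrayList<>();
        List<List<T>> out = new ArrayList<>();
        for (T item : items) {
            seen.add(item);
            out.add(new ArrayList<>(seen)); // copy, as in the map() step above
        }
        return out;
    }

    // Same loop without the copy: every entry is the same list instance,
    // so all entries end up showing the final contents.
    static <T> List<List<T>> sharedReferences(List<T> items) {
        List<T> seen = new ArrayList<>();
        List<List<T>> out = new ArrayList<>();
        for (T item : items) {
            seen.add(item);
            out.add(seen); // no copy
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList("a", "b", "c");
        System.out.println(snapshots(input));        // [[a], [a, b], [a, b, c]]
        System.out.println(sharedReferences(input)); // [[a, b, c], [a, b, c], [a, b, c]]
    }
}
```

In a stream, the version without the copy would likely only cause trouble if a downstream consumer holds on to an emitted list or reads it on another thread, but copying avoids having to reason about that.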

Upvotes: 1
