Mariusz

Reputation: 1865

RxJava: count emitted elements while they are being emitted

I want to know the index of the item that was just emitted by a blocking, long-running observable that will emit thousands of items. The code below works, but it creates a huge buffer of values coming from range().

sourceObservable
.zipWith(Observable.range(0, Integer.MAX_VALUE), (any, counter) -> counter)
.whatever(...)

Is there any way to avoid this behavior without introducing any external counter field?

Upvotes: 0

Views: 1215

Answers (1)

Sergej Isbrecht

Reputation: 4002

The buffer is caused by Observable.range, which may produce values much faster than sourceObservable does. zipWith has to hold on to those values until the matching item from sourceObservable arrives.

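Instead, you can keep the counter as the accumulator of scan, which needs neither range nor an external counter field. Please have a look at my implementation: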

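@Test
void stackoverflow44004014() {
    Observable.just("i", "b", "c")
            // The accumulator holds the running count; the source value itself is ignored.
            .scan(0, (counter, sourceValue) -> counter + 1)
            // scan emits the seed (0) first, so drop it.
            .skip(1)
            .test()
            .assertResult(1, 2, 3);
}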
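To see why the skip(1) is needed, here is a rough plain-Java model of what scan with a seed does. It is only a sketch for finite lists, not RxJava's actual implementation, and the names ScanSketch, scan and countEmissions are made up for this illustration. It assumes the seeded scan emits the seed first and then one accumulated value per source item.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

// Hypothetical stand-in for scan(seed, accumulator) on a finite list.
final class ScanSketch {

    // Emits the seed first, then one accumulated value per source item.
    static <T, R> List<R> scan(List<T> source, R seed, BiFunction<R, T, R> accumulator) {
        List<R> out = new ArrayList<>();
        R acc = seed;
        out.add(acc);
        for (T item : source) {
            acc = accumulator.apply(acc, item);
            out.add(acc);
        }
        return out;
    }

    // Running count per item: scan with a counter, then drop the seed (the skip(1) step).
    static <T> List<Integer> countEmissions(List<T> source) {
        List<Integer> scanned = scan(source, 0, (counter, item) -> counter + 1);
        return scanned.subList(1, scanned.size());
    }

    public static void main(String[] args) {
        List<String> source = List.of("a", "b", "c");
        System.out.println(scan(source, 0, (counter, item) -> counter + 1)); // [0, 1, 2, 3]
        System.out.println(countEmissions(source));                         // [1, 2, 3]
    }
}
```

The counter lives only inside the accumulator, so nothing is buffered and no field outside the chain is involved. If you also need the item itself, the accumulator could hold a small pair of (index, item) instead of a bare Integer.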

Upvotes: 4
