Reputation: 1865
I want to know the index of the item that has just been emitted by a blocking, long-running observable that will emit thousands of items. The code below works, but it creates a huge buffer of values coming from range().
sourceObservable
.zipWith(Observable.range(0, Integer.MAX_VALUE), (any, counter) -> counter)
.whatever(...)
Is there any way to avoid this behavior without introducing any external counter field?
Upvotes: 0
Views: 1215
Reputation: 4002
The buffer comes from Observable.range, which can emit far faster than sourceObservable. zipWith has to hold every range value that has not yet been paired with an item from sourceObservable, so those values pile up.
You can count with scan instead, which only keeps the running counter. Please have a look at my implementation:
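@Test
void stackoverflow44004014() {
    Observable.just("i", "b", "c")
        // Start at 0 and add one for every source item; the item itself is ignored.
        .scan(0, (counter, sourceValue) -> counter + 1)
        // scan emits the seed (0) first, so drop it.
        .skip(1)
        .test()
        .assertResult(1, 2, 3);
}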
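To see why the skip(1) is needed, here is a minimal plain-Java sketch of what scan with a seed does. It is only a model over a finite list, not RxJava's implementation; the class ScanCounterSketch and the helpers scan and skipFirst are hypothetical names made up for this illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;

public class ScanCounterSketch {

    // Hypothetical helper: models scan(seed, accumulator) over a finite list.
    // Only one running state value is held at any time.
    public static <T, R> List<R> scan(List<T> source, R seed, BiFunction<R, T, R> accumulator) {
        List<R> out = new ArrayList<>();
        R state = seed;
        out.add(state); // the seed is emitted before any source item
        for (T item : source) {
            state = accumulator.apply(state, item);
            out.add(state);
        }
        return out;
    }

    // Hypothetical helper: models skip(1) by dropping the first value (the seed).
    public static <R> List<R> skipFirst(List<R> values) {
        return new ArrayList<>(values.subList(1, values.size()));
    }

    public static void main(String[] args) {
        List<String> source = Arrays.asList("i", "b", "c");
        List<Integer> scanned = scan(source, 0, (counter, value) -> counter + 1);
        System.out.println(scanned);            // [0, 1, 2, 3]
        System.out.println(skipFirst(scanned)); // [1, 2, 3]
    }
}
```

The counter is derived from the previous counter rather than from a second stream, so nothing has to be queued while the source is slow.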
Upvotes: 4