In the example below I want my subscriber to initially request two items from the observable and then every 5 seconds request two more:
import static java.util.concurrent.TimeUnit.SECONDS;

import rx.Observable;
import rx.Subscriber;

public class RxJavaExample {
    public static void main(String[] args) {
        Observable.range(1, 6)
            .flatMap(testData -> {
                System.out.println("next item");
                return Observable.just(testData);
            }, 1) // maxConcurrent = 1
            .toBlocking()
            .subscribe(new Subscriber<Integer>() {
                @Override
                public void onStart() {
                    // Initial demand: two items
                    System.out.println("requesting\n\n");
                    request(2);
                    // Every 5 seconds, request two more
                    Observable.interval(5, SECONDS)
                        .subscribe(x -> {
                            System.out.println("requesting\n\n");
                            request(2);
                        });
                }

                @Override
                public void onCompleted() {
                    System.out.println("done");
                }

                @Override
                public void onError(Throwable e) {
                    System.err.println(e);
                }

                @Override
                public void onNext(Integer testData) {
                    System.out.println("OnNext");
                }
            });
    }
}
In this case I would expect the output to be:
requesting
next item
next item
OnNext
OnNext
requesting
next item
next item
OnNext
OnNext
requesting
next item
next item
OnNext
OnNext
done
But this is the actual output I get:
next item
requesting
next item
next item
OnNext
OnNext
requesting
next item
OnNext
OnNext
next item
requesting
next item
OnNext
OnNext
done
It seems like the Observable still emits one item before my subscriber requests anything. Why does this happen and is there a way to ensure the Observable only emits items once my subscriber requests them?
My example is rather contrived, as I could just use map instead of flatMap, which would give me the behaviour I want, but for my real-world use case I do need to use flatMap.
This happens because of the flatMap() operator, which issues its own requests to the upstream producer (the source range() Observable) based on the requested concurrency level. In your case you asked for a concurrency of 1, so flatMap calls request(1) on the range Observable, and the producer therefore emits a single item before the subscriber requests anything.
Note that there are two buffers here. Although flatMap() requests an item before the subscriber does, that only causes range() to produce the item; you will not receive an unrequested item in your onNext(). In other words, your onNext sequence is as expected, and it is only range() that generates items on a different schedule.
By the way, in the default case of flatMap(), that is, without a concurrency limit, it requests an unbounded number of items, so all the items are generated immediately rather than just one as in your case.
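To make the request flow described above more concrete, here is a rough sketch in plain Java. It is a toy model, not RxJava's actual flatMap implementation: the class PrefetchSketch, the FlatMapModel stage, and the run helper are all hypothetical names invented for illustration. It assumes only that the middle stage pulls up to maxConcurrent items from upstream as soon as it is created and replaces each item it hands downstream. The ordering of log lines will not match the real program exactly (threading is ignored), but it shows that items are produced upstream before any downstream request.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.stream.IntStream;

// Toy model only: NOT RxJava's implementation of flatMap.
public class PrefetchSketch {

    /** Hypothetical stand-in for a flatMap stage with a concurrency limit. */
    static final class FlatMapModel {
        private final Iterator<Integer> upstream;
        private final Queue<Integer> buffer = new ArrayDeque<>();
        private final List<String> log;

        FlatMapModel(Iterator<Integer> upstream, int maxConcurrent, List<String> log) {
            this.upstream = upstream;
            this.log = log;
            // Assumed behaviour: prefetch up to maxConcurrent items immediately,
            // before the downstream subscriber has requested anything.
            for (int i = 0; i < maxConcurrent && upstream.hasNext(); i++) {
                pullOne();
            }
        }

        /** Asks upstream for one item and buffers it (the "next item" line). */
        private void pullOne() {
            if (upstream.hasNext()) {
                log.add("next item");
                buffer.add(upstream.next());
            }
        }

        /** Downstream demand: deliver up to n buffered items, replenishing each. */
        void request(int n) {
            log.add("requesting");
            for (int i = 0; i < n && !buffer.isEmpty(); i++) {
                buffer.poll();
                log.add("OnNext");
                pullOne(); // a slot was freed, so ask upstream for one more
            }
        }
    }

    /** Runs a range of 1..6 through the model with three requests of two items. */
    static List<String> run(int maxConcurrent) {
        List<String> log = new ArrayList<>();
        Iterator<Integer> range = IntStream.rangeClosed(1, 6).iterator();
        FlatMapModel stage = new FlatMapModel(range, maxConcurrent, log);
        for (int i = 0; i < 3; i++) {
            stage.request(2);
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run(1));                 // one item produced before the first request
        System.out.println(run(Integer.MAX_VALUE)); // all six produced before the first request
    }
}
```

In this model the subscriber never sees more items than it asked for; only the upstream production runs ahead, by one item with a limit of 1 and by the whole range with no limit.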