Reputation: 1411
Ive been reading a few docs for backpressure in RxJava, but i cant find a detailed explanation like how does it happen internally in the Library, everyone just summarize it like "producer" is too fast and "consumer" is too slow.
For example like the code below:
Observable.interval(1, TimeUnit.MILLISECONDS)
.observeOn(Schedulers.newThread())
.subscribe(
i -> {
System.out.println(i);
try {
Thread.sleep(100);
} catch (Exception e) { }
},
System.out::println);
Ive been going through the RxJava source code , so my understanding is that in main thread we are gonna emit events in every milliseconds and once we emit it we pass the value to the System.out.println(i) method and throw it into the newThead scheduler's thread pool and run the method inside a runnable.
So my question is , how does the exception happen internally? Cause when we call Thread.sleep(), we are just sleeping the thread that handles the method call -> System.out.println() without effecting other threads in the thread pool, how come it will cause an exception. Is it because the thread pool doesnt have enough available threads anymore?
Thanks
Upvotes: 3
Views: 581
Reputation: 69997
You can think of backpressure as a system of permits one operator hands out to its upstream source: you can give me 128 elements. A bit later this operator may say "okay, give me another 96" so there could be in total 224 permits outstanding. Some sources, such as interval
don't care about permits and just hands out values in a periodic manner. Since the number of permits is usually strongly tied to available capacity in a queue or buffer, handing out more than these storages can hold yields MissingBackpressureException
.
Detecting backpressure violation happens mainly when an offer
to a bounded queue returns false, such as the one in observeOn
indicating the queue is full.
The second way of detecting violations is by tracking the outstanding permit count in an operator, such as onBackpressureDrop
and whenever the upstream sends more than this, the operator simply won't forward it:
// in onBackpressureDrop
public void onNext(T value) {
if (emitted != availablePermits) {
emitted++;
child.onNext(value);
} else {
// ignoring this value
}
}
The child subscriber signals its permits via request() that usually leads to something like this in onBackpressureDrop
:
public void childRequested(long n) {
availablePermits += n;
}
In practice, due to possible asynchronous execution, availablePermits
is an AtomicLong
(and is called requested
).
Upvotes: 8